322 lines
9.9 KiB
Python
322 lines
9.9 KiB
Python
import json
|
|
from pathlib import Path
|
|
from typing import List, NoReturn
|
|
|
|
import questionary
|
|
from rich.console import Console
|
|
|
|
from cli.models import AnalystType
|
|
|
|
console = Console()
|
|
|
|
CONFIG_PATH = Path(__file__).resolve().parents[1] / "config.json"
|
|
|
|
def _exit_with_config_error(message: str) -> NoReturn:
    """Print a configuration error in red and terminate the process.

    Args:
        message: Description of the configuration problem to show the user.

    Raises:
        SystemExit: Always, with exit code 1; control never returns to the
            caller.
    """
    rendered = f"\n[red]{message}[/red]"
    console.print(rendered)
    raise SystemExit(1)
|
|
|
|
|
|
def _load_config() -> dict:
    """Read and parse the JSON configuration file at ``CONFIG_PATH``.

    Every failure is routed through ``_exit_with_config_error`` so the user
    sees a short message instead of a traceback.

    Returns:
        The decoded top-level JSON object.

    Raises:
        SystemExit: When the file is missing, unreadable, not valid UTF-8,
            not valid JSON, or its top-level value is not a JSON object.
    """
    try:
        with CONFIG_PATH.open("r", encoding="utf-8") as config_file:
            config = json.load(config_file)
    except FileNotFoundError:
        _exit_with_config_error(f"Config file not found: {CONFIG_PATH}")
    except json.JSONDecodeError as exc:
        _exit_with_config_error(f"Invalid JSON in config file: {exc}")
    except UnicodeDecodeError as exc:
        # Decoding errors are ValueErrors, not OSErrors, so they need their
        # own handler to avoid escaping as a raw traceback.
        _exit_with_config_error(f"Config file is not valid UTF-8: {exc}")
    except OSError as exc:
        _exit_with_config_error(f"Unable to read config file: {exc}")

    # Valid JSON may still be a list, string, number, ...; callers use
    # ``config.get(...)`` and therefore need a mapping.
    if not isinstance(config, dict):
        _exit_with_config_error(
            f"Config file must contain a JSON object at the top level: {CONFIG_PATH}"
        )
    return config
|
|
|
|
|
|
def _get_config_section(config: dict, key: str, expected_type: type):
    """Return ``config[key]`` after checking that it has the expected type.

    Args:
        config: Mapping to read from.
        key: Name of the section to look up.
        expected_type: Type the stored value must be an instance of.

    Returns:
        The value stored under ``key``, unchanged.

    Raises:
        SystemExit: Via ``_exit_with_config_error`` when the value is not an
            instance of ``expected_type``. An absent key is read as ``None``
            and goes through the same check.
    """
    section = config.get(key)
    has_expected_type = isinstance(section, expected_type)
    if not has_expected_type:
        problem = f"Invalid or missing '{key}' in config file: {CONFIG_PATH}"
        _exit_with_config_error(problem)
    return section
|
|
|
|
|
|
def _as_pairs(entries) -> list:
    """Convert an iterable of two-item entries into a list of 2-tuples."""
    return [(display, value) for display, value in entries]


def _agent_options(section_key: str) -> dict:
    """Build the ``provider -> [(display, model), ...]`` map for one section."""
    per_provider = _get_config_section(CONFIG, section_key, dict)
    return {
        provider: _as_pairs(options) for provider, options in per_provider.items()
    }


# Parsed once, when this module is imported.
CONFIG = _load_config()

# (display, url) pairs offered by the provider prompt.
BASE_URLS = _as_pairs(_get_config_section(CONFIG, "BASE_URLS", list))

# Per-provider (display, model) pairs for the deep and shallow model prompts.
DEEP_AGENT_OPTIONS = _agent_options("DEEP_AGENT_OPTIONS")
SHALLOW_AGENT_OPTIONS = _agent_options("SHALLOW_AGENT_OPTIONS")

# Hint text embedded in the ticker prompt.
TICKER_INPUT_EXAMPLES = "Examples: SPY, CNC.TO, 7203.T, 0700.HK"

# (display, value) pairs, in the order they appear in the analyst checkbox.
ANALYST_ORDER = [
    ("Market Analyst", AnalystType.MARKET),
    ("Social Media Analyst", AnalystType.SOCIAL),
    ("News Analyst", AnalystType.NEWS),
    ("Fundamentals Analyst", AnalystType.FUNDAMENTALS),
]
|
|
|
|
|
|
def get_ticker() -> str:
    """Prompt the user for a ticker symbol and return it normalized.

    Returns:
        The entered symbol after ``normalize_ticker_symbol`` has been applied.

    Raises:
        SystemExit: With exit code 1 when the prompt yields no answer.
    """
    prompt_style = questionary.Style(
        [
            ("text", "fg:green"),
            ("highlighted", "noinherit"),
        ]
    )
    ticker = questionary.text(
        f"Enter the exact ticker symbol to analyze ({TICKER_INPUT_EXAMPLES}):",
        validate=lambda x: len(x.strip()) > 0 or "Please enter a valid ticker symbol.",
        style=prompt_style,
    ).ask()

    if not ticker:
        console.print("\n[red]No ticker symbol provided. Exiting...[/red]")
        # The ``exit`` helper is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist; raise SystemExit
        # directly, as the config helpers in this file do.
        raise SystemExit(1)

    return normalize_ticker_symbol(ticker)
|
|
|
|
|
|
def normalize_ticker_symbol(ticker: str) -> str:
    """Return ``ticker`` trimmed of surrounding whitespace and upper-cased.

    Only the ends are trimmed and the case changed, so interior characters
    such as the ``.`` in an exchange suffix are kept.
    """
    trimmed = ticker.strip()
    return trimmed.upper()
|
|
|
|
|
|
def get_analysis_date() -> str:
    """Prompt the user for an analysis date in ``YYYY-MM-DD`` format.

    Returns:
        The entered date string with surrounding whitespace removed.

    Raises:
        SystemExit: With exit code 1 when the prompt yields no answer.
    """
    import re
    from datetime import datetime

    def validate_date(date_str: str) -> bool:
        """Return True only for a zero-padded, real calendar date."""
        # The shape check uses ASCII digits explicitly: ``\d`` on a str
        # pattern also matches other Unicode decimal digits. It also enforces
        # zero padding, which ``strptime`` alone does not require.
        if not re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", date_str):
            return False
        try:
            # Rejects well-formed but impossible dates such as 2024-02-30.
            datetime.strptime(date_str, "%Y-%m-%d")
        except ValueError:
            return False
        return True

    prompt_style = questionary.Style(
        [
            ("text", "fg:green"),
            ("highlighted", "noinherit"),
        ]
    )
    analysis_date = questionary.text(
        "Enter the analysis date (YYYY-MM-DD):",
        validate=lambda x: validate_date(x.strip())
        or "Please enter a valid date in YYYY-MM-DD format.",
        style=prompt_style,
    ).ask()

    if not analysis_date:
        console.print("\n[red]No date provided. Exiting...[/red]")
        # The ``exit`` helper is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist; raise SystemExit
        # directly, as the config helpers in this file do.
        raise SystemExit(1)

    return analysis_date.strip()
|
|
|
|
|
|
def select_analysts() -> List[AnalystType]:
    """Let the user pick one or more analysts from an interactive checkbox.

    The entries are taken from ``ANALYST_ORDER``.

    Returns:
        The values of the checked entries.

    Raises:
        SystemExit: With exit code 1 when the prompt yields no selection.
    """
    checkbox_style = questionary.Style(
        [
            ("checkbox-selected", "fg:green"),
            ("selected", "fg:green noinherit"),
            ("highlighted", "noinherit"),
            ("pointer", "noinherit"),
        ]
    )
    choices = questionary.checkbox(
        "Select Your [Analysts Team]:",
        choices=[
            questionary.Choice(display, value=value) for display, value in ANALYST_ORDER
        ],
        instruction="\n- Press Space to select/unselect analysts\n- Press 'a' to select/unselect all\n- Press Enter when done",
        validate=lambda x: len(x) > 0 or "You must select at least one analyst.",
        style=checkbox_style,
    ).ask()

    if not choices:
        console.print("\n[red]No analysts selected. Exiting...[/red]")
        # The ``exit`` helper is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist; raise SystemExit
        # directly, as the config helpers in this file do.
        raise SystemExit(1)

    return choices
|
|
|
|
|
|
def select_research_depth() -> int:
    """Let the user choose a research depth from an interactive list.

    Returns:
        The integer attached to the chosen entry (1, 3 or 5).

    Raises:
        SystemExit: With exit code 1 when the prompt yields no selection.
    """
    # Display text paired with the value returned for that entry.
    DEPTH_OPTIONS = [
        ("Shallow - Quick research, few debate and strategy discussion rounds", 1),
        ("Medium - Middle ground, moderate debate rounds and strategy discussion", 3),
        ("Deep - Comprehensive research, in depth debate and strategy discussion", 5),
    ]

    select_style = questionary.Style(
        [
            ("selected", "fg:yellow noinherit"),
            ("highlighted", "fg:yellow noinherit"),
            ("pointer", "fg:yellow noinherit"),
        ]
    )
    choice = questionary.select(
        "Select Your [Research Depth]:",
        choices=[
            questionary.Choice(display, value=value) for display, value in DEPTH_OPTIONS
        ],
        instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
        style=select_style,
    ).ask()

    if choice is None:
        console.print("\n[red]No research depth selected. Exiting...[/red]")
        # The ``exit`` helper is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist; raise SystemExit
        # directly, as the config helpers in this file do.
        raise SystemExit(1)

    return choice
|
|
|
|
|
|
def select_shallow_thinking_agent(provider) -> str:
    """Let the user choose the quick-thinking LLM engine for ``provider``.

    Args:
        provider: Provider name; it is lower-cased before being used as the
            key into ``SHALLOW_AGENT_OPTIONS``.

    Returns:
        The value attached to the chosen entry.

    Raises:
        KeyError: When ``SHALLOW_AGENT_OPTIONS`` has no entry for the
            lower-cased provider name.
        SystemExit: With exit code 1 when the prompt yields no selection.
    """
    provider_options = SHALLOW_AGENT_OPTIONS[provider.lower()]
    select_style = questionary.Style(
        [
            ("selected", "fg:magenta noinherit"),
            ("highlighted", "fg:magenta noinherit"),
            ("pointer", "fg:magenta noinherit"),
        ]
    )
    choice = questionary.select(
        "Select Your [Quick-Thinking LLM Engine]:",
        choices=[
            questionary.Choice(display, value=value)
            for display, value in provider_options
        ],
        instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
        style=select_style,
    ).ask()

    if choice is None:
        console.print(
            "\n[red]No shallow thinking llm engine selected. Exiting...[/red]"
        )
        # The ``exit`` helper is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist; raise SystemExit
        # directly, as the config helpers in this file do.
        raise SystemExit(1)

    return choice
|
|
|
|
|
|
def select_deep_thinking_agent(provider) -> str:
    """Let the user choose the deep-thinking LLM engine for ``provider``.

    Args:
        provider: Provider name; it is lower-cased before being used as the
            key into ``DEEP_AGENT_OPTIONS``.

    Returns:
        The value attached to the chosen entry.

    Raises:
        KeyError: When ``DEEP_AGENT_OPTIONS`` has no entry for the
            lower-cased provider name.
        SystemExit: With exit code 1 when the prompt yields no selection.
    """
    provider_options = DEEP_AGENT_OPTIONS[provider.lower()]
    select_style = questionary.Style(
        [
            ("selected", "fg:magenta noinherit"),
            ("highlighted", "fg:magenta noinherit"),
            ("pointer", "fg:magenta noinherit"),
        ]
    )
    choice = questionary.select(
        "Select Your [Deep-Thinking LLM Engine]:",
        choices=[
            questionary.Choice(display, value=value)
            for display, value in provider_options
        ],
        instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
        style=select_style,
    ).ask()

    if choice is None:
        console.print("\n[red]No deep thinking llm engine selected. Exiting...[/red]")
        # The ``exit`` helper is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist; raise SystemExit
        # directly, as the config helpers in this file do.
        raise SystemExit(1)

    return choice
|
|
|
|
def select_llm_provider() -> tuple[str, str]:
    """Let the user choose an LLM provider from ``BASE_URLS``.

    The selection is echoed to standard output before returning.

    Returns:
        A ``(display_name, url)`` tuple for the chosen entry.

    Raises:
        SystemExit: With exit code 1 when the prompt yields no selection.
    """
    select_style = questionary.Style(
        [
            ("selected", "fg:magenta noinherit"),
            ("highlighted", "fg:magenta noinherit"),
            ("pointer", "fg:magenta noinherit"),
        ]
    )
    choice = questionary.select(
        "Select your LLM Provider:",
        choices=[
            # The value carries both fields so the display name is returned too.
            questionary.Choice(display, value=(display, value))
            for display, value in BASE_URLS
        ],
        instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
        style=select_style,
    ).ask()

    if choice is None:
        # Message matches the prompt wording: this list is not OpenAI-specific.
        console.print("\n[red]No LLM provider selected. Exiting...[/red]")
        # The ``exit`` helper is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist; raise SystemExit
        # directly, as the config helpers in this file do.
        raise SystemExit(1)

    display_name, url = choice
    print(f"You selected: {display_name}\tURL: {url}")

    return display_name, url
|
|
|
|
|
|
def ask_openai_reasoning_effort() -> str | None:
    """Ask for the OpenAI reasoning effort level.

    Returns:
        ``"medium"``, ``"high"`` or ``"low"`` for the chosen entry. The
        prompt's answer is returned as-is, so it is ``None`` when the prompt
        produces no answer (the other prompts in this file check for that).
    """
    choices = [
        questionary.Choice("Medium (Default)", "medium"),
        questionary.Choice("High (More thorough)", "high"),
        questionary.Choice("Low (Faster)", "low"),
    ]
    select_style = questionary.Style([
        ("selected", "fg:cyan noinherit"),
        ("highlighted", "fg:cyan noinherit"),
        ("pointer", "fg:cyan noinherit"),
    ])
    return questionary.select(
        "Select Reasoning Effort:",
        choices=choices,
        style=select_style,
    ).ask()
|
|
|
|
|
|
def ask_anthropic_effort() -> str | None:
    """Prompt for the Anthropic effort level.

    Controls token usage and response thoroughness on Claude 4.5+ and 4.6 models.

    Returns:
        ``"high"``, ``"medium"`` or ``"low"`` for the chosen entry; the
        prompt's answer is passed through unchanged, so it may be ``None``.
    """
    effort_choices = [
        questionary.Choice("High (recommended)", "high"),
        questionary.Choice("Medium (balanced)", "medium"),
        questionary.Choice("Low (faster, cheaper)", "low"),
    ]
    cyan_style = questionary.Style([
        ("selected", "fg:cyan noinherit"),
        ("highlighted", "fg:cyan noinherit"),
        ("pointer", "fg:cyan noinherit"),
    ])
    prompt = questionary.select(
        "Select Effort Level:",
        choices=effort_choices,
        style=cyan_style,
    )
    return prompt.ask()
|
|
|
|
|
|
def ask_gemini_thinking_config() -> str | None:
    """Prompt for the Gemini thinking configuration.

    Returns thinking_level: "high" or "minimal"; the prompt's answer is
    passed through unchanged, so it may be ``None``.
    Client maps to appropriate API param based on model series.
    """
    thinking_choices = [
        questionary.Choice("Enable Thinking (recommended)", "high"),
        questionary.Choice("Minimal/Disable Thinking", "minimal"),
    ]
    green_style = questionary.Style([
        ("selected", "fg:green noinherit"),
        ("highlighted", "fg:green noinherit"),
        ("pointer", "fg:green noinherit"),
    ])
    prompt = questionary.select(
        "Select Thinking Mode:",
        choices=thinking_choices,
        style=green_style,
    )
    return prompt.ask()
|