TradingAgents/cli/utils.py

486 lines
18 KiB
Python

import subprocess
import questionary
from typing import List, Optional, Tuple, Dict
from rich.console import Console
from cli.models import AnalystType
from tradingagents.llm_clients.model_catalog import get_model_options
# Shared rich console used by the prompt helpers in this module to print
# status, warning, and error messages.
console = Console()
# Example tickers interpolated into the prompt text shown by get_ticker().
TICKER_INPUT_EXAMPLES = "Examples: SPY, CNC.TO, 7203.T, 0700.HK"
# (display label, AnalystType) pairs. select_analysts() builds its checkbox
# choices from this list, in this order.
ANALYST_ORDER = [
("Market Analyst", AnalystType.MARKET),
("Social Media Analyst", AnalystType.SOCIAL),
("News Analyst", AnalystType.NEWS),
("Fundamentals Analyst", AnalystType.FUNDAMENTALS),
]
def get_ticker() -> str:
    """Ask for a ticker symbol and return it in normalized form.

    Blank (whitespace-only) input is rejected by the prompt's validator.
    When the prompt yields a falsy answer, an error is printed and the
    process exits with status 1.

    Returns:
        The entered ticker after ``normalize_ticker_symbol`` has been applied.
    """
    prompt_style = questionary.Style(
        [
            ("text", "fg:green"),
            ("highlighted", "noinherit"),
        ]
    )

    def _require_non_blank(raw):
        # The validator yields True for acceptable input, otherwise the
        # error message to display.
        return len(raw.strip()) > 0 or "Please enter a valid ticker symbol."

    answer = questionary.text(
        f"Enter the exact ticker symbol to analyze ({TICKER_INPUT_EXAMPLES}):",
        validate=_require_non_blank,
        style=prompt_style,
    ).ask()

    if not answer:
        console.print("\n[red]No ticker symbol provided. Exiting...[/red]")
        exit(1)

    return normalize_ticker_symbol(answer)
def normalize_ticker_symbol(ticker: str) -> str:
    """Return *ticker* with surrounding whitespace removed and upper-cased.

    Only trimming and case conversion are applied, so exchange suffixes such
    as ``.TO`` or ``.HK`` stay part of the symbol.
    """
    trimmed = ticker.strip()
    return trimmed.upper()
def get_analysis_date() -> str:
    """Ask for the analysis date and return it as a ``YYYY-MM-DD`` string.

    The prompt only accepts input that matches the ``YYYY-MM-DD`` shape and
    also parses as a real calendar date. When the prompt yields a falsy
    answer, an error is printed and the process exits with status 1.

    Returns:
        The entered date with surrounding whitespace stripped.
    """
    import re
    from datetime import datetime

    def _is_calendar_date(candidate: str) -> bool:
        # Shape check first: strptime alone would also accept un-padded
        # values such as "2024-1-5".
        if re.match(r"^\d{4}-\d{2}-\d{2}$", candidate) is None:
            return False
        try:
            datetime.strptime(candidate, "%Y-%m-%d")
        except ValueError:
            return False
        return True

    def _validate(raw):
        # True for acceptable input, otherwise the message to display.
        return (
            _is_calendar_date(raw.strip())
            or "Please enter a valid date in YYYY-MM-DD format."
        )

    prompt_style = questionary.Style(
        [
            ("text", "fg:green"),
            ("highlighted", "noinherit"),
        ]
    )
    answer = questionary.text(
        "Enter the analysis date (YYYY-MM-DD):",
        validate=_validate,
        style=prompt_style,
    ).ask()

    if not answer:
        console.print("\n[red]No date provided. Exiting...[/red]")
        exit(1)

    return answer.strip()
def select_analysts() -> List[AnalystType]:
    """Let the user tick one or more analysts in a checkbox prompt.

    The entries come from ``ANALYST_ORDER`` and are listed in that order.
    The prompt refuses an empty selection. When the prompt yields a falsy
    answer, an error is printed and the process exits with status 1.

    Returns:
        The ``AnalystType`` values of the ticked entries.
    """
    menu = []
    for label, analyst in ANALYST_ORDER:
        menu.append(questionary.Choice(label, value=analyst))

    checkbox_style = questionary.Style(
        [
            ("checkbox-selected", "fg:green"),
            ("selected", "fg:green noinherit"),
            ("highlighted", "noinherit"),
            ("pointer", "noinherit"),
        ]
    )

    def _at_least_one(ticked):
        # True for acceptable input, otherwise the message to display.
        return len(ticked) > 0 or "You must select at least one analyst."

    picked = questionary.checkbox(
        "Select Your [Analysts Team]:",
        choices=menu,
        instruction="\n- Press Space to select/unselect analysts\n- Press 'a' to select/unselect all\n- Press Enter when done",
        validate=_at_least_one,
        style=checkbox_style,
    ).ask()

    if not picked:
        console.print("\n[red]No analysts selected. Exiting...[/red]")
        exit(1)

    return picked
def select_research_depth() -> int:
    """Let the user pick a research depth from a single-choice menu.

    Returns:
        The integer attached to the chosen entry: 1 (shallow), 3 (medium)
        or 5 (deep). When the prompt yields ``None``, an error is printed
        and the process exits with status 1.
    """
    # Menu label -> depth value, listed in display order.
    depth_by_label = {
        "Shallow - Quick research, few debate and strategy discussion rounds": 1,
        "Medium - Middle ground, moderate debate rounds and strategy discussion": 3,
        "Deep - Comprehensive research, in depth debate and strategy discussion": 5,
    }
    menu = [
        questionary.Choice(label, value=depth)
        for label, depth in depth_by_label.items()
    ]
    menu_style = questionary.Style(
        [
            ("selected", "fg:yellow noinherit"),
            ("highlighted", "fg:yellow noinherit"),
            ("pointer", "fg:yellow noinherit"),
        ]
    )

    depth = questionary.select(
        "Select Your [Research Depth]:",
        choices=menu,
        instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
        style=menu_style,
    ).ask()

    if depth is None:
        console.print("\n[red]No research depth selected. Exiting...[/red]")
        exit(1)

    return depth
def _resolve_model_options(
provider: str, static_options: dict
) -> list[tuple[str, str]]:
"""Return model options for the given provider.
For Copilot, fetches the live model list from the API.
For all other providers, looks up the static options dict.
Exits with an error message if no models are available.
"""
if provider.lower() == "copilot":
options = fetch_copilot_models()
if not options:
console.print("[red]No Copilot models available. Exiting...[/red]")
exit(1)
else:
options = static_options.get(provider.lower())
if not options:
console.print(f"[red]No models available for provider '{provider}'. Exiting...[/red]")
exit(1)
return options
def select_shallow_thinking_agent(provider) -> str:
    """Select the quick-thinking LLM engine using an interactive selection.

    The selectable models are resolved by ``_resolve_model_options``: the
    static table below for most providers, the live list for Copilot.

    Args:
        provider: Provider name; matched case-insensitively against the keys
            of the static table.

    Returns:
        The model identifier attached to the chosen entry. When the prompt
        yields ``None``, an error is printed and the process exits with
        status 1.
    """
    # Define shallow thinking llm engine options with their corresponding model names
    # Ordering: medium → light → heavy (balanced first for quick tasks)
    # Within same tier, newer models first
    SHALLOW_AGENT_OPTIONS = {
        "openai": [
            ("GPT-5 Mini - Balanced speed, cost, and capability", "gpt-5-mini"),
            ("GPT-5 Nano - High-throughput, simple tasks", "gpt-5-nano"),
            ("GPT-5.4 - Latest frontier, 1M context", "gpt-5.4"),
            ("GPT-4.1 - Smartest non-reasoning model", "gpt-4.1"),
        ],
        "anthropic": [
            ("Claude Sonnet 4.6 - Best speed and intelligence balance", "claude-sonnet-4-6"),
            ("Claude Haiku 4.5 - Fast, near-instant responses", "claude-haiku-4-5"),
            ("Claude Sonnet 4.5 - Agents and coding", "claude-sonnet-4-5"),
        ],
        "google": [
            ("Gemini 3 Flash - Next-gen fast", "gemini-3-flash-preview"),
            ("Gemini 2.5 Flash - Balanced, stable", "gemini-2.5-flash"),
            ("Gemini 3.1 Flash Lite - Most cost-efficient", "gemini-3.1-flash-lite-preview"),
            ("Gemini 2.5 Flash Lite - Fast, low-cost", "gemini-2.5-flash-lite"),
        ],
        "xai": [
            ("Grok 4.1 Fast (Non-Reasoning) - Speed optimized, 2M ctx", "grok-4-1-fast-non-reasoning"),
            ("Grok 4 Fast (Non-Reasoning) - Speed optimized", "grok-4-fast-non-reasoning"),
            ("Grok 4.1 Fast (Reasoning) - High-performance, 2M ctx", "grok-4-1-fast-reasoning"),
        ],
        "openrouter": [
            ("NVIDIA Nemotron 3 Nano 30B (free)", "nvidia/nemotron-3-nano-30b-a3b:free"),
            ("Z.AI GLM 4.5 Air (free)", "z-ai/glm-4.5-air:free"),
        ],
        "ollama": [
            ("Qwen3:latest (8B, local)", "qwen3:latest"),
            ("GPT-OSS:latest (20B, local)", "gpt-oss:latest"),
            ("GLM-4.7-Flash:latest (30B, local)", "glm-4.7-flash:latest"),
        ],
        "copilot": [],  # populated dynamically by fetch_copilot_models()
    }
    options = _resolve_model_options(provider, SHALLOW_AGENT_OPTIONS)

    # Build the menu from the resolved options only. A second `for` clause
    # over another source would rebind display/value and repeat that source
    # once per resolved option, producing duplicated menu entries.
    choice = questionary.select(
        "Select Your [Quick-Thinking LLM Engine]:",
        choices=[
            questionary.Choice(display, value=value)
            for display, value in options
        ],
        instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
        style=questionary.Style(
            [
                ("selected", "fg:magenta noinherit"),
                ("highlighted", "fg:magenta noinherit"),
                ("pointer", "fg:magenta noinherit"),
            ]
        ),
    ).ask()

    if choice is None:
        console.print(
            "\n[red]No shallow thinking llm engine selected. Exiting...[/red]"
        )
        exit(1)

    return choice
def select_deep_thinking_agent(provider) -> str:
    """Let the user pick the deep-thinking LLM engine for *provider*.

    The selectable models are resolved by ``_resolve_model_options``: the
    static table below for most providers, the live list for Copilot.

    Args:
        provider: Provider name; matched case-insensitively against the keys
            of the static table.

    Returns:
        The model identifier attached to the chosen entry. When the prompt
        yields ``None``, an error is printed and the process exits with
        status 1.
    """
    # Static catalogue of (menu label, model id) pairs per provider.
    # Ordering: heavy → medium → light (most capable first for deep tasks);
    # within the same tier, newer models come first.
    DEEP_AGENT_OPTIONS = {
        "openai": [
            ("GPT-5.4 - Latest frontier, 1M context", "gpt-5.4"),
            ("GPT-5.2 - Strong reasoning, cost-effective", "gpt-5.2"),
            ("GPT-5 Mini - Balanced speed, cost, and capability", "gpt-5-mini"),
            ("GPT-5.4 Pro - Most capable, expensive ($30/$180 per 1M tokens)", "gpt-5.4-pro"),
        ],
        "anthropic": [
            ("Claude Opus 4.6 - Most intelligent, agents and coding", "claude-opus-4-6"),
            ("Claude Opus 4.5 - Premium, max intelligence", "claude-opus-4-5"),
            ("Claude Sonnet 4.6 - Best speed and intelligence balance", "claude-sonnet-4-6"),
            ("Claude Sonnet 4.5 - Agents and coding", "claude-sonnet-4-5"),
        ],
        "google": [
            ("Gemini 3.1 Pro - Reasoning-first, complex workflows", "gemini-3.1-pro-preview"),
            ("Gemini 3 Flash - Next-gen fast", "gemini-3-flash-preview"),
            ("Gemini 2.5 Pro - Stable pro model", "gemini-2.5-pro"),
            ("Gemini 2.5 Flash - Balanced, stable", "gemini-2.5-flash"),
        ],
        "xai": [
            ("Grok 4 - Flagship model", "grok-4-0709"),
            ("Grok 4.1 Fast (Reasoning) - High-performance, 2M ctx", "grok-4-1-fast-reasoning"),
            ("Grok 4 Fast (Reasoning) - High-performance", "grok-4-fast-reasoning"),
            ("Grok 4.1 Fast (Non-Reasoning) - Speed optimized, 2M ctx", "grok-4-1-fast-non-reasoning"),
        ],
        "openrouter": [
            ("Z.AI GLM 4.5 Air (free)", "z-ai/glm-4.5-air:free"),
            ("NVIDIA Nemotron 3 Nano 30B (free)", "nvidia/nemotron-3-nano-30b-a3b:free"),
        ],
        "ollama": [
            ("GLM-4.7-Flash:latest (30B, local)", "glm-4.7-flash:latest"),
            ("GPT-OSS:latest (20B, local)", "gpt-oss:latest"),
            ("Qwen3:latest (8B, local)", "qwen3:latest"),
        ],
        "copilot": [],  # filled at runtime through fetch_copilot_models()
    }

    menu = []
    for label, model_id in _resolve_model_options(provider, DEEP_AGENT_OPTIONS):
        menu.append(questionary.Choice(label, value=model_id))

    menu_style = questionary.Style(
        [
            ("selected", "fg:magenta noinherit"),
            ("highlighted", "fg:magenta noinherit"),
            ("pointer", "fg:magenta noinherit"),
        ]
    )
    selected_model = questionary.select(
        "Select Your [Deep-Thinking LLM Engine]:",
        choices=menu,
        instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
        style=menu_style,
    ).ask()

    if selected_model is None:
        console.print("\n[red]No deep thinking llm engine selected. Exiting...[/red]")
        exit(1)

    return selected_model
def fetch_copilot_models() -> list[tuple[str, str]]:
    """Retrieve the Copilot model list via ``list_copilot_models()``.

    Prints a progress line before the lookup and a warning when the lookup
    comes back empty.

    Returns:
        Whatever ``list_copilot_models()`` returned, unchanged. Per the
        annotation this is a list of ``(display label, model id)`` pairs.
        NOTE(review): ordering and authentication requirements are decided
        by the Copilot client, not here; confirm there.
    """
    # Imported lazily so the Copilot client is only loaded when it is used.
    from tradingagents.llm_clients.copilot_client import list_copilot_models

    console.print("[dim]Fetching available Copilot models...[/dim]")
    available = list_copilot_models()
    if available:
        return available

    console.print("[yellow]Warning: Could not fetch Copilot models.[/yellow]")
    return available
def select_llm_provider() -> tuple[str, str]:
    """Let the user pick an LLM provider from a single-choice menu.

    Returns:
        ``(display name, base URL)`` of the chosen provider; the URL is an
        empty string for Copilot. When the prompt yields ``None``, an error
        is printed and the process exits with status 1.
    """
    # (menu label, API base URL) per provider, in display order.
    providers = [
        ("OpenAI", "https://api.openai.com/v1"),
        ("Google", "https://generativelanguage.googleapis.com/v1"),
        ("Anthropic", "https://api.anthropic.com/"),
        ("xAI", "https://api.x.ai/v1"),
        ("Openrouter", "https://openrouter.ai/api/v1"),
        ("Ollama", "http://localhost:11434/v1"),
        ("Copilot", ""),  # no static URL; original note: resolved at runtime via GraphQL
    ]

    menu = []
    for label, endpoint in providers:
        # The choice value carries both fields so they can be unpacked below.
        menu.append(questionary.Choice(label, value=(label, endpoint)))

    menu_style = questionary.Style(
        [
            ("selected", "fg:magenta noinherit"),
            ("highlighted", "fg:magenta noinherit"),
            ("pointer", "fg:magenta noinherit"),
        ]
    )
    picked = questionary.select(
        "Select your LLM Provider:",
        choices=menu,
        instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
        style=menu_style,
    ).ask()

    if picked is None:
        console.print("\n[red]No LLM provider selected. Exiting...[/red]")
        exit(1)

    display_name, url = picked
    print(f"You selected: {display_name}\tURL: {url}")
    return display_name, url
def perform_copilot_oauth() -> bool:
    """Make sure GitHub Copilot can be used, offering a login if needed.

    Flow:
      1. If ``get_github_token()`` yields a token, the result depends only
         on ``check_copilot_auth()``.
      2. Otherwise the user is asked whether to run ``gh auth login``.
         Declining, a non-zero exit status, or a missing ``gh`` executable
         all give ``False``.
      3. After a successful login, ``check_copilot_auth()`` decides.

    Returns:
        ``True`` when ``check_copilot_auth()`` succeeds, ``False`` otherwise.
    """
    from tradingagents.llm_clients.copilot_client import check_copilot_auth, get_github_token

    success_line = "[green]✓ Authenticated with GitHub Copilot[/green]"

    if get_github_token():
        if not check_copilot_auth():
            console.print(
                "[yellow]⚠ GitHub token found but Copilot access failed. "
                "Check your Copilot subscription.[/yellow]"
            )
            return False
        console.print(success_line)
        return True

    console.print(
        "[yellow]⚠ No GitHub token found.[/yellow] "
        "You need to authenticate to use GitHub Copilot."
    )
    wants_login = questionary.confirm("Run `gh auth login` now?", default=True).ask()
    if not wants_login:
        console.print("[red]GitHub authentication skipped. Exiting...[/red]")
        return False

    try:
        login = subprocess.run(["gh", "auth", "login"])
    except FileNotFoundError:
        # Raised when the `gh` executable cannot be found.
        console.print("[red]Error: `gh` command not found. Please install the GitHub CLI.[/red]")
        return False
    if login.returncode != 0:
        console.print("[red]`gh auth login` failed.[/red]")
        return False

    if not check_copilot_auth():
        console.print("[red]Could not retrieve token or verify Copilot access after login.[/red]")
        return False
    console.print(success_line)
    return True
def ask_openai_reasoning_effort() -> str:
    """Let the user pick the OpenAI reasoning effort.

    Returns:
        The prompt's answer as-is: ``"medium"``, ``"high"`` or ``"low"``
        for a made selection.
    """
    # (menu label, effort value) in display order; medium is listed first.
    levels = (
        ("Medium (Default)", "medium"),
        ("High (More thorough)", "high"),
        ("Low (Faster)", "low"),
    )
    menu_style = questionary.Style([
        ("selected", "fg:cyan noinherit"),
        ("highlighted", "fg:cyan noinherit"),
        ("pointer", "fg:cyan noinherit"),
    ])
    prompt = questionary.select(
        "Select Reasoning Effort:",
        choices=[questionary.Choice(label, value=level) for label, level in levels],
        style=menu_style,
    )
    return prompt.ask()
def ask_anthropic_effort() -> str | None:
    """Let the user pick the Anthropic effort level.

    Per the original note, effort controls token usage and response
    thoroughness on Claude 4.5+ and 4.6 models.

    Returns:
        The prompt's answer as-is: ``"high"``, ``"medium"`` or ``"low"``
        for a made selection, or ``None`` when the prompt gives no answer.
    """
    # (menu label, effort value) in display order.
    levels = (
        ("High (recommended)", "high"),
        ("Medium (balanced)", "medium"),
        ("Low (faster, cheaper)", "low"),
    )
    menu_style = questionary.Style([
        ("selected", "fg:cyan noinherit"),
        ("highlighted", "fg:cyan noinherit"),
        ("pointer", "fg:cyan noinherit"),
    ])
    prompt = questionary.select(
        "Select Effort Level:",
        choices=[questionary.Choice(label, value=level) for label, level in levels],
        style=menu_style,
    )
    return prompt.ask()
def ask_gemini_thinking_config() -> str | None:
    """Let the user pick the Gemini thinking mode.

    Per the original note, the client maps the returned level onto the API
    parameter appropriate for the model series.

    Returns:
        The prompt's answer as-is: ``"high"`` or ``"minimal"`` for a made
        selection, or ``None`` when the prompt gives no answer.
    """
    # (menu label, thinking level) in display order.
    modes = (
        ("Enable Thinking (recommended)", "high"),
        ("Minimal/Disable Thinking", "minimal"),
    )
    menu_style = questionary.Style([
        ("selected", "fg:green noinherit"),
        ("highlighted", "fg:green noinherit"),
        ("pointer", "fg:green noinherit"),
    ])
    prompt = questionary.select(
        "Select Thinking Mode:",
        choices=[questionary.Choice(label, value=level) for label, level in modes],
        style=menu_style,
    )
    return prompt.ask()
def ask_output_language() -> str | None:
    """Ask for the report output language.

    Shows a menu of preset languages plus a "Custom language" entry. Picking
    the custom entry opens a free-text prompt that rejects blank input.

    Returns:
        The chosen preset language name, or the stripped custom language
        name, or ``None`` when either prompt gives no answer.
    """
    choice = questionary.select(
        "Select Output Language:",
        choices=[
            questionary.Choice("English (default)", "English"),
            questionary.Choice("Chinese (中文)", "Chinese"),
            questionary.Choice("Japanese (日本語)", "Japanese"),
            questionary.Choice("Korean (한국어)", "Korean"),
            questionary.Choice("Hindi (हिन्दी)", "Hindi"),
            questionary.Choice("Spanish (Español)", "Spanish"),
            questionary.Choice("Portuguese (Português)", "Portuguese"),
            questionary.Choice("French (Français)", "French"),
            questionary.Choice("German (Deutsch)", "German"),
            questionary.Choice("Arabic (العربية)", "Arabic"),
            questionary.Choice("Russian (Русский)", "Russian"),
            questionary.Choice("Custom language", "custom"),
        ],
        style=questionary.Style([
            ("selected", "fg:yellow noinherit"),
            ("highlighted", "fg:yellow noinherit"),
            ("pointer", "fg:yellow noinherit"),
        ]),
    ).ask()

    if choice != "custom":
        return choice

    custom = questionary.text(
        "Enter language name (e.g. Turkish, Vietnamese, Thai, Indonesian):",
        validate=lambda x: len(x.strip()) > 0 or "Please enter a language name.",
    ).ask()
    # The text prompt can give no answer (None), just like the menu above.
    # Guard before .strip() so that case returns None instead of raising
    # AttributeError.
    if custom is None:
        return None
    return custom.strip()