"""Interactive command-line front end for the TradingAgents framework.

The module collects the user's choices (ticker, date, analysts, models), runs
the TradingAgents graph while rendering a live Rich dashboard, writes a message
log and per-section report files to the results directory, and finally prints
the complete multi-team report.
"""

from typing import Optional
import datetime
import time
from collections import deque
from functools import wraps
from pathlib import Path

import typer
from dotenv import load_dotenv
from rich import box
from rich.align import Align
from rich.columns import Columns
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.markdown import Markdown
from rich.panel import Panel
from rich.rule import Rule
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
from rich.tree import Tree

# Load environment variables from .env file. This deliberately runs before the
# project imports below, as in the original file, so anything they read from
# the environment at import time is already populated.
load_dotenv()

from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
from cli.models import AnalystType
from cli.utils import *

console = Console()

app = typer.Typer(
    name="TradingAgents",
    help="TradingAgents CLI: Multi-Agents LLM Financial Trading Framework",
    add_completion=True,  # Enable shell completion
)


class MessageBuffer:
    """In-memory state backing the live dashboard.

    Holds bounded histories of messages and tool calls, the status of every
    agent, and the text of each report section, and derives from those the
    report shown in the "Current Report" panel plus a combined final report.
    """

    # Display title for each report section, in pipeline order.
    SECTION_TITLES = {
        "market_report": "Market Analysis",
        "sentiment_report": "Social Sentiment",
        "news_report": "News Analysis",
        "fundamentals_report": "Fundamentals Analysis",
        "investment_plan": "Research Team Decision",
        "trader_investment_plan": "Trading Team Plan",
        "final_trade_decision": "Portfolio Management Decision",
    }

    # Sections grouped under the "Analyst Team Reports" heading.
    _ANALYST_SECTIONS = (
        "market_report",
        "sentiment_report",
        "news_report",
        "fundamentals_report",
    )

    # Sections that each get their own top-level heading in the final report.
    _DECISION_SECTIONS = (
        "investment_plan",
        "trader_investment_plan",
        "final_trade_decision",
    )

    def __init__(self, max_length=100):
        """Create an empty buffer.

        Args:
            max_length: Maximum number of messages, and separately of tool
                calls, retained; older entries are discarded by the deques.
        """
        self.messages = deque(maxlen=max_length)
        self.tool_calls = deque(maxlen=max_length)
        self.current_report = None
        self.final_report = None  # Store the complete final report
        self.agent_status = {
            # Analyst Team
            "Market Analyst": "pending",
            "Social Analyst": "pending",
            "News Analyst": "pending",
            "Fundamentals Analyst": "pending",
            # Research Team
            "Bull Researcher": "pending",
            "Bear Researcher": "pending",
            "Research Manager": "pending",
            # Trading Team
            "Trader": "pending",
            # Risk Management Team
            "Risky Analyst": "pending",
            "Neutral Analyst": "pending",
            "Safe Analyst": "pending",
            # Portfolio Management Team
            "Portfolio Manager": "pending",
        }
        self.current_agent = None
        self.report_sections = {section: None for section in self.SECTION_TITLES}
        # Name of the section most recently passed to update_report_section.
        self._latest_section = None

    def add_message(self, message_type, content):
        """Append a ``(HH:MM:SS, message_type, content)`` entry to the history."""
        timestamp = datetime.datetime.now().strftime("%H:%M:%S")
        self.messages.append((timestamp, message_type, content))

    def add_tool_call(self, tool_name, args):
        """Append a ``(HH:MM:SS, tool_name, args)`` entry to the tool-call history."""
        timestamp = datetime.datetime.now().strftime("%H:%M:%S")
        self.tool_calls.append((timestamp, tool_name, args))

    def update_agent_status(self, agent, status):
        """Set the status of a known agent and mark it as the current agent.

        Unknown agent names are ignored.
        """
        if agent in self.agent_status:
            self.agent_status[agent] = status
            self.current_agent = agent

    def update_report_section(self, section_name, content):
        """Store new content for a known section and refresh derived reports.

        Unknown section names are ignored.
        """
        if section_name in self.report_sections:
            self.report_sections[section_name] = content
            self._latest_section = section_name
            self._update_current_report()

    def _update_current_report(self):
        """Refresh ``current_report`` and then ``final_report``.

        The panel shows the section that was updated most recently. When that
        section has no content (for example after ``report_sections`` was
        cleared directly), fall back to the last populated section in
        pipeline order. ``current_report`` is left untouched when nothing
        displayable is found.
        """
        latest_section = self._latest_section
        if (
            latest_section is None
            or self.report_sections.get(latest_section) is None
        ):
            latest_section = None
            for section, content in self.report_sections.items():
                if content is not None:
                    latest_section = section

        latest_content = (
            self.report_sections[latest_section] if latest_section else None
        )
        if latest_section and latest_content:
            self.current_report = (
                f"### {self.SECTION_TITLES[latest_section]}\n{latest_content}"
            )

        # Update the final complete report
        self._update_final_report()

    def _update_final_report(self):
        """Rebuild ``final_report`` from every non-empty section.

        Sets ``final_report`` to ``None`` when no section has content.
        """
        sections = self.report_sections
        report_parts = []

        # Analyst Team Reports share one heading with a sub-heading each.
        if any(sections[name] for name in self._ANALYST_SECTIONS):
            report_parts.append("## Analyst Team Reports")
            for name in self._ANALYST_SECTIONS:
                if sections[name]:
                    report_parts.append(
                        f"### {self.SECTION_TITLES[name]}\n{sections[name]}"
                    )

        # Research, trading and portfolio decisions get their own headings.
        for name in self._DECISION_SECTIONS:
            if sections[name]:
                report_parts.append(f"## {self.SECTION_TITLES[name]}")
                report_parts.append(f"{sections[name]}")

        self.final_report = "\n\n".join(report_parts) if report_parts else None


message_buffer = MessageBuffer()


def create_layout():
    """Build the dashboard layout.

    Returns:
        A ``Layout`` with ``header``, ``main`` and ``footer`` rows, where
        ``main`` holds ``upper`` (split into ``progress`` and ``messages``)
        above ``analysis``.
    """
    layout = Layout()
    layout.split_column(
        Layout(name="header", size=3),
        Layout(name="main"),
        Layout(name="footer", size=3),
    )
    layout["main"].split_column(
        Layout(name="upper", ratio=3), Layout(name="analysis", ratio=5)
    )
    layout["upper"].split_row(
        Layout(name="progress", ratio=2), Layout(name="messages", ratio=3)
    )
    return layout


# Agents shown in the progress panel, grouped by team.
_TEAMS = {
    "Analyst Team": [
        "Market Analyst",
        "Social Analyst",
        "News Analyst",
        "Fundamentals Analyst",
    ],
    "Research Team": ["Bull Researcher", "Bear Researcher", "Research Manager"],
    "Trading Team": ["Trader"],
    "Risk Management": ["Risky Analyst", "Neutral Analyst", "Safe Analyst"],
    "Portfolio Management": ["Portfolio Manager"],
}

# Markup colour for each non-running status; anything else renders white.
_STATUS_COLORS = {"pending": "yellow", "completed": "green", "error": "red"}

# Number of most recent message/tool rows shown in the messages panel.
_MAX_VISIBLE_MESSAGES = 12


def _status_cell(status):
    """Return the renderable for a status: a spinner while running, else markup."""
    if status == "in_progress":
        return Spinner("dots", text="[blue]in_progress[/blue]", style="bold cyan")
    color = _STATUS_COLORS.get(status, "white")
    return f"[{color}]{status}[/{color}]"


def _build_progress_panel():
    """Build the panel listing every agent's status, grouped by team."""
    progress_table = Table(
        show_header=True,
        header_style="bold magenta",
        show_footer=False,
        box=box.SIMPLE_HEAD,  # Use simple header with horizontal lines
        title=None,  # Remove the redundant Progress title
        padding=(0, 2),  # Add horizontal padding
        expand=True,  # Make table expand to fill available space
    )
    progress_table.add_column("Team", style="cyan", justify="center", width=20)
    progress_table.add_column("Agent", style="green", justify="center", width=20)
    progress_table.add_column("Status", style="yellow", justify="center", width=20)

    for team, agents in _TEAMS.items():
        for index, agent in enumerate(agents):
            # Only the first row of a team carries the team name.
            team_label = team if index == 0 else ""
            progress_table.add_row(
                team_label, agent, _status_cell(message_buffer.agent_status[agent])
            )
        # Add horizontal line after each team
        progress_table.add_row("─" * 20, "─" * 20, "─" * 20, style="dim")

    return Panel(progress_table, title="Progress", border_style="cyan", padding=(1, 2))


def _collect_display_messages():
    """Return all tool calls and messages as display tuples sorted by time.

    Each item is ``(timestamp, type, text)``. Long tool-call arguments and
    long message bodies are truncated with an ellipsis.
    """
    all_messages = []

    for timestamp, tool_name, args in message_buffer.tool_calls:
        # Truncate tool call args if too long
        if isinstance(args, str) and len(args) > 100:
            args = args[:97] + "..."
        all_messages.append((timestamp, "Tool", f"{tool_name}: {args}"))

    for timestamp, msg_type, content in message_buffer.messages:
        content_str = extract_content_string(content)
        # Truncate message content if too long
        if len(content_str) > 200:
            content_str = content_str[:197] + "..."
        all_messages.append((timestamp, msg_type, content_str))

    # NOTE(review): timestamps are HH:MM:SS strings, so ordering is only
    # correct within a single day; a run crossing midnight would mis-sort.
    all_messages.sort(key=lambda entry: entry[0])
    return all_messages


def _build_messages_panel(spinner_text=None):
    """Build the panel showing the most recent messages and tool calls."""
    messages_table = Table(
        show_header=True,
        header_style="bold magenta",
        show_footer=False,
        expand=True,  # Make table expand to fill available space
        box=box.MINIMAL,  # Use minimal box style for a lighter look
        show_lines=True,  # Keep horizontal lines
        padding=(0, 1),  # Add some padding between columns
    )
    messages_table.add_column("Time", style="cyan", width=8, justify="center")
    messages_table.add_column("Type", style="green", width=10, justify="center")
    messages_table.add_column(
        "Content", style="white", no_wrap=False, ratio=1
    )  # Make content column expand

    all_messages = _collect_display_messages()
    for timestamp, msg_type, content in all_messages[-_MAX_VISIBLE_MESSAGES:]:
        # Fold long words instead of overflowing the column.
        messages_table.add_row(timestamp, msg_type, Text(content, overflow="fold"))

    if spinner_text:
        messages_table.add_row("", "Spinner", spinner_text)

    # Tell the user when older entries are hidden. This is set as the table
    # caption because Table has no table-level "footer" attribute that it
    # renders, so the previous assignment never appeared on screen.
    if len(all_messages) > _MAX_VISIBLE_MESSAGES:
        messages_table.caption = (
            f"[dim]Showing last {_MAX_VISIBLE_MESSAGES} of {len(all_messages)} messages[/dim]"
        )

    return Panel(
        messages_table,
        title="Messages & Tools",
        border_style="blue",
        padding=(1, 2),
    )


def _build_analysis_panel():
    """Build the panel showing the current report, or a waiting placeholder."""
    if message_buffer.current_report:
        body = Markdown(message_buffer.current_report)
    else:
        body = "[italic]Waiting for analysis report...[/italic]"
    return Panel(body, title="Current Report", border_style="green", padding=(1, 2))


def _build_footer_panel():
    """Build the footer panel with tool-call, LLM-call and report counts."""
    tool_calls_count = len(message_buffer.tool_calls)
    # Messages tagged "Reasoning" are counted as LLM calls.
    llm_calls_count = sum(
        1 for _, msg_type, _ in message_buffer.messages if msg_type == "Reasoning"
    )
    reports_count = sum(
        1 for content in message_buffer.report_sections.values() if content is not None
    )

    stats_table = Table(show_header=False, box=None, padding=(0, 2), expand=True)
    stats_table.add_column("Stats", justify="center")
    stats_table.add_row(
        f"Tool Calls: {tool_calls_count} | LLM Calls: {llm_calls_count} | Generated Reports: {reports_count}"
    )
    return Panel(stats_table, border_style="grey50")


def update_display(layout, spinner_text=None):
    """Re-render every dashboard region from the global ``message_buffer``.

    Args:
        layout: Layout produced by ``create_layout``.
        spinner_text: Optional text appended as a final "Spinner" row in the
            messages panel.
    """
    # Header with welcome message
    layout["header"].update(
        Panel(
            "[bold green]Welcome to TradingAgents CLI[/bold green]\n"
            "[dim]© [Tauric Research](https://github.com/TauricResearch)[/dim]",
            title="Welcome to TradingAgents",
            border_style="green",
            padding=(1, 2),
            expand=True,
        )
    )
    layout["progress"].update(_build_progress_panel())
    layout["messages"].update(_build_messages_panel(spinner_text))
    layout["analysis"].update(_build_analysis_panel())
    layout["footer"].update(_build_footer_panel())


def get_user_selections():
    """Get all user selections before starting the analysis display.

    Returns:
        A dict with keys ``ticker``, ``analysis_date``, ``analysts``,
        ``research_depth``, ``llm_provider`` (lower-cased), ``backend_url``,
        ``shallow_thinker`` and ``deep_thinker``.
    """
    # Display ASCII art welcome message. UTF-8 is requested explicitly so the
    # result does not depend on the platform's default encoding.
    with open("./cli/static/welcome.txt", "r", encoding="utf-8") as f:
        welcome_ascii = f.read()

    # Create welcome box content
    welcome_content = f"{welcome_ascii}\n"
    welcome_content += "[bold green]TradingAgents: Multi-Agents LLM Financial Trading Framework - CLI[/bold green]\n\n"
    welcome_content += "[bold]Workflow Steps:[/bold]\n"
    welcome_content += "I. Analyst Team → II. Research Team → III. Trader → IV. Risk Management → V. Portfolio Management\n\n"
    welcome_content += (
        "[dim]Built by [Tauric Research](https://github.com/TauricResearch)[/dim]"
    )

    # Create and center the welcome box
    welcome_box = Panel(
        welcome_content,
        border_style="green",
        padding=(1, 2),
        title="Welcome to TradingAgents",
        subtitle="Multi-Agents LLM Financial Trading Framework",
    )
    console.print(Align.center(welcome_box))
    console.print()  # Add a blank line after the welcome box

    def create_question_box(title, prompt, default=None):
        """Return a panel describing one questionnaire step."""
        box_content = f"[bold]{title}[/bold]\n"
        box_content += f"[dim]{prompt}[/dim]"
        if default:
            box_content += f"\n[dim]Default: {default}[/dim]"
        return Panel(box_content, border_style="blue", padding=(1, 2))

    # Step 1: Ticker symbol
    console.print(
        create_question_box(
            "Step 1: Ticker Symbol", "Enter the ticker symbol to analyze", "SPY"
        )
    )
    selected_ticker = get_ticker()

    # Step 2: Analysis date
    default_date = datetime.datetime.now().strftime("%Y-%m-%d")
    console.print(
        create_question_box(
            "Step 2: Analysis Date",
            "Enter the analysis date (YYYY-MM-DD)",
            default_date,
        )
    )
    analysis_date = get_analysis_date()

    # Step 3: Select analysts
    console.print(
        create_question_box(
            "Step 3: Analysts Team", "Select your LLM analyst agents for the analysis"
        )
    )
    selected_analysts = select_analysts()
    console.print(
        f"[green]Selected analysts:[/green] {', '.join(analyst.value for analyst in selected_analysts)}"
    )

    # Step 4: Research depth
    console.print(
        create_question_box(
            "Step 4: Research Depth", "Select your research depth level"
        )
    )
    selected_research_depth = select_research_depth()

    # Step 5: OpenAI backend
    console.print(
        create_question_box(
            "Step 5: OpenAI backend", "Select which service to talk to"
        )
    )
    selected_llm_provider, backend_url = select_llm_provider()

    # Step 6: Thinking agents
    console.print(
        create_question_box(
            "Step 6: Thinking Agents", "Select your thinking agents for analysis"
        )
    )
    selected_shallow_thinker = select_shallow_thinking_agent(selected_llm_provider)
    selected_deep_thinker = select_deep_thinking_agent(selected_llm_provider)

    return {
        "ticker": selected_ticker,
        "analysis_date": analysis_date,
        "analysts": selected_analysts,
        "research_depth": selected_research_depth,
        "llm_provider": selected_llm_provider.lower(),
        "backend_url": backend_url,
        "shallow_thinker": selected_shallow_thinker,
        "deep_thinker": selected_deep_thinker,
    }


def get_ticker():
    """Get ticker symbol from user input (defaults to ``SPY``)."""
    return typer.prompt("", default="SPY")


def get_analysis_date():
    """Prompt until the user enters a valid, non-future ``YYYY-MM-DD`` date.

    Returns:
        The accepted date as the string the user entered.
    """
    while True:
        date_str = typer.prompt(
            "", default=datetime.datetime.now().strftime("%Y-%m-%d")
        )
        try:
            analysis_date = datetime.datetime.strptime(date_str, "%Y-%m-%d")
        except ValueError:
            console.print(
                "[red]Error: Invalid date format. Please use YYYY-MM-DD[/red]"
            )
            continue

        if analysis_date.date() > datetime.datetime.now().date():
            console.print("[red]Error: Analysis date cannot be in the future[/red]")
            continue
        return date_str


def _report_panel(markdown_text, title):
    """Wrap one agent's Markdown text in the standard blue inner panel."""
    return Panel(
        Markdown(markdown_text),
        title=title,
        border_style="blue",
        padding=(1, 2),
    )


def _collect_report_panels(state, entries):
    """Return inner panels for each ``(key, title)`` whose value in ``state`` is truthy."""
    return [_report_panel(state[key], title) for key, title in entries if state.get(key)]


def _print_team_columns(panels, title, border_style):
    """Print ``panels`` side by side inside a titled team panel; no-op when empty."""
    if panels:
        console.print(
            Panel(
                Columns(panels, equal=True, expand=True),
                title=title,
                border_style=border_style,
                padding=(1, 2),
            )
        )


def display_complete_report(final_state):
    """Display the complete analysis report with team-based panels.

    Args:
        final_state: Mapping holding the report strings plus the optional
            ``investment_debate_state`` and ``risk_debate_state`` mappings.
            Missing or empty entries are skipped.
    """
    console.print("\n[bold green]Complete Analysis Report[/bold green]\n")

    # I. Analyst Team Reports
    analyst_reports = _collect_report_panels(
        final_state,
        [
            ("market_report", "Market Analyst"),
            ("sentiment_report", "Social Analyst"),
            ("news_report", "News Analyst"),
            ("fundamentals_report", "Fundamentals Analyst"),
        ],
    )
    _print_team_columns(analyst_reports, "I. Analyst Team Reports", "cyan")

    # II. Research Team Reports
    if final_state.get("investment_debate_state"):
        research_reports = _collect_report_panels(
            final_state["investment_debate_state"],
            [
                ("bull_history", "Bull Researcher"),
                ("bear_history", "Bear Researcher"),
                ("judge_decision", "Research Manager"),
            ],
        )
        _print_team_columns(research_reports, "II. Research Team Decision", "magenta")

    # III. Trading Team Reports
    if final_state.get("trader_investment_plan"):
        console.print(
            Panel(
                _report_panel(final_state["trader_investment_plan"], "Trader"),
                title="III. Trading Team Plan",
                border_style="yellow",
                padding=(1, 2),
            )
        )

    # IV. Risk Management Team Reports
    if final_state.get("risk_debate_state"):
        risk_state = final_state["risk_debate_state"]
        risk_reports = _collect_report_panels(
            risk_state,
            [
                ("risky_history", "Aggressive Analyst"),
                ("safe_history", "Conservative Analyst"),
                ("neutral_history", "Neutral Analyst"),
            ],
        )
        _print_team_columns(risk_reports, "IV. Risk Management Team Decision", "red")

        # V. Portfolio Manager Decision. This stays inside the guard because
        # the judge decision lives in risk_state.
        if risk_state.get("judge_decision"):
            console.print(
                Panel(
                    _report_panel(risk_state["judge_decision"], "Portfolio Manager"),
                    title="V. Portfolio Manager Decision",
                    border_style="green",
                    padding=(1, 2),
                )
            )


def update_research_team_status(status):
    """Update status for all research team members and trader."""
    research_team = ["Bull Researcher", "Bear Researcher", "Research Manager", "Trader"]
    for agent in research_team:
        message_buffer.update_agent_status(agent, status)


def extract_content_string(content):
    """Extract string content from various message formats.

    Strings are returned unchanged. Lists are treated as content blocks:
    ``text`` blocks contribute their text, ``tool_use`` blocks contribute a
    ``[Tool: name]`` marker, other dict blocks are dropped, non-dict items
    are stringified, and the pieces are joined with spaces. Any other value
    is converted with ``str``.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    # Handle Anthropic's list format
    text_parts = []
    for item in content:
        if not isinstance(item, dict):
            text_parts.append(str(item))
        elif item.get('type') == 'text':
            text_parts.append(item.get('text', ''))
        elif item.get('type') == 'tool_use':
            text_parts.append(f"[Tool: {item.get('name', 'unknown')}]")
    return ' '.join(text_parts)


def _was_updated(chunk, trace, key):
    """Return True when ``chunk[key]`` is truthy and differs from the previous chunk."""
    if key not in chunk or not chunk[key]:
        return False
    return not trace or chunk[key] != trace[-1].get(key)


def run_analysis():
    """Run one interactive analysis from prompts to final report.

    Collects the user's selections, builds the graph, wraps the global
    ``message_buffer`` so messages, tool calls and report sections are also
    written under the results directory, then streams the graph while
    updating the live dashboard and finally prints the complete report.
    """
    # First get all user selections
    selections = get_user_selections()

    # Create config with selected research depth
    config = DEFAULT_CONFIG.copy()
    config["max_debate_rounds"] = selections["research_depth"]
    config["max_risk_discuss_rounds"] = selections["research_depth"]
    config["quick_think_llm"] = selections["shallow_thinker"]
    config["deep_think_llm"] = selections["deep_thinker"]
    config["backend_url"] = selections["backend_url"]
    config["llm_provider"] = selections["llm_provider"].lower()

    # Plain-string analyst names. Membership tests below use these so they do
    # not rely on AnalystType members comparing equal to bare strings.
    analyst_values = [analyst.value for analyst in selections["analysts"]]
    selected_analysts = set(analyst_values)

    # Initialize the graph
    graph = TradingAgentsGraph(analyst_values, config=config, debug=True)

    # Create result directory
    results_dir = Path(config["results_dir"]) / selections["ticker"] / selections["analysis_date"]
    results_dir.mkdir(parents=True, exist_ok=True)
    report_dir = results_dir / "reports"
    report_dir.mkdir(parents=True, exist_ok=True)
    log_file = results_dir / "message_tool.log"
    log_file.touch(exist_ok=True)

    def save_message_decorator(obj, func_name):
        """Wrap ``obj.<func_name>`` so each new message is appended to the log."""
        func = getattr(obj, func_name)

        @wraps(func)
        def wrapper(*args, **kwargs):
            func(*args, **kwargs)
            timestamp, message_type, content = obj.messages[-1]
            # Keep one log line per message; str() guards non-string content.
            content = str(content).replace("\n", " ")
            # Model output may contain non-ASCII text, so fix the encoding.
            with open(log_file, "a", encoding="utf-8") as f:
                f.write(f"{timestamp} [{message_type}] {content}\n")

        return wrapper

    def save_tool_call_decorator(obj, func_name):
        """Wrap ``obj.<func_name>`` so each new tool call is appended to the log."""
        func = getattr(obj, func_name)

        @wraps(func)
        def wrapper(*args, **kwargs):
            func(*args, **kwargs)
            timestamp, tool_name, tool_args = obj.tool_calls[-1]
            # Arguments are normally a mapping, but the display code already
            # allows for plain strings, so accept those here as well.
            if isinstance(tool_args, dict):
                args_str = ", ".join(f"{k}={v}" for k, v in tool_args.items())
            else:
                args_str = str(tool_args)
            with open(log_file, "a", encoding="utf-8") as f:
                f.write(f"{timestamp} [Tool Call] {tool_name}({args_str})\n")

        return wrapper

    def save_report_section_decorator(obj, func_name):
        """Wrap ``obj.<func_name>`` so non-empty sections are written to ``<name>.md``."""
        func = getattr(obj, func_name)

        @wraps(func)
        def wrapper(section_name, content):
            func(section_name, content)
            stored = obj.report_sections.get(section_name)
            if stored:
                with open(report_dir / f"{section_name}.md", "w", encoding="utf-8") as f:
                    f.write(stored)

        return wrapper

    message_buffer.add_message = save_message_decorator(message_buffer, "add_message")
    message_buffer.add_tool_call = save_tool_call_decorator(message_buffer, "add_tool_call")
    message_buffer.update_report_section = save_report_section_decorator(message_buffer, "update_report_section")

    # Now start the display layout
    layout = create_layout()

    with Live(layout, refresh_per_second=4):
        # Initial display
        update_display(layout)

        # Add initial messages
        message_buffer.add_message("System", f"Selected ticker: {selections['ticker']}")
        message_buffer.add_message(
            "System", f"Analysis date: {selections['analysis_date']}"
        )
        message_buffer.add_message(
            "System",
            f"Selected analysts: {', '.join(analyst_values)}",
        )
        update_display(layout)

        # Reset agent statuses
        for agent in message_buffer.agent_status:
            message_buffer.update_agent_status(agent, "pending")

        # Reset report sections
        for section in message_buffer.report_sections:
            message_buffer.report_sections[section] = None
        message_buffer.current_report = None
        message_buffer.final_report = None

        # Update agent status to in_progress for the first analyst
        # NOTE(review): assumes select_analysts() returns at least one analyst.
        first_analyst = f"{analyst_values[0].capitalize()} Analyst"
        message_buffer.update_agent_status(first_analyst, "in_progress")
        update_display(layout)

        # Create spinner text
        spinner_text = (
            f"Analyzing {selections['ticker']} on {selections['analysis_date']}..."
        )
        update_display(layout, spinner_text)

        # Initialize state and get graph args
        init_agent_state = graph.propagator.create_initial_state(
            selections["ticker"], selections["analysis_date"]
        )
        args = graph.propagator.get_graph_args()

        # Stream the analysis
        trace = []
        for chunk in graph.graph.stream(init_agent_state, **args):
            # Only handle the newest message when the list grew since the
            # previous chunk, so repeated states are not logged twice.
            if chunk.get("messages") and (
                not trace
                or len(chunk["messages"]) > len(trace[-1].get("messages", []))
            ):
                last_message = chunk["messages"][-1]

                # Extract message content and type
                if hasattr(last_message, "content"):
                    content = extract_content_string(last_message.content)
                    msg_type = "Reasoning"
                else:
                    content = str(last_message)
                    msg_type = "System"
                message_buffer.add_message(msg_type, content)

                if hasattr(last_message, "tool_calls") and last_message.tool_calls:
                    for tool_call in last_message.tool_calls:
                        if isinstance(tool_call, dict):
                            message_buffer.add_tool_call(tool_call["name"], tool_call["args"])
                        else:
                            message_buffer.add_tool_call(tool_call.name, tool_call.args)

            # Process report and status updates from any chunk
            if _was_updated(chunk, trace, "market_report"):
                message_buffer.update_report_section("market_report", chunk["market_report"])
                message_buffer.update_agent_status("Market Analyst", "completed")
                if "social" in selected_analysts:
                    message_buffer.update_agent_status("Social Analyst", "in_progress")

            if _was_updated(chunk, trace, "sentiment_report"):
                message_buffer.update_report_section("sentiment_report", chunk["sentiment_report"])
                message_buffer.update_agent_status("Social Analyst", "completed")
                if "news" in selected_analysts:
                    message_buffer.update_agent_status("News Analyst", "in_progress")

            if _was_updated(chunk, trace, "news_report"):
                message_buffer.update_report_section("news_report", chunk["news_report"])
                message_buffer.update_agent_status("News Analyst", "completed")
                if "fundamentals" in selected_analysts:
                    message_buffer.update_agent_status("Fundamentals Analyst", "in_progress")

            if _was_updated(chunk, trace, "fundamentals_report"):
                message_buffer.update_report_section("fundamentals_report", chunk["fundamentals_report"])
                message_buffer.update_agent_status("Fundamentals Analyst", "completed")
                update_research_team_status("in_progress")

            if _was_updated(chunk, trace, "investment_plan"):
                message_buffer.update_report_section("investment_plan", chunk["investment_plan"])
                update_research_team_status("completed")
                # The research-team helper also covers the Trader, so put the
                # Trader back to in_progress for its own step.
                message_buffer.update_agent_status("Trader", "in_progress")

            if _was_updated(chunk, trace, "trader_investment_plan"):
                message_buffer.update_report_section("trader_investment_plan", chunk["trader_investment_plan"])
                message_buffer.update_agent_status("Trader", "completed")
                message_buffer.update_agent_status("Risky Analyst", "in_progress")
                message_buffer.update_agent_status("Neutral Analyst", "in_progress")
                message_buffer.update_agent_status("Safe Analyst", "in_progress")

            if _was_updated(chunk, trace, "final_trade_decision"):
                message_buffer.update_report_section("final_trade_decision", chunk["final_trade_decision"])
                message_buffer.update_agent_status("Risky Analyst", "completed")
                message_buffer.update_agent_status("Neutral Analyst", "completed")
                message_buffer.update_agent_status("Safe Analyst", "completed")
                message_buffer.update_agent_status("Portfolio Manager", "completed")

            # Always update the display to reflect any change
            update_display(layout)
            trace.append(chunk)

        # Get final state and decision
        final_state = trace[-1]
        # The processed decision is not displayed here; the call is kept
        # because process_signal is opaque from this file.
        decision = graph.process_signal(final_state["final_trade_decision"])

        # Update all agent statuses to completed
        for agent in message_buffer.agent_status:
            message_buffer.update_agent_status(agent, "completed")

        message_buffer.add_message(
            "Analysis", f"Completed analysis for {selections['analysis_date']}"
        )

        # Update final report sections
        for section in message_buffer.report_sections:
            if section in final_state:
                message_buffer.update_report_section(section, final_state[section])

        # Display the complete final report
        display_complete_report(final_state)

        update_display(layout)


@app.command()
def analyze():
    """Run the interactive TradingAgents analysis."""
    run_analysis()


if __name__ == "__main__":
    app()