"""UI display functions for the Litadel CLI using Rich library."""

from rich.panel import Panel
from rich.spinner import Spinner
from rich.layout import Layout
from rich.text import Text
from rich.table import Table
from rich import box

# Agents listed in the progress panel, grouped by team, in display order.
_TEAMS = {
    "Analyst Team": [
        "Market Analyst",
        "Social Analyst",
        "News Analyst",
        "Fundamentals Analyst",
    ],
    "Research Team": ["Bull Researcher", "Bear Researcher", "Research Manager"],
    "Trading Team": ["Trader"],
    "Risk Management": ["Risky Analyst", "Neutral Analyst", "Safe Analyst"],
    "Portfolio Management": ["Portfolio Manager"],
}

# Markup colour per agent status; statuses not listed here render in white.
_STATUS_COLORS = {
    "pending": "yellow",
    "completed": "green",
    "error": "red",
}

# Number of most recent rows shown in the "Messages & Tools" panel.
_MAX_MESSAGES = 12
# Character limits (including the trailing "...") for displayed text.
_MAX_TOOL_ARGS_LEN = 100
_MAX_CONTENT_LEN = 200


def create_layout():
    """Create the main layout structure for the CLI display.

    Returns:
        A ``Layout`` with a fixed-height ``header`` and ``footer`` (3 rows
        each) around ``main``. ``main`` is split vertically into ``upper``
        and ``analysis``; ``upper`` is split horizontally into ``progress``
        and ``messages``.
    """
    layout = Layout()
    layout.split_column(
        Layout(name="header", size=3),
        Layout(name="main"),
        Layout(name="footer", size=3),
    )
    layout["main"].split_column(
        Layout(name="upper", ratio=3),
        Layout(name="analysis", ratio=5),
    )
    layout["upper"].split_row(
        Layout(name="progress", ratio=2),
        Layout(name="messages", ratio=3),
    )
    return layout


def _truncate(text, limit):
    """Return *text* cut to at most *limit* characters, ending in "..." if cut."""
    if len(text) <= limit:
        return text
    return text[: limit - 3] + "..."


def _status_cell(status):
    """Return the renderable for one agent's status cell."""
    if status == "in_progress":
        return Spinner("dots", text="[blue]in_progress[/blue]", style="bold cyan")
    color = _STATUS_COLORS.get(status, "white")
    return f"[{color}]{status}[/{color}]"


def _content_to_text(content):
    """Flatten a message's content (str, list of blocks, or other) to a str."""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    # List of content blocks: dict blocks contribute their text or a tool
    # marker (dicts of any other type are skipped); non-dict items are
    # stringified.
    parts = []
    for item in content:
        if not isinstance(item, dict):
            parts.append(str(item))
        elif item.get("type") == "text":
            parts.append(item.get("text", ""))
        elif item.get("type") == "tool_use":
            parts.append(f"[Tool: {item.get('name', 'unknown')}]")
    return " ".join(parts)


def _build_progress_table(agent_status):
    """Build the team/agent/status table shown in the progress panel."""
    table = Table(
        show_header=True,
        header_style="bold magenta",
        show_footer=False,
        box=box.SIMPLE_HEAD,  # Simple header with horizontal lines
        title=None,  # The surrounding panel already carries the title
        padding=(0, 2),  # Horizontal padding
        expand=True,  # Fill the available space
    )
    table.add_column("Team", style="cyan", justify="center", width=20)
    table.add_column("Agent", style="green", justify="center", width=20)
    table.add_column("Status", style="yellow", justify="center", width=20)

    for team, agents in _TEAMS.items():
        for index, agent in enumerate(agents):
            # Only the first row of each group shows the team name.
            team_label = team if index == 0 else ""
            table.add_row(team_label, agent, _status_cell(agent_status[agent]))
        # Horizontal separator after each team.
        table.add_row("─" * 20, "─" * 20, "─" * 20, style="dim")
    return table


def _collect_messages(message_buffer):
    """Merge tool calls and messages into (time, type, text) rows sorted by time."""
    rows = []

    for timestamp, tool_name, args in message_buffer.tool_calls:
        # Stringify before truncating so that non-string args (e.g. dicts)
        # are shortened too instead of being shown at full length.
        args_text = _truncate(str(args), _MAX_TOOL_ARGS_LEN)
        rows.append((timestamp, "Tool", f"{tool_name}: {args_text}"))

    for timestamp, msg_type, content in message_buffer.messages:
        text = _truncate(_content_to_text(content), _MAX_CONTENT_LEN)
        rows.append((timestamp, msg_type, text))

    # The sort is stable: entries with equal timestamps keep tool calls
    # ahead of messages, in their original order.
    rows.sort(key=lambda row: row[0])
    return rows


def update_display(layout, message_buffer, spinner_text=None):
    """Update all panels in the display layout with current data.

    Args:
        layout: Layout containing the ``header``, ``progress``, ``messages``,
            ``analysis`` and ``footer`` regions (as built by
            ``create_layout``).
        message_buffer: Object providing ``agent_status`` (indexable by
            agent name), ``tool_calls`` (iterable of
            ``(timestamp, tool_name, args)``), ``messages`` (iterable of
            ``(timestamp, msg_type, content)``), ``current_report`` and
            ``report_sections`` (mapping whose non-``None`` values are
            counted as generated reports).
        spinner_text: Optional renderable appended as a final "Spinner" row
            of the messages table.

    Raises:
        KeyError: If ``agent_status`` is a dict lacking one of the agents
            listed in the progress panel.
    """
    # Header with welcome message
    layout["header"].update(
        Panel(
            "[bold green]Welcome to Litadel CLI[/bold green]\n"
            "[dim]© [Tauric Research](https://github.com/TauricResearch)[/dim]",
            title="Welcome to Litadel",
            border_style="green",
            padding=(1, 2),
            expand=True,
        )
    )

    # Progress panel showing agent status
    layout["progress"].update(
        Panel(
            _build_progress_table(message_buffer.agent_status),
            title="Progress",
            border_style="cyan",
            padding=(1, 2),
        )
    )

    # Messages panel showing recent messages and tool calls
    messages_table = Table(
        show_header=True,
        header_style="bold magenta",
        show_footer=False,
        expand=True,  # Fill the available space
        box=box.MINIMAL,  # Lighter look
        show_lines=True,  # Keep horizontal lines between rows
        padding=(0, 1),  # Some padding between columns
    )
    messages_table.add_column("Time", style="cyan", width=8, justify="center")
    messages_table.add_column("Type", style="green", width=10, justify="center")
    # The content column takes all remaining width.
    messages_table.add_column("Content", style="white", no_wrap=False, ratio=1)

    all_messages = _collect_messages(message_buffer)
    for timestamp, msg_type, content in all_messages[-_MAX_MESSAGES:]:
        # Text (rather than str) keeps the content literal and folds long words.
        messages_table.add_row(timestamp, msg_type, Text(content, overflow="fold"))

    if spinner_text:
        messages_table.add_row("", "Spinner", spinner_text)

    # Tell the user when older rows were dropped. This must be the table's
    # caption: Table has no table-level ``footer`` attribute, so assigning
    # one is silently ignored and the notice would never be displayed.
    if len(all_messages) > _MAX_MESSAGES:
        messages_table.caption = (
            f"[dim]Showing last {_MAX_MESSAGES} of {len(all_messages)} messages[/dim]"
        )

    layout["messages"].update(
        Panel(
            messages_table,
            title="Messages & Tools",
            border_style="blue",
            padding=(1, 2),
        )
    )

    # Analysis panel showing current report
    if message_buffer.current_report:
        # Kept function-local, as in the original module.
        from rich.markdown import Markdown

        report_body = Markdown(message_buffer.current_report)
    else:
        report_body = "[italic]Waiting for analysis report...[/italic]"
    layout["analysis"].update(
        Panel(
            report_body,
            title="Current Report",
            border_style="green",
            padding=(1, 2),
        )
    )

    # Footer with statistics
    tool_calls_count = len(message_buffer.tool_calls)
    llm_calls_count = sum(
        1 for _, msg_type, _ in message_buffer.messages if msg_type == "Reasoning"
    )
    reports_count = sum(
        1 for content in message_buffer.report_sections.values() if content is not None
    )

    stats_table = Table(show_header=False, box=None, padding=(0, 2), expand=True)
    stats_table.add_column("Stats", justify="center")
    stats_table.add_row(
        f"Tool Calls: {tool_calls_count} | LLM Calls: {llm_calls_count} | Generated Reports: {reports_count}"
    )

    layout["footer"].update(Panel(stats_table, border_style="grey50"))