From 94eb6c3b68ffab51927bf249e2976078d4b1d646 Mon Sep 17 00:00:00 2001 From: Kevin Bruton Date: Mon, 29 Sep 2025 09:58:36 +0200 Subject: [PATCH] fix: issues with communication --- memory_store/chroma.sqlite3 | Bin 163840 -> 163840 bytes test_streaming.py | 61 ++++++++++ webapp/main.py | 214 +++++++++++++++++++----------------- 3 files changed, 176 insertions(+), 99 deletions(-) create mode 100644 test_streaming.py diff --git a/memory_store/chroma.sqlite3 b/memory_store/chroma.sqlite3 index bd325de5dbe8d5bde664a66f37d43bc4879ee4c2..1e75763b362322f3248d26c78f78d1e804b36653 100644 GIT binary patch delta 54 zcmZo@;A&{#njp=nHc`fzQEg+w5`AV_{`|@829`|x`I{XLbop7CWtkZ`n@jB5OY9l9 Km)JA)H~;{S4-P#5 delta 45 zcmZo@;A&{#njp=nI#I@%QFUX&5`AVF{*uY;29}!z9ZdL}OYGZA>>0P0*faGw001z- B4Rrtj diff --git a/test_streaming.py b/test_streaming.py new file mode 100644 index 00000000..368fb389 --- /dev/null +++ b/test_streaming.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +""" +Test script to verify LangGraph streaming behavior +""" +import os +import sys +from datetime import date +from dotenv import load_dotenv + +# Add the project root to the path +sys.path.insert(0, '/Users/kevin.bruton/repo2/TradingAgents') + +# Load environment variables +load_dotenv() + +def test_callback(state): + """Test callback to understand state structure""" + print(f"\nšŸ” CALLBACK RECEIVED:") + print(f" Type: {type(state)}") + print(f" Keys: {list(state.keys()) if isinstance(state, dict) else 'Not a dict'}") + if isinstance(state, dict): + for key, value in state.items(): + if key not in ["__end__", "messages"]: + print(f" {key}: {type(value)} - {'Has content' if value else 'Empty'}") + +def main(): + """Test the TradingAgentsGraph streaming""" + try: + from tradingagents.graph.trading_graph import TradingAgentsGraph + from tradingagents.default_config import DEFAULT_CONFIG + + print("šŸš€ Testing TradingAgentsGraph streaming...") + + # Create a minimal config for testing + config = DEFAULT_CONFIG.copy() + config["llm_provider"] = "openai" + config["quick_think_llm"] = "gpt-3.5-turbo" + config["deep_think_llm"] = "gpt-4" + + # Create graph with debug mode + graph = TradingAgentsGraph(config=config, debug=True) + + print("šŸ“Š Starting propagation with callback...") + + # Test with a simple company + final_state, signal = graph.propagate( + company_name="AAPL", + trade_date=str(date.today()), + on_step_callback=test_callback + ) + + print(f"\nāœ… Propagation completed!") + print(f" Final signal: {signal}") + + except Exception as e: + import traceback + print(f"āŒ Error: {e}") + print(traceback.format_exc()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/webapp/main.py b/webapp/main.py index 3679926d..059e5621 100644 --- a/webapp/main.py +++ b/webapp/main.py @@ -49,117 +49,129 @@ jinja_env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir)) def update_execution_state(state: Dict[str, Any]): """Callback function to update the app_state based on LangGraph's state.""" + print(f"šŸ“” Callback received state keys: {list(state.keys())}") + with app_state_lock: - # Check if we're still in initialization phase and need to transition to actual execution - if (app_state["execution_tree"] and + # Initialize the root node if needed + if not app_state["execution_tree"] or ( len(app_state["execution_tree"]) == 1 and - app_state["execution_tree"][0]["id"] == "initialization"): - # Replace initialization message with the main execution tree + app_state["execution_tree"][0]["id"] == "initialization" + ): app_state["execution_tree"] = [{ - "id": "root", - "name": f"Trading Analysis for {app_state['company_symbol']}", - "status": "in_progress", - "content": f"Analyzing {app_state['company_symbol']} using multiple trading agents\n\nThe trading analysis pipeline has been successfully initialized and agents are now executing their tasks.", - "children": [], - "timestamp": time.time() - }] - elif not app_state["execution_tree"]: - # Fallback: Initialize the root node if it doesn't exist - app_state["execution_tree"].append({ "id": "root", "name": f"Trading Analysis for {app_state['company_symbol']}", "status": "in_progress", "content": f"Analyzing {app_state['company_symbol']} using multiple trading agents", "children": [], "timestamp": time.time() - }) + }] root_node = app_state["execution_tree"][0] - # Define the expected phases and their order - phase_map = { - "market_analyst": {"name": "Market Analysis", "phase": "data_collection"}, - "social_analyst": {"name": "Social Media Analysis", "phase": "data_collection"}, - "news_analyst": {"name": "News Analysis", "phase": "data_collection"}, - "fundamentals_analyst": {"name": "Fundamental Analysis", "phase": "data_collection"}, - "bull_researcher": {"name": "Bull Case Research", "phase": "research"}, - "bear_researcher": {"name": "Bear Case Research", "phase": "research"}, - "research_manager": {"name": "Research Synthesis", "phase": "research"}, - "trade_planner": {"name": "Trade Planning", "phase": "planning"}, - "trader": {"name": "Trade Execution", "phase": "execution"}, - "risky_analyst": {"name": "Risk Assessment (Aggressive)", "phase": "risk_analysis"}, - "neutral_analyst": {"name": "Risk Assessment (Neutral)", "phase": "risk_analysis"}, - "safe_analyst": {"name": "Risk Assessment (Conservative)", "phase": "risk_analysis"}, - "risk_judge": {"name": "Final Risk Evaluation", "phase": "risk_analysis"} + # Map LangGraph node names to user-friendly display info + node_mapping = { + "Market Analyst": {"name": "šŸ“ˆ Market Analysis", "phase": "data_collection"}, + "Social Analyst": {"name": "šŸ“± Social Media Analysis", "phase": "data_collection"}, + "News Analyst": {"name": "šŸ“° News Analysis", "phase": "data_collection"}, + "Fundamentals Analyst": {"name": "šŸ“Š Fundamental Analysis", "phase": "data_collection"}, + "Bull Researcher": {"name": "šŸ‚ Bull Case Research", "phase": "research"}, + "Bear Researcher": {"name": "🐻 Bear Case Research", "phase": "research"}, + "Research Manager": {"name": "šŸ” Research Synthesis", "phase": "research"}, + "Trade Planner": {"name": "šŸ“‹ Trade Planning", "phase": "planning"}, + "Trader": {"name": "⚔ Trade Execution", "phase": "execution"}, + "Risky Analyst": {"name": "🚨 Risk Assessment (Aggressive)", "phase": "risk_analysis"}, + "Neutral Analyst": {"name": "āš–ļø Risk Assessment (Neutral)", "phase": "risk_analysis"}, + "Safe Analyst": {"name": "šŸ›”ļø Risk Assessment (Conservative)", "phase": "risk_analysis"}, + "Risk Judge": {"name": "āš ļø Final Risk Evaluation", "phase": "risk_analysis"} } - # Find which agent just completed by examining the state - for key, value in state.items(): - if key in ["__end__", "messages"]: - continue - - # Map the key to a more user-friendly name - agent_key = key.lower().replace(" ", "_").replace("_agent", "").replace("_node", "") - if agent_key in phase_map: - phase_info = phase_map[agent_key] - - # Find or create phase category - phase_category = None - for child in root_node["children"]: - if child["id"] == phase_info["phase"]: - phase_category = child - break - - if not phase_category: - phase_names = { - "data_collection": "šŸ“Š Data Collection", - "research": "šŸ” Research & Analysis", - "planning": "šŸ“‹ Trade Planning", - "execution": "⚔ Trade Execution", - "risk_analysis": "āš ļø Risk Management" - } - - phase_category = { - "id": phase_info["phase"], - "name": phase_names.get(phase_info["phase"], phase_info["phase"]), - "status": "in_progress", - "content": f"Phase: {phase_names.get(phase_info['phase'], phase_info['phase'])}", - "children": [], - "timestamp": time.time() - } - root_node["children"].append(phase_category) - - # Check if this specific step already exists - step_exists = False - for step in phase_category["children"]: - if step["name"] == phase_info["name"]: - step["status"] = "completed" - step["content"] = str(value) if value else "Completed successfully" - step_exists = True - break - - if not step_exists: - # Add new step - new_step = { - "id": f"{phase_info['phase']}_{agent_key}_{len(phase_category['children'])}", - "name": phase_info["name"], - "status": "completed", - "content": str(value) if value else "Completed successfully", - "children": [], - "timestamp": time.time() - } - phase_category["children"].append(new_step) - - # Check if phase is complete (simple heuristic) - completed_steps = sum(1 for step in phase_category["children"] if step["status"] == "completed") - if completed_steps >= len(phase_category["children"]): - phase_category["status"] = "completed" - - # Update overall progress based on completed phases - total_phases = len([p for p in phase_map.values()]) - completed_agents = sum(len(child["children"]) for child in root_node["children"] - if child.get("children")) - app_state["overall_progress"] = min(100, int((completed_agents / max(total_phases, 1)) * 100)) + phase_names = { + "data_collection": "šŸ“Š Data Collection", + "research": "šŸ” Research & Analysis", + "planning": "šŸ“‹ Trade Planning", + "execution": "⚔ Trade Execution", + "risk_analysis": "āš ļø Risk Management" + } + + # The state dict contains the current state of all nodes + # We need to determine what has actually been executed + current_step = None + + # LangGraph streams the full state each time, so we need to detect what's new + # Look for populated report fields to determine what has been completed + if state.get("market_report") and not any(child.get("id") == "data_collection_market" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Market Analyst" + elif state.get("sentiment_report") and not any(child.get("id") == "data_collection_social" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Social Analyst" + elif state.get("news_report") and not any(child.get("id") == "data_collection_news" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "News Analyst" + elif state.get("fundamentals_report") and not any(child.get("id") == "data_collection_fundamentals" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Fundamentals Analyst" + elif state.get("investment_debate_state", {}).get("bull_history") and not any(child.get("id") == "research_bull" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Bull Researcher" + elif state.get("investment_debate_state", {}).get("bear_history") and not any(child.get("id") == "research_bear" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Bear Researcher" + elif state.get("investment_debate_state", {}).get("judge_decision") and not any(child.get("id") == "research_manager" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Research Manager" + elif state.get("trader_investment_plan") and not any(child.get("id") == "planning_trade_planner" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Trade Planner" + elif state.get("investment_plan") and not any(child.get("id") == "execution_trader" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Trader" + elif state.get("risk_debate_state", {}).get("risky_history") and not any(child.get("id") == "risk_analysis_risky" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Risky Analyst" + elif state.get("risk_debate_state", {}).get("neutral_history") and not any(child.get("id") == "risk_analysis_neutral" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Neutral Analyst" + elif state.get("risk_debate_state", {}).get("safe_history") and not any(child.get("id") == "risk_analysis_safe" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Safe Analyst" + elif state.get("final_trade_decision") and not any(child.get("id") == "risk_analysis_risk_judge" for phase in root_node["children"] for child in phase.get("children", [])): + current_step = "Risk Judge" + + if current_step and current_step in node_mapping: + print(f"šŸŽÆ Processing step: {current_step}") + node_info = node_mapping[current_step] + phase_id = node_info["phase"] + + # Find or create phase category + phase_category = None + for child in root_node["children"]: + if child["id"] == phase_id: + phase_category = child + break + + if not phase_category: + phase_category = { + "id": phase_id, + "name": phase_names.get(phase_id, phase_id), + "status": "in_progress", + "content": f"Phase: {phase_names.get(phase_id, phase_id)}", + "children": [], + "timestamp": time.time() + } + root_node["children"].append(phase_category) + + # Add new step + step_id = f"{phase_id}_{current_step.lower().replace(' ', '_')}" + new_step = { + "id": step_id, + "name": node_info["name"], + "status": "completed", + "content": f"āœ… {node_info['name']} completed successfully", + "children": [], + "timestamp": time.time() + } + phase_category["children"].append(new_step) + + # Mark phase as completed if it has steps + phase_category["status"] = "completed" + + # Update overall progress + total_steps = len(node_mapping) + completed_steps = sum(len(child["children"]) for child in root_node["children"]) + app_state["overall_progress"] = min(100, int((completed_steps / max(total_steps, 1)) * 100)) + + print(f"šŸ“Š Progress updated: {app_state['overall_progress']}% ({completed_steps}/{total_steps} steps)") + else: + print(f"ā³ No new step detected or step already processed") def run_trading_process(company_symbol: str, config: Dict[str, Any]): """Runs the TradingAgentsGraph in a separate thread.""" @@ -197,10 +209,14 @@ def run_trading_process(company_symbol: str, config: Dict[str, Any]): else: # openai custom_config["backend_url"] = "https://api.openai.com/v1" + print(f"šŸš€ Initializing TradingAgentsGraph for {company_symbol}") graph = TradingAgentsGraph(config=custom_config) analysis_date = config["analysis_date"] # Use user-selected date + print(f"šŸ”„ Starting propagation for {company_symbol} on {analysis_date}") + # The propagate method now accepts the callback and trade_date - final_state = graph.propagate(company_symbol, trade_date=analysis_date, on_step_callback=update_execution_state) + final_state, processed_signal = graph.propagate(company_symbol, trade_date=analysis_date, on_step_callback=update_execution_state) + print(f"āœ… Propagation completed for {company_symbol}") with app_state_lock: app_state["overall_status"] = "completed" @@ -208,7 +224,7 @@ def run_trading_process(company_symbol: str, config: Dict[str, Any]): # Update the root node status to completed if app_state["execution_tree"]: app_state["execution_tree"][0]["status"] = "completed" - app_state["execution_tree"][0]["content"] = str(final_state) + app_state["execution_tree"][0]["content"] = f"āœ… Analysis completed successfully!\n\nFinal Decision: {processed_signal}\n\nFull State: {str(final_state)}" except Exception as e: import traceback