#!/usr/bin/env python3 """ Validate that all fixes have been properly implemented in the code. This checks the code structure without running the actual graph. """ import os import re import ast def check_file_exists(filepath): """Check if a file exists""" return os.path.exists(filepath) def check_parallel_setup(): """Check if parallel execution is implemented in setup.py""" print("\nšŸ” Checking parallel execution setup...") with open('tradingagents/graph/setup.py', 'r') as f: content = f.read() checks = { "ToolCallTracker class": "class ToolCallTracker" in content, "max_total_calls = 3": "max_total_calls = 3" in content, "_create_dispatcher method": "def _create_dispatcher" in content, "Parallel message channels": all(ch in content for ch in ["market_messages", "social_messages", "news_messages", "fundamentals_messages"]), "_wrap_analyst_for_channel": "_wrap_analyst_for_channel" in content, "_wrap_tool_node_for_channel": "_wrap_tool_node_for_channel" in content, "Risk Dispatcher": "_create_risk_dispatcher" in content, "Risk Aggregator": "_create_risk_aggregator" in content, "Duplicate prevention": "self.completed_reports" in content, "Tool call validation": "can_call_tool" in content, "Parameter deduplication": "_hash_params" in content } all_passed = True for check, passed in checks.items(): status = "āœ…" if passed else "āŒ" print(f" {status} {check}") if not passed: all_passed = False return all_passed def check_propagation_update(): """Check if propagation.py supports parallel channels""" print("\nšŸ” Checking propagation updates...") with open('tradingagents/graph/propagation.py', 'r') as f: content = f.read() checks = { "Parallel message channels": all(ch in content for ch in ["market_messages", "social_messages", "news_messages", "fundamentals_messages"]), "Empty message lists initialization": '[]' in content and 'messages' in content, "Proper debate state init": "InvestDebateState" in content and "RiskDebateState" in content } all_passed = True for check, passed in checks.items(): status = "āœ…" if passed else "āŒ" print(f" {status} {check}") if not passed: all_passed = False return all_passed def check_api_updates(): """Check if API properly handles parallel execution and Bear researcher""" print("\nšŸ” Checking API streaming updates...") with open('api.py', 'r') as f: content = f.read() checks = { "Parallel initial status": 'json.dumps({\'type\': \'agent_status\', \'agent\': \'market\', \'status\': \'in_progress\'})' in content, "Message channel processing": 'message_channels = ["market_messages", "social_messages", "news_messages", "fundamentals_messages"]' in content, "Bear researcher status": 'json.dumps({\'type\': \'agent_status\', \'agent\': \'bear_researcher\', \'status\': \'completed\'})' in content, "Risk analyst status": all(agent in content for agent in ['risk_risky', 'risk_safe', 'risk_neutral']), "Reasoning updates per analyst": 'agent_name = agent_map.get(analyst_type, analyst_type)' in content, "Completion messages": 'āœ… Completing' in content } all_passed = True for check, passed in checks.items(): status = "āœ…" if passed else "āŒ" print(f" {status} {check}") if not passed: all_passed = False return all_passed def check_trading_graph_updates(): """Check if trading_graph.py supports new architecture""" print("\nšŸ” Checking trading graph updates...") with open('tradingagents/graph/trading_graph.py', 'r') as f: content = f.read() checks = { "Logger import": "import logging" in content, "Message keys in tool nodes": 'messages_key=' in content, "Debug mode message channel support": 'message_channels = ["market_messages"' in content, "Proper error handling": "logger.error" in content } all_passed = True for check, passed in checks.items(): status = "āœ…" if passed else "āŒ" print(f" {status} {check}") if not passed: all_passed = False return all_passed def analyze_code_structure(): """Analyze the overall code structure for the fixes""" print("\nšŸ“Š ANALYZING CODE STRUCTURE FOR FIXES") print("="*60) # Check each component results = { "Parallel Setup": check_parallel_setup(), "Propagation Updates": check_propagation_update(), "API Updates": check_api_updates(), "Trading Graph Updates": check_trading_graph_updates() } # Summary print("\nšŸ“‹ SUMMARY") print("-"*40) all_passed = all(results.values()) for component, passed in results.items(): status = "āœ… PASS" if passed else "āŒ FAIL" print(f"{component}: {status}") print("\nšŸŽÆ OVERALL RESULT:") if all_passed: print("āœ… ALL FIXES PROPERLY IMPLEMENTED! šŸŽ‰") print("\nKey fixes verified:") print("1. āœ… Tool call limits (max 3 per analyst) with deduplication") print("2. āœ… Duplicate completion prevention") print("3. āœ… Bear researcher proper status updates") print("4. āœ… Risk analysts parallel execution") print("5. āœ… Proper message channel separation") else: print("āŒ SOME FIXES ARE MISSING OR INCOMPLETE") print("\nPlease review the failed checks above.") return all_passed if __name__ == "__main__": # Run validation success = analyze_code_structure() # Additional manual checks print("\nšŸ“ MANUAL VERIFICATION CHECKLIST:") print("-"*40) print("1. Run the iOS app and verify:") print(" - No more than 3 tool calls per analyst") print(" - No duplicate 'Completing analysis' messages") print(" - Bear researcher shows as completed (not pending)") print(" - Risk analysts show live updates and final reports") print(" - Market analyst shows as finished before researchers start") print("\n2. Expected execution time: 2-3 minutes (vs 5-8 minutes before)") print("\n3. All agents should show clean, linear progress without loops") exit(0 if success else 1)