From 05ce55125fd497c8b18bffec352edbd77c0e79d8 Mon Sep 17 00:00:00 2001 From: "swj.premkumar" Date: Tue, 13 Jan 2026 06:40:07 -0600 Subject: [PATCH] ### Added - **Dynamic Parameter Tuning (The Learning Loop)**: Implemented full self-reflection cycle. The Reflector agent now parses its own advice into JSON (`rsi_period`, `stop_loss_pct`), persists it to `data_cache/runtime_config.json`, and the Market Analyst loads it to tune the Regime Detector in real-time. - **Audit Archival**: Every tuning event is now archived to `results/{TICKER}/{DATE}/runtime_config.json` for historical auditing, ensuring we can reproduce why parameters changed on any given day. - **Atomic Persistence**: Implemented `agent_utils.write_json_atomic` to prevent race conditions during config saves. - **Centralized Config**: Moved hardcoded paths to `default_config.py` (DRY principle). MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Fixed - **Reflector Logic Gap**: The Reflector was previously "shouting into the void" -- making suggestions but having no mechanism to apply them. This circuit is now closed. --- README.md | 53 +++++---- SYSTEM_RULE_BOOK.md | 15 +++ tests/verify_reflection_loop.py | 111 ++++++++++++++++++ .../agents/analysts/market_analyst.py | 34 +++++- tradingagents/agents/utils/agent_utils.py | 31 +++++ tradingagents/default_config.py | 1 + tradingagents/engines/regime_detector.py | 25 +++- tradingagents/graph/reflection.py | 93 +++++++++++++-- 8 files changed, 321 insertions(+), 42 deletions(-) create mode 100644 tests/verify_reflection_loop.py diff --git a/README.md b/README.md index 45d200af..79c073b2 100644 --- a/README.md +++ b/README.md @@ -87,9 +87,16 @@ Our framework decomposes complex trading tasks into specialized roles. This ensu - Continuously evaluates portfolio risk by assessing market volatility, liquidity, and other risk factors. 
The risk management team evaluates and adjusts trading strategies, providing assessment reports to the Portfolio Manager for final decision. - The Portfolio Manager approves/rejects the transaction proposal. If approved, the order will be sent to the simulated exchange and executed. + +

+### The Learning Loop (New) +- **Reflector Agent:** Reviews every trade outcome. If a strategy fails (e.g., getting stopped out too often), it dynamically tunes system parameters (like `rsi_period` or `stop_loss_pct`) and persists them to `runtime_config.json`. +- **Systematic Evolution:** The system gets smarter with every trade, automatically adapting to changing market regimes without human code intervention. + + ### Decision Logic & "Safety Valve" Architecture The following diagram illustrates the complete decision-making flow, including the critical **"Don't Fight the Tape" Safety Valve** which intercepts and overrides `SELL` decisions for hyper-growth stocks in strong trends. @@ -97,45 +104,47 @@ The following diagram illustrates the complete decision-making t ```mermaid graph TD Start([Start Propagate]) --> Init[Initialize State] - Init --> Analysts{Analyst Agents} + Init --> MarketAN["Market Analyst\n(Mandatory Context)"] + + MarketAN -->|Regime Detected| Router{Select Analysts} subgraph Data_Collection - Analysts -->|Fetch| Market[Market Analyst] - Analysts -->|Fetch| Social[Social Analyst] - Analysts -->|Fetch| News[News Analyst] - Analysts -->|Fetch| Fund[Fundamentals] + Router -->|Fetch| Social[Social Analyst] + Router -->|Fetch| News[News Analyst] + Router -->|Fetch| Fund[Fundamentals] end - Market -->|Regime: Trending/Volatile| GraphState - Social -->|Sentiment Score| GraphState - News -->|Macro Signals| GraphState - Fund -->|Growth Metrics| GraphState + Social & News & Fund & MarketAN --> GraphState GraphState --> Debate{Debate Phase} Debate -->|Bull Thesis| Bull[Bull Researcher] Debate -->|Bear Thesis| Bear[Bear Researcher] - Bull --> Trader[Trader Agent] - Bear --> Trader + Bull & Bear --> Trader[Trader Agent] Trader -->|Propose Trade| Risk[Risk Manager] Risk -->|Refine/Approve| FinalState[Final Logic State] - FinalState --> SafetyValve{"🛡️ SAFETY VALVE\n(Trend Override)"} + FinalState --> OverrideLogic{HARD GATE LOGIC} - 
SafetyValve --> Check1["Price > 200 SMA?"] - SafetyValve --> Check2["Revenue Growth > 30%?"] - SafetyValve --> Check3["Regime != BEAR?"] + OverrideLogic -- SELL Signal --> CheckSell{"Slope > 0 & Growth > 30%?"} + CheckSell -- YES --> BlockSell["🛑 BLOCK SELL (Anti-Short)"] + CheckSell -- NO --> AllowSell[✅ Allow SELL] - Check1 & Check2 & Check3 -->|ALL TRUE + Action=SELL| Override([🛑 BLOCK SELL -> FORCE HOLD]) - Check1 & Check2 & Check3 -->|ELSE| Pass([✅ Pass Through]) + OverrideLogic -- BUY Signal --> CheckBuy{"Insiders Selling > $50M?"} + CheckBuy -- YES --> BlockBuy["🛑 BLOCK BUY (Anti-Knife)"] + CheckBuy -- NO --> AllowBuy[✅ Allow BUY] - Override --> Execution[Execute Order] - Pass --> Execution + BlockSell & AllowSell & BlockBuy & AllowBuy --> PortfolioCheck{Portfolio Check} - style SafetyValve fill:#f96,stroke:#333,stroke-width:2px - style Override fill:#f00,stroke:#fff,stroke-width:2px,color:#fff - style Pass fill:#0f0,stroke:#333,stroke-width:2px + PortfolioCheck -- Active Position --> StopLoss{"Unrealized PnL < -10%?"} + StopLoss -- YES --> KillSwitch["☠️ FORCE LIQUIDATE (Rule 72)"] + StopLoss -- NO --> Execute[Execute Order] + + style MarketAN fill:#6f0,stroke:#333,stroke-width:2px + style BlockSell fill:#f00,stroke:#fff,stroke-width:2px,color:#fff + style BlockBuy fill:#f00,stroke:#fff,stroke-width:2px,color:#fff + style KillSwitch fill:#000,stroke:#f00,stroke-width:2px,color:#fff ``` ## Installation and CLI diff --git a/SYSTEM_RULE_BOOK.md b/SYSTEM_RULE_BOOK.md index 9fd7648c..589677f8 100644 --- a/SYSTEM_RULE_BOOK.md +++ b/SYSTEM_RULE_BOOK.md @@ -63,8 +63,23 @@ We judge assets not in a vacuum, but against the Tide. * If the Market (SPY) is Flat/Choppy and the Asset is Trending Up, this is **Alpha**. We press the advantage. * If the Market is Up and the Asset is Flat, this is **Weakness**. We cut the laggard. + --- +## VI. THE LEARNING LOOP (Dynamic Parameter Tuning) + +We do not just execute; we adapt. 
The system includes a **Self-Reflection Mechanism** that reviews past performance and tunes internal parameters. + +1. **Reflection:** After every decision, the `Reflector` analyzes the outcome (Returns vs. Logic). +2. **Tuning:** If the strategy was too slow (lagging) or too fast (whipsawed), the Reflector adjusts core parameters: + * `rsi_period`: Lowered for faster reaction in Volatile markets. + * `risk_multiplier`: Capped during drawdowns. + * `stop_loss_pct`: Tightened if losses exceed projections. +3. **Persistence:** These "Lessons" are saved to `runtime_config.json` and applied to **All Future Decisions**. + +--- + + ## V. EXECUTION DISCIPLINE 1. **Binary Thinking is the Enemy:** Rarely is the answer "Sell 100%" or "Buy 100%." We scale out of risks and scale into strength. diff --git a/tests/verify_reflection_loop.py b/tests/verify_reflection_loop.py new file mode 100644 index 00000000..50014436 --- /dev/null +++ b/tests/verify_reflection_loop.py @@ -0,0 +1,106 @@ + +import unittest +import json +import os +import shutil +from unittest.mock import MagicMock + +# We will let the real imports happen. If venv is used, they should succeed. +from tradingagents.graph.reflection import Reflector +from tradingagents.engines.regime_detector import DynamicIndicatorSelector, MarketRegime + +class TestReflectionLoop(unittest.TestCase): + + def setUp(self): + # Create a dummy Reflector (mocking LLM only) + self.mock_llm = MagicMock() + self.reflector = Reflector(self.mock_llm) + + # Clean up any existing config + if os.path.exists("data_cache/runtime_config.json"): + os.remove("data_cache/runtime_config.json") + + def test_json_parsing(self): + """Test if Reflector correctly parses JSON updates from LLM text.""" + llm_response = """ + Analysis: The strategy was too slow. + Recommend: Lower RSI period. 
+ + ```json + { + "UPDATE_PARAMETERS": { + "rsi_period": 7, + "stop_loss_pct": 0.05 + } + } + ``` + """ + updates = self.reflector._parse_parameter_updates(llm_response) + self.assertEqual(updates["rsi_period"], 7) + self.assertEqual(updates["stop_loss_pct"], 0.05) + print("✅ JSON Parsing Test Passed") + + def test_persistence_and_loading(self): + """Test if parameter updates are saved and then loaded by RegimeDetector logic.""" + updates = {"rsi_period": 99, "bollinger_period": 5} + + # 1. Apply Updates (Simulate Reflector Action) + self.reflector._apply_parameter_updates(updates) + + # Verify file exists + self.assertTrue(os.path.exists("data_cache/runtime_config.json")) + + # 2. Simulate Market Analyst Loading Logic + loaded_overrides = {} + with open("data_cache/runtime_config.json", 'r') as f: + loaded_overrides = json.load(f) + + self.assertEqual(loaded_overrides["rsi_period"], 99) + + # 3. Test Component Overrides (Regime Detector) + # Get defaults for TRENDING_UP + defaults = DynamicIndicatorSelector.get_optimal_parameters(MarketRegime.TRENDING_UP) + self.assertEqual(defaults["rsi_period"], 14) # Standard default matches logic + + # Get with overrides + tuned = DynamicIndicatorSelector.get_optimal_parameters(MarketRegime.TRENDING_UP, overrides=loaded_overrides) + self.assertEqual(tuned["rsi_period"], 99) # Should be overridden + self.assertEqual(tuned["bollinger_period"], 5) + self.assertEqual(tuned["macd_fast"], 12) # Should remain default + + print("✅ Persistence and Integration Test Passed") + + def test_archival(self): + """Test if parameter updates are archived to results/TICKER/DATE.""" + updates = {"rsi_period": 77} + dummy_state = { + "company_of_interest": "TEST_TICKER", + "trade_date": "2024-01-01" + } + + # 1. 
Apply Updates with State + self.reflector._apply_parameter_updates(updates, dummy_state) + + # Verify global persistence + self.assertTrue(os.path.exists("data_cache/runtime_config.json")) + + # Verify archival + archive_path = "results/TEST_TICKER/2024-01-01/runtime_config.json" + print(f"Checking for archive at {archive_path}") + self.assertTrue(os.path.exists(archive_path)) + + with open(archive_path, 'r') as f: + data = json.load(f) + self.assertEqual(data["rsi_period"], 77) + + print("✅ Archival Logic Test Passed") + + def tearDown(self): + # Clean up + if os.path.exists("data_cache/runtime_config.json"): + os.remove("data_cache/runtime_config.json") + if os.path.exists("results/TEST_TICKER"): + shutil.rmtree("results/TEST_TICKER") + +if __name__ == '__main__': + unittest.main() diff --git a/tradingagents/agents/analysts/market_analyst.py b/tradingagents/agents/analysts/market_analyst.py index 106f1ef3..5dca02c9 100644 --- a/tradingagents/agents/analysts/market_analyst.py +++ b/tradingagents/agents/analysts/market_analyst.py @@ -23,7 +23,12 @@ def _calculate_net_insider_flow(raw_data: str) -> float: if not raw_data or "Error" in raw_data or "No insider" in raw_data: return 0.0 - df = pd.read_csv(StringIO(raw_data), comment='#') + # Robust CSV parsing + try: + df = pd.read_csv(StringIO(raw_data), comment='#') + except Exception: + # Fallback for messy data + df = pd.read_csv(StringIO(raw_data), sep=None, engine='python', comment='#') # Standardize columns df.columns = [c.strip().lower() for c in df.columns] @@ -55,14 +60,17 @@ def create_market_analyst(llm): logger.info(f">>> STARTING MARKET ANALYST for {state.get('company_of_interest')} <<<") current_date = state["trade_date"] - # Initialize default panic state + # Initialize default state + report = state.get("market_report", "Market Analysis Initialized...") + if report == "Market Analysis failed completely.": + report = "Market Analysis in progress..." 
# Reset if stuck + regime_val = "UNKNOWN (Fatal Node Failure)" metrics = {} broad_market_regime = "UNKNOWN (Initialized)" net_insider_flow = 0.0 metrics = {"volatility": 0.0} volatility_score = 0.0 - report = "Market Analysis failed completely." tool_result_message = state["messages"] try: @@ -157,7 +165,19 @@ def create_market_analyst(llm): else: regime_val = str(regime) - optimal_params = DynamicIndicatorSelector.get_optimal_parameters(regime) + # Load Runtime Overrides (Dynamic Parameter Tuning) + overrides = {} + try: + config_path = get_config().get("runtime_config_relative_path", "data_cache/runtime_config.json") + import os + if os.path.exists(config_path): + with open(config_path, 'r') as f: + overrides = json.load(f) + logger.info(f"DYNAMIC TUNING ACTIVE: Loaded overrides: {overrides}") + except Exception as e_conf: + logger.warning(f"Failed to load runtime config: {e_conf}") + + optimal_params = DynamicIndicatorSelector.get_optimal_parameters(regime, overrides) volatility_score = metrics.get("volatility", 0.0) logger.info(f"SUCCESS: Detected Regime: {regime_val}") @@ -287,8 +307,10 @@ def create_market_analyst(llm): except Exception as e_fatal: logger.critical(f"CRITICAL ERROR in Market Analyst Node: {e_fatal}") - regime_val = f"UNKNOWN (Fatal Crash: {str(e_fatal)})" - regime_val = f"UNKNOWN (Fatal Crash: {str(e_fatal)})" + # Only overwrite regime if we completely failed + if "UNKNOWN" in str(regime_val) or regime_val is None: + regime_val = f"UNKNOWN (Fatal Crash: {str(e_fatal)})" + report = f"Market Analyst Node crashed completely: {e_fatal}" risk_multiplier = 0.5 # Default to conservative on crash diff --git a/tradingagents/agents/utils/agent_utils.py b/tradingagents/agents/utils/agent_utils.py index 6cf294a1..736dc990 100644 --- a/tradingagents/agents/utils/agent_utils.py +++ b/tradingagents/agents/utils/agent_utils.py @@ -36,4 +36,35 @@ def create_msg_delete(): return delete_messages +import json +import os +import tempfile +from typing import Dict, 
Any + +def write_json_atomic(path: str, data: Dict[str, Any]): + """ + Atomically write JSON data to a file. + + 1. Writes to a temporary file in the same directory. + 2. Renames the temp file to the target path (atomic operation). + """ + directory = os.path.dirname(path) or "." + if not os.path.exists(directory): + os.makedirs(directory, exist_ok=True) + + try: + # Create temp file in the same directory to ensure atomic rename works + with tempfile.NamedTemporaryFile(mode='w', dir=directory, delete=False) as tf: + json.dump(data, tf, indent=4) + temp_path = tf.name + + # Atomic rename + os.replace(temp_path, path) + except Exception: + # Cleanup if something failed before rename + if 'temp_path' in locals() and os.path.exists(temp_path): + os.remove(temp_path) + raise + + \ No newline at end of file diff --git a/tradingagents/default_config.py b/tradingagents/default_config.py index 24093e52..d9c32767 100644 --- a/tradingagents/default_config.py +++ b/tradingagents/default_config.py @@ -8,6 +8,7 @@ DEFAULT_CONFIG = { os.path.abspath(os.path.join(os.path.dirname(__file__), ".")), "dataflows/data_cache", ), + "runtime_config_relative_path": "data_cache/runtime_config.json", # LLM settings "llm_provider": "openai", "deep_think_llm": "gemini-pro", diff --git a/tradingagents/engines/regime_detector.py b/tradingagents/engines/regime_detector.py index 71c3d3f7..1ef4a54b 100644 --- a/tradingagents/engines/regime_detector.py +++ b/tradingagents/engines/regime_detector.py @@ -177,14 +177,20 @@ class DynamicIndicatorSelector: """Select optimal indicator parameters based on regime.""" @staticmethod - def get_optimal_parameters(regime: MarketRegime) -> Dict: + def get_optimal_parameters(regime: MarketRegime, overrides: Dict = None) -> Dict: """ Get optimal indicator parameters for detected regime. Returns dict with recommended settings for RSI, MACD, Bollinger, etc. + Applies 'overrides' from runtime_config if provided. 
""" + if overrides is None: + overrides = {} + + defaults = {} + if regime == MarketRegime.TRENDING_UP or regime == MarketRegime.TRENDING_DOWN: - return { + defaults = { "rsi_period": 14, # Standard for trending "macd_fast": 12, "macd_slow": 26, @@ -197,7 +203,7 @@ class DynamicIndicatorSelector: } elif regime == MarketRegime.VOLATILE: - return { + defaults = { "rsi_period": 7, # Shorter for volatile markets "macd_fast": 8, "macd_slow": 17, @@ -210,7 +216,7 @@ class DynamicIndicatorSelector: } elif regime == MarketRegime.MEAN_REVERTING: - return { + defaults = { "rsi_period": 14, "macd_fast": 12, "macd_slow": 26, @@ -223,7 +229,7 @@ class DynamicIndicatorSelector: } else: # SIDEWAYS - return { + defaults = { "rsi_period": 21, # Longer to avoid noise "macd_fast": 12, "macd_slow": 26, @@ -234,6 +240,15 @@ class DynamicIndicatorSelector: "strategy": "range_trading", "rationale": "Sideways market - trade support/resistance levels" } + + # Apply Overrides + if overrides: + for key, val in overrides.items(): + if key in defaults: + print(f"πŸ”„ TUNING: Overriding {key} from {defaults[key]} to {val}") + defaults[key] = val + + return defaults # Example usage diff --git a/tradingagents/graph/reflection.py b/tradingagents/graph/reflection.py index 1f4d6f20..ab74f3d3 100644 --- a/tradingagents/graph/reflection.py +++ b/tradingagents/graph/reflection.py @@ -1,8 +1,10 @@ -# TradingAgents/graph/reflection.py - from typing import Dict, Any +import json +import os from langchain_openai import ChatOpenAI - +from tradingagents.utils.logger import app_logger as logger +from tradingagents.dataflows.config import get_config +from tradingagents.agents.utils.agent_utils import write_json_atomic class Reflector: """Handles reflection on decisions and updating memory.""" @@ -11,6 +13,7 @@ class Reflector: """Initialize the reflector with an LLM.""" self.quick_thinking_llm = quick_thinking_llm self.reflection_system_prompt = self._get_reflection_prompt() + self.config_path = 
get_config().get("runtime_config_relative_path", "data_cache/runtime_config.json") def _get_reflection_prompt(self) -> str: """Get the system prompt for reflection.""" @@ -82,8 +85,72 @@ Adhere strictly to these instructions. return situation_str + def _parse_parameter_updates(self, text: str) -> Dict[str, Any]: + """Extracts JSON parameter updates from the LLM response.""" + try: + if "```json" in text: + # Extract content between code blocks + parts = text.split("```json") + if len(parts) > 1: + json_str = parts[1].split("```")[0].strip() + try: + data = json.loads(json_str) + if "UPDATE_PARAMETERS" in data: + logger.info(f"⚠️ REFLECTION UPDATE: Tuning System Parameters: {data['UPDATE_PARAMETERS']}") + return data["UPDATE_PARAMETERS"] + except json.JSONDecodeError: + logger.debug("Failed to decode JSON in reflection.") + except Exception as e: + logger.warning(f"Failed to parse parameter updates: {e}") + return {} + + def _apply_parameter_updates(self, updates: Dict[str, Any], current_state: Dict[str, Any] = None): + """Persist parameter updates to a runtime config file.""" + if not updates: + return + + # 1. Save to Global Cache (Active State) + os.makedirs(os.path.dirname(self.config_path), exist_ok=True) + + current_config = {} + if os.path.exists(self.config_path): + try: + with open(self.config_path, 'r') as f: + current_config = json.load(f) + except Exception as e: + logger.warning(f"Failed to read existing config {self.config_path}: {e}") + current_config = {} + + for key, value in updates.items(): + current_config[key] = value + + try: + write_json_atomic(self.config_path, current_config) + logger.info(f"✅ SYSTEM UPDATED: Saved new parameters to {self.config_path}") + except Exception as e: + logger.error(f"Failed to write config to {self.config_path}: {e}") + + # 2. 
Archive to Ticker/Date Result Folder (Audit Trail) + if current_state: + try: + ticker = current_state.get("company_of_interest", "UNKNOWN_TICKER") + date = current_state.get("trade_date", "UNKNOWN_DATE") + + # Get results dir from environment/config or default + results_base = os.getenv("TRADINGAGENTS_RESULTS_DIR", "./results") + # Construct path: results/TICKER/DATE/runtime_config.json + archive_path = os.path.join(results_base, ticker, date, "runtime_config.json") + + # Atomic Write for Archive too + write_json_atomic(archive_path, current_config) + + logger.info(f"💾 ARCHIVED: Tuning config saved to {archive_path}") + + except Exception as e: + logger.warning(f"Failed to archive config to results folder: {e}") + def _reflect_on_component( - self, component_type: str, report: str, situation: str, returns_losses + self, component_type: str, report: str, situation: str, returns_losses, current_state: Dict[str, Any] = None ) -> str: """Generate reflection for a component.""" messages = [ @@ -95,6 +162,14 @@ Adhere strictly to these instructions. ] result = self.quick_thinking_llm.invoke(messages).content + + # 🛑 Extract and apply any parameter updates from the reflection + try: + updates = self._parse_parameter_updates(result) + self._apply_parameter_updates(updates, current_state) + except Exception as e: + logger.error(f"Reflection loop failed to apply updates: {e}") + return result def reflect_bull_researcher(self, current_state, returns_losses, bull_memory): @@ -103,7 +178,7 @@ Adhere strictly to these instructions. bull_debate_history = current_state["investment_debate_state"]["bull_history"] result = self._reflect_on_component( - "BULL", bull_debate_history, situation, returns_losses + "BULL", bull_debate_history, situation, returns_losses, current_state ) bull_memory.add_situations([(situation, result)]) @@ -113,7 +188,7 @@ Adhere strictly to these instructions. 
bear_debate_history = current_state["investment_debate_state"]["bear_history"] result = self._reflect_on_component( - "BEAR", bear_debate_history, situation, returns_losses + "BEAR", bear_debate_history, situation, returns_losses, current_state ) bear_memory.add_situations([(situation, result)]) @@ -123,7 +198,7 @@ Adhere strictly to these instructions. trader_decision = current_state["trader_investment_plan"] result = self._reflect_on_component( - "TRADER", trader_decision, situation, returns_losses + "TRADER", trader_decision, situation, returns_losses, current_state ) trader_memory.add_situations([(situation, result)]) @@ -133,7 +208,7 @@ Adhere strictly to these instructions. judge_decision = current_state["investment_debate_state"]["judge_decision"] result = self._reflect_on_component( - "INVEST JUDGE", judge_decision, situation, returns_losses + "INVEST JUDGE", judge_decision, situation, returns_losses, current_state ) invest_judge_memory.add_situations([(situation, result)]) @@ -143,6 +218,6 @@ Adhere strictly to these instructions. judge_decision = current_state["risk_debate_state"]["judge_decision"] result = self._reflect_on_component( - "RISK JUDGE", judge_decision, situation, returns_losses + "RISK JUDGE", judge_decision, situation, returns_losses, current_state ) risk_manager_memory.add_situations([(situation, result)])