### Added

- **Dynamic Parameter Tuning (The Learning Loop)**: Implemented the full self-reflection cycle. The Reflector agent now parses its own advice into JSON (`rsi_period`, `stop_loss_pct`), persists it to `data_cache/runtime_config.json`, and the Market Analyst loads it to tune the Regime Detector in real time.
- **Audit Archival**: Every tuning event is now archived to `results/{TICKER}/{DATE}/runtime_config.json` for historical auditing, ensuring we can reproduce why parameters changed on any given day.
- **Atomic Persistence**: Implemented `agent_utils.write_json_atomic` to prevent race conditions during config saves.
- **Centralized Config**: Moved hardcoded paths to `default_config.py` (DRY principle).
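
The full cycle can be sketched end-to-end (a simplified, hypothetical stand-in: `parse_advice`/`persist` mirror `Reflector._parse_parameter_updates`/`_apply_parameter_updates`, and a temp-dir path stands in for `data_cache/runtime_config.json`):

```python
import json
import os
import tempfile

# Stand-in for data_cache/runtime_config.json (illustrative path)
CONFIG_PATH = os.path.join(tempfile.gettempdir(), "runtime_config_demo.json")

def parse_advice(llm_text: str) -> dict:
    # The Reflector looks for a ```json fenced block holding UPDATE_PARAMETERS
    if "```json" not in llm_text:
        return {}
    blob = llm_text.split("```json", 1)[1].split("```", 1)[0]
    return json.loads(blob).get("UPDATE_PARAMETERS", {})

def persist(updates: dict) -> None:
    # The real code writes atomically via agent_utils.write_json_atomic
    with open(CONFIG_PATH, "w") as f:
        json.dump(updates, f, indent=4)

advice = 'Too slow; lower RSI.\n```json\n{"UPDATE_PARAMETERS": {"rsi_period": 7}}\n```'
persist(parse_advice(advice))

# On its next run, the Market Analyst reloads these overrides
with open(CONFIG_PATH) as f:
    print(json.load(f))  # → {'rsi_period': 7}
```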

### Fixed
- **Reflector Logic Gap**: The Reflector was previously "shouting into the void"—making suggestions but having no mechanism to apply them. This circuit is now closed.
Commit 05ce55125f (parent 1f279a9df2) — swj.premkumar, 2026-01-13 06:40:07 -06:00
8 changed files with 321 additions and 42 deletions


@@ -87,9 +87,16 @@ Our framework decomposes complex trading tasks into specialized roles. This ensu
- Continuously evaluates portfolio risk by assessing market volatility, liquidity, and other risk factors. The risk management team evaluates and adjusts trading strategies, providing assessment reports to the Portfolio Manager for final decision.
- The Portfolio Manager approves/rejects the transaction proposal. If approved, the order will be sent to the simulated exchange and executed.
<img src="assets/risk.png" width="70%" style="display: inline-block; margin: 0 2%;">
</p>
### The Learning Loop (New)
- **Reflector Agent:** Reviews every trade outcome. If a strategy fails (e.g., getting stopped out too often), it dynamically tunes system parameters (like `rsi_period` or `stop_loss_pct`) and persists them to `runtime_config.json`.
- **Systematic Evolution:** The system gets smarter with every trade, automatically adapting to changing market regimes without human code intervention.
### Decision Logic & "Safety Valve" Architecture
The following diagram illustrates the complete decision-making flow, including the critical **"Don't Fight the Tape" Safety Valve** which intercepts and overrides `SELL` decisions for hyper-growth stocks in strong trends.
@@ -97,45 +104,47 @@ The following diagram illustrates the complete decision-making flow, including t
```mermaid
graph TD
    Start([Start Propagate]) --> Init[Initialize State]
    Init --> MarketAN["Market Analyst\n(Mandatory Context)"]
    MarketAN -->|Regime Detected| Router{Select Analysts}
    subgraph Data_Collection
        Router -->|Fetch| Social[Social Analyst]
        Router -->|Fetch| News[News Analyst]
        Router -->|Fetch| Fund[Fundamentals]
    end
    Social & News & Fund & MarketAN --> GraphState
    GraphState --> Debate{Debate Phase}
    Debate -->|Bull Thesis| Bull[Bull Researcher]
    Debate -->|Bear Thesis| Bear[Bear Researcher]
    Bull & Bear --> Trader[Trader Agent]
    Trader -->|Propose Trade| Risk[Risk Manager]
    Risk -->|Refine/Approve| FinalState[Final Logic State]
    FinalState --> OverrideLogic{HARD GATE LOGIC}
    OverrideLogic -- SELL Signal --> CheckSell{"Slope > 0 & Growth > 30%?"}
    CheckSell -- YES --> BlockSell["🛑 BLOCK SELL (Anti-Short)"]
    CheckSell -- NO --> AllowSell["✅ Allow SELL"]
    OverrideLogic -- BUY Signal --> CheckBuy{"Insiders Selling > $50M?"}
    CheckBuy -- YES --> BlockBuy["🛑 BLOCK BUY (Anti-Knife)"]
    CheckBuy -- NO --> AllowBuy["✅ Allow BUY"]
    BlockSell & AllowSell & BlockBuy & AllowBuy --> PortfolioCheck{Portfolio Check}
    PortfolioCheck -- Active Position --> StopLoss{"Unrealized PnL < -10%?"}
    StopLoss -- YES --> KillSwitch["☠️ FORCE LIQUIDATE (Rule 72)"]
    StopLoss -- NO --> Execute[Execute Order]
    style MarketAN fill:#6f0,stroke:#333,stroke-width:2px
    style BlockSell fill:#f00,stroke:#fff,stroke-width:2px,color:#fff
    style BlockBuy fill:#f00,stroke:#fff,stroke-width:2px,color:#fff
    style KillSwitch fill:#000,stroke:#f00,stroke-width:2px,color:#fff
```
## Installation and CLI


@@ -63,8 +63,23 @@ We judge assets not in a vacuum, but against the Tide.
* If the Market (SPY) is Flat/Choppy and the Asset is Trending Up, this is **Alpha**. We press the advantage.
* If the Market is Up and the Asset is Flat, this is **Weakness**. We cut the laggard.
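
The two judgments above reduce to a simple lookup. A minimal sketch (the trend labels and function name are illustrative, not the system's actual regime enum):

```python
def relative_strength_verdict(market_trend: str, asset_trend: str) -> str:
    """Judge the asset against the Tide (SPY). Labels are illustrative."""
    if market_trend == "flat" and asset_trend == "up":
        return "ALPHA"      # press the advantage
    if market_trend == "up" and asset_trend == "flat":
        return "WEAKNESS"   # cut the laggard
    return "NEUTRAL"

print(relative_strength_verdict("flat", "up"))  # → ALPHA
```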
---
## VI. THE LEARNING LOOP (Dynamic Parameter Tuning)
We do not just execute; we adapt. The system includes a **Self-Reflection Mechanism** that reviews past performance and tunes internal parameters.
1. **Reflection:** After every decision, the `Reflector` analyzes the outcome (Returns vs. Logic).
2. **Tuning:** If the strategy was too slow (lagging) or too fast (whipsawed), the Reflector adjusts core parameters:
* `rsi_period`: Lowered for faster reaction in Volatile markets.
* `risk_multiplier`: Capped during drawdowns.
* `stop_loss_pct`: Tightened if losses exceed projections.
3. **Persistence:** These "Lessons" are saved to `runtime_config.json` and applied to **All Future Decisions**.
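
A persisted `runtime_config.json` might look like this (values are illustrative, not actual tuned output):

```json
{
    "rsi_period": 7,
    "stop_loss_pct": 0.05,
    "risk_multiplier": 0.8
}
```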
---
## V. EXECUTION DISCIPLINE
1. **Binary Thinking is the Enemy:** Rarely is the answer "Sell 100%" or "Buy 100%." We scale out of risks and scale into strength.
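
Scaling instead of binary flips can be sketched as follows (the `conviction` score in [-1, 1] and the step size are hypothetical, not real system fields):

```python
def scaled_target(conviction: float, current_fraction: float) -> float:
    """Nudge the position fraction toward conviction rather than flipping 0%/100%."""
    step = 0.25 * conviction  # partial adjustment per decision (illustrative)
    return round(min(1.0, max(0.0, current_fraction + step)), 2)

print(scaled_target(0.8, 0.5))   # scale into strength → 0.7
print(scaled_target(-0.8, 0.5))  # scale out of risk → 0.3
```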


@@ -0,0 +1,111 @@
import unittest
import json
import os
import shutil
from unittest.mock import MagicMock

# We let the real imports happen. If the venv is set up, they should succeed.
from tradingagents.graph.reflection import Reflector
from tradingagents.engines.regime_detector import DynamicIndicatorSelector, MarketRegime


class TestReflectionLoop(unittest.TestCase):
    def setUp(self):
        # Create a Reflector with a mocked LLM
        self.mock_llm = MagicMock()
        self.reflector = Reflector(self.mock_llm)
        # Clean up any existing config
        if os.path.exists("data_cache/runtime_config.json"):
            os.remove("data_cache/runtime_config.json")

    def tearDown(self):
        # Clean up config and archived results
        if os.path.exists("data_cache/runtime_config.json"):
            os.remove("data_cache/runtime_config.json")
        if os.path.exists("results/TEST_TICKER"):
            shutil.rmtree("results/TEST_TICKER")

    def test_json_parsing(self):
        """Test if Reflector correctly parses JSON updates from LLM text."""
        llm_response = """
Analysis: The strategy was too slow.
Recommend: Lower RSI period.
```json
{
    "UPDATE_PARAMETERS": {
        "rsi_period": 7,
        "stop_loss_pct": 0.05
    }
}
```
"""
        updates = self.reflector._parse_parameter_updates(llm_response)
        self.assertEqual(updates["rsi_period"], 7)
        self.assertEqual(updates["stop_loss_pct"], 0.05)
        print("✅ JSON Parsing Test Passed")

    def test_persistence_and_loading(self):
        """Test if parameter updates are saved and then loaded by RegimeDetector logic."""
        updates = {"rsi_period": 99, "bollinger_period": 5}
        # 1. Apply updates (simulate Reflector action)
        self.reflector._apply_parameter_updates(updates)
        # Verify file exists
        self.assertTrue(os.path.exists("data_cache/runtime_config.json"))
        # 2. Simulate the Market Analyst loading logic
        with open("data_cache/runtime_config.json", 'r') as f:
            loaded_overrides = json.load(f)
        self.assertEqual(loaded_overrides["rsi_period"], 99)
        # 3. Test component overrides (Regime Detector)
        defaults = DynamicIndicatorSelector.get_optimal_parameters(MarketRegime.TRENDING_UP)
        self.assertEqual(defaults["rsi_period"], 14)  # standard default
        tuned = DynamicIndicatorSelector.get_optimal_parameters(MarketRegime.TRENDING_UP, overrides=loaded_overrides)
        self.assertEqual(tuned["rsi_period"], 99)       # overridden
        self.assertEqual(tuned["bollinger_period"], 5)  # overridden
        self.assertEqual(tuned["macd_fast"], 12)        # remains default
        print("✅ Persistence and Integration Test Passed")

    def test_archival(self):
        """Test if parameter updates are archived to results/TICKER/DATE."""
        updates = {"rsi_period": 77}
        dummy_state = {
            "company_of_interest": "TEST_TICKER",
            "trade_date": "2024-01-01"
        }
        # 1. Apply updates with state
        self.reflector._apply_parameter_updates(updates, dummy_state)
        # Verify global persistence
        self.assertTrue(os.path.exists("data_cache/runtime_config.json"))
        # Verify archival
        archive_path = "results/TEST_TICKER/2024-01-01/runtime_config.json"
        print(f"Checking for archive at {archive_path}")
        self.assertTrue(os.path.exists(archive_path))
        with open(archive_path, 'r') as f:
            data = json.load(f)
        self.assertEqual(data["rsi_period"], 77)
        print("✅ Archival Logic Test Passed")


if __name__ == '__main__':
    unittest.main()


@@ -23,7 +23,12 @@ def _calculate_net_insider_flow(raw_data: str) -> float:
if not raw_data or "Error" in raw_data or "No insider" in raw_data:
return 0.0
# Robust CSV parsing
try:
    df = pd.read_csv(StringIO(raw_data), comment='#')
except Exception:
    # Fallback for messy data: let pandas sniff the delimiter
    df = pd.read_csv(StringIO(raw_data), sep=None, engine='python', comment='#')
# Standardize columns
df.columns = [c.strip().lower() for c in df.columns]
@@ -55,14 +60,17 @@ def create_market_analyst(llm):
logger.info(f">>> STARTING MARKET ANALYST for {state.get('company_of_interest')} <<<")
current_date = state["trade_date"]
# Initialize default state
report = state.get("market_report", "Market Analysis Initialized...")
if report == "Market Analysis failed completely.":
    report = "Market Analysis in progress..."  # Reset if stuck
regime_val = "UNKNOWN (Fatal Node Failure)"
broad_market_regime = "UNKNOWN (Initialized)"
net_insider_flow = 0.0
metrics = {"volatility": 0.0}
volatility_score = 0.0
tool_result_message = state["messages"]
try:
@@ -157,7 +165,19 @@ def create_market_analyst(llm):
else:
regime_val = str(regime)
# Load Runtime Overrides (Dynamic Parameter Tuning)
overrides = {}
try:
    config_path = get_config().get("runtime_config_relative_path", "data_cache/runtime_config.json")
    import os
    if os.path.exists(config_path):
        with open(config_path, 'r') as f:
            overrides = json.load(f)
        logger.info(f"DYNAMIC TUNING ACTIVE: Loaded overrides: {overrides}")
except Exception as e_conf:
    logger.warning(f"Failed to load runtime config: {e_conf}")
optimal_params = DynamicIndicatorSelector.get_optimal_parameters(regime, overrides)
volatility_score = metrics.get("volatility", 0.0)
logger.info(f"SUCCESS: Detected Regime: {regime_val}")
@@ -287,8 +307,10 @@ def create_market_analyst(llm):
except Exception as e_fatal:
    logger.critical(f"CRITICAL ERROR in Market Analyst Node: {e_fatal}")
    # Only overwrite regime if we completely failed
    if "UNKNOWN" in str(regime_val) or regime_val is None:
        regime_val = f"UNKNOWN (Fatal Crash: {str(e_fatal)})"
    report = f"Market Analyst Node crashed completely: {e_fatal}"
    risk_multiplier = 0.5  # Default to conservative on crash


@@ -36,4 +36,35 @@ def create_msg_delete():
return delete_messages
import json
import os
import tempfile
from typing import Dict, Any

def write_json_atomic(path: str, data: Dict[str, Any]):
    """
    Atomically write JSON data to a file.
    1. Writes to a temporary file in the same directory.
    2. Renames the temp file onto the target path (atomic operation).
    """
    directory = os.path.dirname(path) or "."  # guard against bare filenames
    os.makedirs(directory, exist_ok=True)
    temp_path = None
    try:
        # Create the temp file in the same directory so the rename stays atomic
        # (a rename across filesystems is not atomic)
        with tempfile.NamedTemporaryFile(mode='w', dir=directory, delete=False) as tf:
            json.dump(data, tf, indent=4)
            temp_path = tf.name
        # Atomic rename
        os.replace(temp_path, path)
    except Exception:
        # Clean up the temp file if something failed before the rename
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
        raise
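
A usage sketch of the same temp-file-then-rename pattern, restated standalone so it can be run directly (the demo path in the system temp dir is illustrative; the real helper writes to `data_cache/runtime_config.json`):

```python
import json
import os
import tempfile

def write_json_atomic(path, data):
    # Same pattern as the helper above: temp file in the target dir, then os.replace
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    with tempfile.NamedTemporaryFile(mode='w', dir=directory, delete=False) as tf:
        json.dump(data, tf, indent=4)
        temp_path = tf.name
    os.replace(temp_path, path)  # readers see either the old or new file, never a half-write

cfg = os.path.join(tempfile.gettempdir(), "runtime_config_demo.json")
write_json_atomic(cfg, {"rsi_period": 7})
write_json_atomic(cfg, {"rsi_period": 9})  # overwrite is safe under concurrent reads
with open(cfg) as f:
    print(json.load(f)["rsi_period"])  # → 9
```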


@@ -8,6 +8,7 @@ DEFAULT_CONFIG = {
os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
"dataflows/data_cache",
),
"runtime_config_relative_path": "data_cache/runtime_config.json",
# LLM settings
"llm_provider": "openai",
"deep_think_llm": "gemini-pro",


@@ -177,14 +177,20 @@ class DynamicIndicatorSelector:
"""Select optimal indicator parameters based on regime."""
@staticmethod
def get_optimal_parameters(regime: MarketRegime, overrides: Dict = None) -> Dict:
"""
Get optimal indicator parameters for detected regime.
Returns dict with recommended settings for RSI, MACD, Bollinger, etc.
Applies 'overrides' from runtime_config if provided.
"""
if overrides is None:
    overrides = {}
defaults = {}
if regime == MarketRegime.TRENDING_UP or regime == MarketRegime.TRENDING_DOWN:
defaults = {
"rsi_period": 14, # Standard for trending
"macd_fast": 12,
"macd_slow": 26,
@@ -197,7 +203,7 @@
}
elif regime == MarketRegime.VOLATILE:
defaults = {
"rsi_period": 7, # Shorter for volatile markets
"macd_fast": 8,
"macd_slow": 17,
@@ -210,7 +216,7 @@
}
elif regime == MarketRegime.MEAN_REVERTING:
defaults = {
"rsi_period": 14,
"macd_fast": 12,
"macd_slow": 26,
@@ -223,7 +229,7 @@
}
else: # SIDEWAYS
defaults = {
"rsi_period": 21, # Longer to avoid noise
"macd_fast": 12,
"macd_slow": 26,
@@ -234,6 +240,15 @@
"strategy": "range_trading",
"rationale": "Sideways market - trade support/resistance levels"
}
# Apply Overrides
if overrides:
    for key, val in overrides.items():
        if key in defaults:
            print(f"🔄 TUNING: Overriding {key} from {defaults[key]} to {val}")
            defaults[key] = val
return defaults
# Example usage
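
The override merge can be exercised standalone. A minimal sketch mirroring the loop above (the defaults shown are an illustrative subset of the trending-regime dict):

```python
# Defaults as produced for a trending regime (subset, illustrative values)
defaults = {"rsi_period": 14, "macd_fast": 12, "macd_slow": 26}
overrides = {"rsi_period": 7, "not_a_param": 99}  # unknown keys are ignored

for key, val in overrides.items():
    if key in defaults:  # only known parameters may be tuned
        defaults[key] = val

print(defaults)  # → {'rsi_period': 7, 'macd_fast': 12, 'macd_slow': 26}
```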


@@ -1,8 +1,10 @@
# TradingAgents/graph/reflection.py
from typing import Dict, Any
import json
import os
from langchain_openai import ChatOpenAI
from tradingagents.utils.logger import app_logger as logger
from tradingagents.dataflows.config import get_config
from tradingagents.agents.utils.agent_utils import write_json_atomic
class Reflector:
"""Handles reflection on decisions and updating memory."""
@@ -11,6 +13,7 @@ class Reflector:
"""Initialize the reflector with an LLM."""
self.quick_thinking_llm = quick_thinking_llm
self.reflection_system_prompt = self._get_reflection_prompt()
self.config_path = get_config().get("runtime_config_relative_path", "data_cache/runtime_config.json")
def _get_reflection_prompt(self) -> str:
"""Get the system prompt for reflection."""
@@ -82,8 +85,72 @@ Adhere strictly to these instructions.
return situation_str
def _parse_parameter_updates(self, text: str) -> Dict[str, Any]:
"""Extracts JSON parameter updates from the LLM response."""
try:
    if "```json" in text:
        # Extract content between code fences
        parts = text.split("```json")
        if len(parts) > 1:
            json_str = parts[1].split("```")[0].strip()
            try:
                data = json.loads(json_str)
                if "UPDATE_PARAMETERS" in data:
                    logger.info(f"⚠️ REFLECTION UPDATE: Tuning System Parameters: {data['UPDATE_PARAMETERS']}")
                    return data["UPDATE_PARAMETERS"]
            except json.JSONDecodeError:
                logger.debug("DEBUG: Failed to decode JSON in reflection.")
except Exception as e:
    logger.warning(f"DEBUG: Failed to parse parameter updates: {e}")
return {}
def _apply_parameter_updates(self, updates: Dict[str, Any], current_state: Dict[str, Any] = None):
"""Persist parameter updates to a runtime config file."""
if not updates:
    return
# 1. Save to Global Cache (Active State)
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
current_config = {}
if os.path.exists(self.config_path):
    try:
        with open(self.config_path, 'r') as f:
            current_config = json.load(f)
    except Exception as e:
        logger.warning(f"WARNING: Failed to read existing config {self.config_path}: {e}")
        current_config = {}
for key, value in updates.items():
    current_config[key] = value
try:
    write_json_atomic(self.config_path, current_config)
    logger.info(f"✅ SYSTEM UPDATED: Saved new parameters to {self.config_path}")
except Exception as e:
    logger.error(f"ERROR: Failed to write config to {self.config_path}: {e}")
# 2. Archive to Ticker/Date Result Folder (Audit Trail)
if current_state:
    try:
        ticker = current_state.get("company_of_interest", "UNKNOWN_TICKER")
        date = current_state.get("trade_date", "UNKNOWN_DATE")
        # Results dir from environment/config, else default
        results_base = os.getenv("TRADINGAGENTS_RESULTS_DIR", "./results")
        # Construct path: results/TICKER/DATE/runtime_config.json
        archive_path = os.path.join(results_base, ticker, date, "runtime_config.json")
        # Atomic write for the archive too
        write_json_atomic(archive_path, current_config)
        logger.info(f"💾 ARCHIVED: Tuning config saved to {archive_path}")
    except Exception as e:
        logger.warning(f"Failed to archive config to results folder: {e}")
def _reflect_on_component(
    self, component_type: str, report: str, situation: str, returns_losses, current_state: Dict[str, Any] = None
) -> str:
"""Generate reflection for a component."""
messages = [
@@ -95,6 +162,14 @@ Adhere strictly to these instructions.
]
result = self.quick_thinking_llm.invoke(messages).content
# 🛑 NEW LOGIC: Extract and Apply
try:
    updates = self._parse_parameter_updates(result)
    self._apply_parameter_updates(updates, current_state)
except Exception as e:
    logger.error(f"ERROR: Reflection loop failed to apply updates: {e}")
return result
def reflect_bull_researcher(self, current_state, returns_losses, bull_memory):
@@ -103,7 +178,7 @@ Adhere strictly to these instructions.
bull_debate_history = current_state["investment_debate_state"]["bull_history"]
result = self._reflect_on_component(
"BULL", bull_debate_history, situation, returns_losses
"BULL", bull_debate_history, situation, returns_losses, current_state
)
bull_memory.add_situations([(situation, result)])
@@ -113,7 +188,7 @@ Adhere strictly to these instructions.
bear_debate_history = current_state["investment_debate_state"]["bear_history"]
result = self._reflect_on_component(
"BEAR", bear_debate_history, situation, returns_losses
"BEAR", bear_debate_history, situation, returns_losses, current_state
)
bear_memory.add_situations([(situation, result)])
@@ -123,7 +198,7 @@ Adhere strictly to these instructions.
trader_decision = current_state["trader_investment_plan"]
result = self._reflect_on_component(
"TRADER", trader_decision, situation, returns_losses
"TRADER", trader_decision, situation, returns_losses, current_state
)
trader_memory.add_situations([(situation, result)])
@@ -133,7 +208,7 @@ Adhere strictly to these instructions.
judge_decision = current_state["investment_debate_state"]["judge_decision"]
result = self._reflect_on_component(
"INVEST JUDGE", judge_decision, situation, returns_losses
"INVEST JUDGE", judge_decision, situation, returns_losses, current_state
)
invest_judge_memory.add_situations([(situation, result)])
@@ -143,6 +218,6 @@ Adhere strictly to these instructions.
judge_decision = current_state["risk_debate_state"]["judge_decision"]
result = self._reflect_on_component(
"RISK JUDGE", judge_decision, situation, returns_losses
"RISK JUDGE", judge_decision, situation, returns_losses, current_state
)
risk_manager_memory.add_situations([(situation, result)])