# TradingAgents/tradingagents/agents/data_visualizer/data_transformation_agent.py
import json
import os
import re
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import openai
@dataclass
class TransformationConfig:
"""Configuration for the data transformation agent"""
openai_api_key: str = os.environ.get("OPENAI_API_KEY")
model: str = "gpt-4o-mini"
eval_results_path: str = "../output_data/AVAH/TradingAgentsStrategy_logs"
output_path: str = "../output_data/AVAH/TradingAgentsStrategy_transformed_logs"
backend_url: str = "https://api.openai.com/v1"
class DataTransformationAgent:
"""Agent that transforms TradingAgents output into widget-friendly JSON format"""
def __init__(self, config: TransformationConfig):
self.config = config
self.client = openai.OpenAI(
api_key=config.openai_api_key,
base_url=config.backend_url
)
# Ensure output directory exists
os.makedirs(self.config.output_path, exist_ok=True)
def get_transformation_prompt(self) -> str:
"""Returns the comprehensive transformation prompt"""
return """
You are a data transformation specialist. Take the provided investment analysis JSON and restructure it into a widget-friendly format that separates visual data from text content for easy frontend consumption.
## Input Format
The input JSON contains investment analysis data with the following structure:
- `company_of_interest`: Stock ticker
- `trade_date`: Analysis date
- `market_report`: Technical analysis text
- `sentiment_report`: Company sentiment analysis text
- `news_report`: Macroeconomic news text
- `fundamentals_report`: Financial metrics and company data text
- `investment_debate_state`: Object containing bull/bear/neutral arguments
- `risk_debate_state`: Object containing risk analysis discussions
- `investment_plan`: Final investment strategy text
- `trader_investment_decision`: Final decision rationale text
- `final_trade_decision`: Ultimate trade recommendation text
## Output Requirements
Transform the input into a structured JSON with the following sections:
### 1. Widget Data Structure
```json
{
"metadata": {
"company_ticker": "string",
"company_name": "string",
"analysis_date": "YYYY-MM-DD",
"final_recommendation": "BUY|SELL|HOLD",
"confidence_level": "HIGH|MEDIUM|LOW"
},
"financial_data": {
"current_price": number,
"price_change": number,
"price_change_percent": number,
"market_cap": "string",
"enterprise_value": "string",
"shares_outstanding": "string",
"trading_range": {
"high": number,
"low": number,
"open": number
},
"volume": number,
"valuation_ratios": {
"current_ps_ratio": number,
"fair_value_ps_ratio": number,
"forward_pe": number,
"forward_ps": number,
"forward_pcf": number,
"forward_pocf": number
},
"ownership": {
"insider_percent": number,
"institutional_percent": number
},
"analyst_data": {
"consensus_rating": "string",
"price_target": number,
"forecast_price": number
}
},
"technical_indicators": {
"sma_50": number,
"sma_200": number,
"ema_10": number,
"macd": number,
"macd_signal": number,
"rsi": number,
"atr": number,
"trend_directions": {
"sma_50": "BULLISH|BEARISH|NEUTRAL",
"sma_200": "BULLISH|BEARISH|NEUTRAL",
"ema_10": "BULLISH|BEARISH|NEUTRAL",
"macd": "BULLISH|BEARISH|NEUTRAL",
"rsi_condition": "OVERSOLD|OVERBOUGHT|NEUTRAL"
}
},
"investment_strategy": {
"position_sizing": {
"total_allocation_percent": "string",
"entry_strategy": "string",
"tranche_1_percent": "string",
"tranche_2_percent": "string"
},
"risk_management": {
"initial_stop_loss": number,
"stop_loss_percent": number,
"breakeven_strategy": "string"
},
"profit_targets": [
{
"target_price": number,
"action": "string",
"rationale": "string"
}
],
"monitoring_points": [
"string"
]
},
"debate_summary": {
"bull_key_points": [
"string"
],
"bear_key_points": [
"string"
],
"neutral_perspective": "string",
"final_decision_rationale": "string"
},
"text_content": {
"market_report": {
"title": "Technical Analysis Report",
"content": "string",
"key_takeaways": [
"string"
]
},
"sentiment_report": {
"title": "Company Sentiment Analysis",
"content": "string",
"recent_developments": [
"string"
]
},
"fundamentals_report": {
"title": "Fundamental Analysis",
"content": "string",
"financial_highlights": [
"string"
]
},
"news_report": {
"title": "Macroeconomic Context",
"content": "string",
"key_developments": [
{
"date": "string",
"event": "string",
"impact": "string"
}
]
},
"investment_plan_full": {
"title": "Complete Investment Strategy",
"content": "string"
},
"debate_transcripts": {
"bull_analysis": "string",
"bear_analysis": "string",
"neutral_analysis": "string",
"risk_discussion": "string"
}
},
"widgets_config": {
"charts_needed": [
{
"type": "price_chart",
"data_source": "financial_data.current_price",
"timeframe": "30_days"
},
{
"type": "technical_indicators",
"data_source": "technical_indicators"
}
],
"text_widgets": [
{
"type": "expandable_report",
"title": "Technical Analysis",
"content_source": "text_content.market_report"
}
]
}
}
```
## Extraction Instructions
1. **Parse Financial Metrics**: Extract all numerical values from the fundamentals_report, including current price, ratios, market cap, etc.
2. **Extract Technical Data**: Pull technical indicator values and trend directions from the market_report text
3. **Summarize Debates**: Create concise bullet points from the lengthy bull/bear arguments, focusing on key investment themes
4. **Structure Investment Plan**: Break down the investment strategy into actionable components (sizing, entry price,stops, targets, time horizon)
5. **Organize Text Content**: Preserve full text reports while also extracting key highlights for quick reference
6. **Identify Key Dates**: Extract important dates like earnings calls, trade dates, and catalyst events
7. **Classify Sentiment**: Determine overall sentiment scores and confidence levels based on the analysis
## Data Validation
- Ensure all numerical values are properly typed (numbers vs strings)
- Validate date formats are consistent
- Check that all required fields are populated
- Verify that text content is properly escaped for JSON
## Output Optimization
- Structure data for easy consumption by frontend frameworks (React, Vue, Angular)
- Separate frequently-accessed data (current price, recommendation) from detailed reports
- Include metadata for widget configuration and rendering preferences
- Provide fallback values for any missing data points
Transform the input JSON following this structure to create a comprehensive, widget-ready dataset that maintains all original information while making it easily accessible for dashboard creation.
IMPORTANT: Return ONLY the transformed JSON, no additional text or explanations.
"""
def extract_numerical_value(self, text: str, pattern: str, default: float = 0.0) -> float:
"""Extract numerical values from text using regex patterns"""
try:
match = re.search(pattern, text, re.IGNORECASE)
if match:
value_str = match.group(1).replace(',', '').replace('$', '').replace('%', '')
return float(value_str)
except (ValueError, AttributeError):
pass
return default
def transform_single_file(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Transform a single TradingAgents JSON file using LLM"""
try:
# Prepare the input data as a JSON string
input_json = json.dumps(input_data, indent=2)
# Create the prompt with the input data
full_prompt = f"{self.get_transformation_prompt()}\n\nInput JSON to transform:\n{input_json}"
# Call OpenAI API
response = self.client.chat.completions.create(
model=self.config.model,
messages=[
{"role": "system", "content": "You are a data transformation specialist. Transform the provided JSON exactly as specified."},
{"role": "user", "content": full_prompt}
],
temperature=0.1,
max_tokens=16384
)
# Parse the response
transformed_json_str = response.choices[0].message.content.strip()
# Clean up the response (remove any markdown formatting)
if transformed_json_str.startswith('```json'):
transformed_json_str = transformed_json_str[7:]
if transformed_json_str.endswith('```'):
transformed_json_str = transformed_json_str[:-3]
transformed_data = json.loads(transformed_json_str)
# Add fallback values if transformation missed anything
self._add_fallback_values(transformed_data, input_data)
except Exception as e:
print(f"Error transforming data: {e}")
# Return a basic fallback structure
transformed_data = self._create_fallback_structure(input_data)
return transformed_data
def _add_fallback_values(self, transformed_data: Dict[str, Any], original_data: Dict[str, Any]):
"""Add fallback values for any missing required fields"""
# Ensure metadata exists
if 'metadata' not in transformed_data:
transformed_data['metadata'] = {}
metadata = transformed_data['metadata']
if 'company_ticker' not in metadata:
metadata['company_ticker'] = original_data.get('company_of_interest', 'UNKNOWN')
if 'analysis_date' not in metadata:
metadata['analysis_date'] = original_data.get('trade_date', datetime.now().strftime('%Y-%m-%d'))
if 'final_recommendation' not in metadata:
metadata['final_recommendation'] = 'HOLD'
if 'confidence_level' not in metadata:
metadata['confidence_level'] = 'MEDIUM'
# Ensure all required sections exist
required_sections = [
'financial_data', 'technical_indicators', 'investment_strategy',
'debate_summary', 'text_content', 'widgets_config'
]
for section in required_sections:
if section not in transformed_data:
transformed_data[section] = {}
def _create_fallback_structure(self, original_data: Dict[str, Any]) -> Dict[str, Any]:
"""Create a basic fallback structure when transformation fails"""
return {
"metadata": {
"company_ticker": original_data.get('company_of_interest', 'UNKNOWN'),
"company_name": original_data.get('company_of_interest', 'Unknown Company'),
"analysis_date": original_data.get('trade_date', datetime.now().strftime('%Y-%m-%d')),
"final_recommendation": "HOLD",
"confidence_level": "LOW"
},
"financial_data": {
"current_price": 0.0,
"price_change": 0.0,
"price_change_percent": 0.0,
"market_cap": "N/A",
"enterprise_value": "N/A",
"shares_outstanding": "N/A",
"trading_range": {"high": 0.0, "low": 0.0, "open": 0.0},
"volume": 0,
"valuation_ratios": {
"current_ps_ratio": 0.0,
"fair_value_ps_ratio": 0.0,
"forward_pe": 0.0,
"forward_ps": 0.0,
"forward_pcf": 0.0,
"forward_pocf": 0.0
},
"ownership": {"insider_percent": 0.0, "institutional_percent": 0.0},
"analyst_data": {
"consensus_rating": "N/A",
"price_target": 0.0,
"forecast_price": 0.0
}
},
"technical_indicators": {
"sma_50": 0.0,
"sma_200": 0.0,
"ema_10": 0.0,
"macd": 0.0,
"macd_signal": 0.0,
"rsi": 50.0,
"atr": 0.0,
"trend_directions": {
"sma_50": "NEUTRAL",
"sma_200": "NEUTRAL",
"ema_10": "NEUTRAL",
"macd": "NEUTRAL",
"rsi_condition": "NEUTRAL"
}
},
"investment_strategy": {
"position_sizing": {
"total_allocation_percent": "0%",
"entry_strategy": "N/A",
"tranche_1_percent": "0%",
"tranche_2_percent": "0%"
},
"risk_management": {
"initial_stop_loss": 0.0,
"stop_loss_percent": 0.0,
"breakeven_strategy": "N/A"
},
"profit_targets": [],
"monitoring_points": []
},
"debate_summary": {
"bull_key_points": [],
"bear_key_points": [],
"neutral_perspective": "No analysis available",
"final_decision_rationale": "No decision rationale available"
},
"text_content": {
"market_report": {
"title": "Technical Analysis Report",
"content": original_data.get('market_report', 'No market report available'),
"key_takeaways": []
},
"sentiment_report": {
"title": "Company Sentiment Analysis",
"content": original_data.get('sentiment_report', 'No sentiment report available'),
"recent_developments": []
},
"fundamentals_report": {
"title": "Fundamental Analysis",
"content": original_data.get('fundamentals_report', 'No fundamentals report available'),
"financial_highlights": []
},
"news_report": {
"title": "Macroeconomic Context",
"content": original_data.get('news_report', 'No news report available'),
"key_developments": []
},
"investment_plan_full": {
"title": "Complete Investment Strategy",
"content": original_data.get('investment_plan', 'No investment plan available')
},
"debate_transcripts": {
"bull_analysis": "",
"bear_analysis": "",
"neutral_analysis": "",
"risk_discussion": ""
}
},
"widgets_config": {
"charts_needed": [
{"type": "price_chart", "data_source": "financial_data.current_price", "timeframe": "30_days"},
{"type": "technical_indicators", "data_source": "technical_indicators"}
],
"text_widgets": [
{"type": "expandable_report", "title": "Technical Analysis", "content_source": "text_content.market_report"}
]
}
}
def process_all_files(self) -> Dict[str, List[str]]:
"""Process all JSON files in the eval_results directory"""
results = {"success": [], "failed": []}
eval_results_path = Path(self.config.eval_results_path)
if not eval_results_path.exists():
print(f"Eval results path does not exist: {eval_results_path}")
return results
# Process each company directory
for company_dir in eval_results_path.iterdir():
if not company_dir.is_dir():
continue
company_ticker = company_dir.name
logs_dir = company_dir / "TradingAgentsStrategy_logs"
transformed_dir = company_dir / "TradingAgentsStrategy_transformed_logs"
transformed_dir.mkdir(parents=True, exist_ok=True)
# Process each JSON file in the logs directory
for json_file in logs_dir.glob("*.json"):
try:
print(f"Processing {json_file}")
# Process the file
success = self.process_single_file(str(json_file), str(transformed_dir))
if success:
results["success"].append(str(json_file.name))
print(f"Successfully transformed and saved: {json_file.name}")
else:
results["failed"].append(str(json_file))
print(f"Failed to process {json_file}")
except Exception as e:
print(f"Failed to process {json_file}: {e}")
results["failed"].append(str(json_file))
return results
def process_single_file(self, input_file_path: str, output_file_path: str = None) -> bool:
"""Process a single JSON file"""
try:
input_path = Path(input_file_path)
if not input_path.exists():
print(f"Input file does not exist: {input_path}")
return False
# Load the original data
with open(input_path, 'r') as f:
original_data = json.load(f)
# Transform the data
transformed_data = self.transform_single_file(original_data)
# Determine output path
if output_file_path is None:
output_file_path = Path(self.config.output_path) / f"{input_path.stem}_transformed.json"
else:
output_file_path = Path(output_file_path) / f"{input_path.stem}_transformed.json"
# Save the transformed data
with open(output_file_path, 'w') as f:
json.dump(transformed_data, f, indent=2)
print(f"Successfully transformed and saved: {output_file_path}")
return True
except Exception as e:
print(f"Failed to process {input_file_path}: {e}")
return False
def main():
"""Main function to run the transformation agent"""
import argparse
parser = argparse.ArgumentParser(description="Transform TradingAgents output to widget-friendly format")
parser.add_argument("--api-key", help="OpenAI API key")
parser.add_argument("--input-file", default=" ../output_data/AVAH/TradingAgentsStrategy_logs/full_states_log_2025-07-26.json", help="Process a single input file")
parser.add_argument("--output-file", default="../output_data/AVAH/TradingAgentsStrategy_transformed_logs/full_states_log_2025-07-26.json", help="Output file path (for single file processing)")
parser.add_argument("--eval-results-path", default="../output_data/AVAH/TradingAgentsStrategy_logs", help="Path to eval_results directory")
parser.add_argument("--output-path", default="../output_data/AVAH/TradingAgentsStrategy_transformed_logs/", help="Output directory path")
args = parser.parse_args()
# Create configuration
config = TransformationConfig(
openai_api_key=args.api_key,
eval_results_path=args.eval_results_path,
output_path=args.output_path
)
# Create agent
agent = DataTransformationAgent(config)
if args.input_file:
# Process single file
success = agent.process_single_file(args.input_file, args.output_file)
if success:
print("Single file processing completed successfully")
else:
print("Single file processing failed")
else:
# Process all files
results = agent.process_all_files()
print(f"\nProcessing completed:")
print(f"Success: {len(results['success'])} files")
print(f"Failed: {len(results['failed'])} files")
if results['success']:
print("\nSuccessfully processed files:")
for file_path in results['success']:
print(f" - {file_path}")
if results['failed']:
print("\nFailed to process files:")
for file_path in results['failed']:
print(f" - {file_path}")
if __name__ == "__main__":
main()