Update configuration and add project documentation

- Add CLAUDE.md for AI assistant guidance
- Add .env.example for environment setup
- Update requirements and configuration files
- Improve Google News utility error handling
- Add project documentation structure

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
佐藤優一 2025-08-03 16:54:08 +09:00
parent a438acdbbd
commit 2ce59a354e
30 changed files with 5027 additions and 38 deletions

31
.env.example Normal file
View File

@ -0,0 +1,31 @@
# TradingAgents Environment Variables
# Copy this file to .env and update with your actual values
# Required API Keys
OPENAI_API_KEY=your_openai_api_key_here
FINNHUB_API_KEY=your_finnhub_api_key_here
# Optional API Keys (for alternative LLM providers)
ANTHROPIC_API_KEY=your_anthropic_api_key_here
GOOGLE_API_KEY=your_google_api_key_here
# Reddit API Configuration
REDDIT_CLIENT_ID=your_reddit_client_id_here
REDDIT_CLIENT_SECRET=your_reddit_client_secret_here
# Project Configuration
TRADINGAGENTS_RESULTS_DIR=./results
# LLM Configuration
LLM_PROVIDER=openai # Options: openai, anthropic, google
DEEP_THINK_LLM=o4-mini
QUICK_THINK_LLM=gpt-4o-mini
BACKEND_URL=https://api.openai.com/v1
# Trading Configuration
MAX_DEBATE_ROUNDS=1
MAX_RISK_DISCUSS_ROUNDS=1
ONLINE_TOOLS=false # Set to false for backtesting
# Reddit Data Configuration
REDDIT_DATA_DIR=/path/to/your/reddit/data/directory

106
CLAUDE.md Normal file
View File

@ -0,0 +1,106 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Todo管理ルール
### チケット内のタスク管理
- 各実装チケットdocs/XXX_*.md内のタスクは`- [ ]`で未完了、`- [x]`で完了を表す
- タスクが完了したら、該当ファイルを編集して`- [ ]`を`- [x]`に変更する
- 例:
```markdown
## タスク
- [x] 完了したタスク
- [ ] 未完了のタスク
```
## プロジェクト概要
TradingAgentsは、実際の投資会社の構造を模倣したマルチエージェントLLMトレーディングフレームワークです。LangGraphを使用して構築され、ファンダメンタル分析、センチメント分析、テクニカル分析などを行う専門的なエージェントが協調して市場分析と取引判断を行います。
## 重要なコマンド
### インストールと環境セットアップ
```bash
# 仮想環境の作成Python 3.10以上が必要)
conda create -n tradingagents python=3.13
conda activate tradingagents
# 依存関係のインストール
pip install -r requirements.txt
# 必要なAPIキーの設定
export FINNHUB_API_KEY=$YOUR_FINNHUB_API_KEY
export OPENAI_API_KEY=$YOUR_OPENAI_API_KEY
```
### CLIの実行
```bash
python -m cli.main
```
### パッケージとしての使用
```python
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
ta = TradingAgentsGraph(debug=True, config=DEFAULT_CONFIG.copy())
_, decision = ta.propagate("NVDA", "2024-05-10")
```
## アーキテクチャと主要コンポーネント
### ディレクトリ構造
- `tradingagents/` - メインパッケージ
- `agents/` - すべてのエージェント実装
- `analysts/` - 4種類のアナリストエージェントMarket, Social, News, Fundamentals
- `researchers/` - Bull/Bearリサーチャー議論によるバランス評価
- `trader/` - 取引判断を行うトレーダーエージェント
- `managers/` - リサーチマネージャーとリスクマネージャー
- `risk_mgmt/` - リスク評価のディベーターエージェント
- `dataflows/` - データ取得とキャッシュ管理
- `graph/` - LangGraphベースのワークフロー実装
- `cli/` - リッチなCLIインターフェース
### 主要な設定ファイル
- `tradingagents/default_config.py` - デフォルト設定
- LLMプロバイダー設定OpenAI、Anthropic、Google
- モデル選択deep_think_llm、quick_think_llm
- 議論ラウンド数の設定
- オンライン/オフラインツールの切り替え
### エージェントフロー
1. **アナリストチーム** - 市場、ソーシャル、ニュース、ファンダメンタル分析を並行実行
2. **リサーチチーム** - Bull/Bearリサーチャーによる議論と評価
3. **トレーダー** - 総合的な取引判断
4. **リスク管理** - ポートフォリオリスクの評価と調整
5. **ポートフォリオマネージャー** - 最終承認/拒否
### データソース
- FinnHub API金融データ
- Reddit APIソーシャルセンチメント
- Google Newsニュース分析
- Yahoo Finance価格データ
- StockStatsテクニカル指標
## 開発時の注意点
### API使用量
フレームワークは大量のAPIコールを行うため、テスト時は以下を推奨
- `gpt-4o-mini`や`o4-mini`を使用してコストを削減
- `config["max_debate_rounds"]`を1に設定して議論ラウンドを制限
- `config["online_tools"]`をFalseにしてキャッシュデータを使用
### メモリ管理
各エージェントは独自のメモリ(`FinancialSituationMemory`)を持ち、`results_dir`に保存されます。
### 現在の制限
- 正式なテストフレームワークが未設定
- リンターやコード品質ツールが未設定
- CI/CDパイプラインが未実装
## トラブルシューティング
### M1 Mac (ARM64) での問題
- `chromadb`のインストール時にビルドエラーが発生する場合:`pip install --upgrade --force-reinstall chromadb`
- numpy互換性の問題`pip install numpy==1.26.2`を使用

View File

@ -0,0 +1,67 @@
# チケット #001: TAFlowStrategy 実装
## 概要
TradingAgentsGraphをラップし、バックテスト用の戦略エンジンとして機能するTAFlowStrategyクラスの実装
## 目的
- 既存のTradingAgentsGraphを活用した売買判断の実行
- バックテスト環境での適切な動作(オフラインモード、キャッシュ利用)
- 日付を指定して Buy/Sell/Hold の判断を返すインターフェース
## 実装要件
### 1. クラス設計
```python
class TAFlowStrategy:
def __init__(self, config: dict):
"""
Args:
config: TradingAgentsGraphの設定辞書
- online_tools: False固定キャッシュのみ利用
- その他既存のDEFAULT_CONFIG設定を継承
"""
pass
def decide(self, ticker: str, date: str) -> str:
"""
指定日付での売買判断を返す
Args:
ticker: 銘柄シンボル(例: "AAPL"
date: 判断日YYYY-MM-DD形式
Returns:
"Buy" | "Sell" | "Hold" のいずれか
"""
pass
```
### 2. 実装詳細
- TradingAgentsGraphのpropagateメソッドを内部で呼び出し
- エラーハンドリングデータ不足、API制限等
- デバッグモードのサポート
- メモリ管理results_dirへの保存
### 3. 設定管理
- config["online_tools"] = False の強制
- キャッシュディレクトリの自動設定
- LLMモデル選択の柔軟性確保
## 受け入れ条件
- [ ] TAFlowStrategyクラスが実装され、decideメソッドが正しく動作する
- [ ] オフラインモードでのみ動作することが保証される
- [ ] エラー時の適切なフォールバック(例:データ不足時は"Hold"
- [ ] ユニットテストの作成
## 依存関係
- tradingagents.graph.trading_graph
- tradingagents.default_config
- 既存のエージェント実装
## タスク
- [ ] TAFlowStrategyクラスの基本実装
- [ ] TradingAgentsGraphとの統合
- [ ] エラーハンドリングの実装
- [ ] 設定管理の実装
- [ ] ユニットテストの作成
- [ ] ドキュメント作成

View File

@ -0,0 +1,85 @@
# チケット #002: CLI インターフェース実装
## 概要
バックテスト機能のコマンドラインインターフェースの実装。既存のCLIフレームワークを拡張し、backtestサブコマンドを追加
## 目的
- 直感的なコマンドライン操作でバックテストを実行
- 過去の実行結果の管理と表示
- 結果の比較機能の提供
## 実装要件
### 1. コマンド体系
#### メインコマンド
```bash
python -m cli.main backtest [OPTIONS]
```
#### オプション
- `--ticker`: 銘柄シンボル(必須)
- `--start`: 開始日 YYYY-MM-DD必須
- `--end`: 終了日 YYYY-MM-DD必須
- `--fee`: 取引手数料率(任意、デフォルト: 0.001
- `--slippage`: スリッページ率(任意、デフォルト: 0.0005
- `--results-db`: 結果保存先DB/ファイル(任意、デフォルト: results.sqlite
#### サブコマンド
```bash
# 過去実行の一覧表示
python -m cli.main backtest list
# 特定結果のHTMLレポート表示
python -m cli.main backtest show --id 5
# 2つの結果を比較
python -m cli.main backtest compare --id1 3 --id2 7
```
### 2. 実装詳細
#### CLIモジュール構造
```
cli/
├── commands/
│ └── backtest.py # バックテストコマンド実装
├── utils/
│ └── validators.py # 入力検証ユーティリティ
└── main.py # 既存のエントリーポイント
```
#### Rich ライブラリの活用
- プログレスバーでシミュレーション進捗表示
- 結果サマリーの美しいテーブル表示
- エラーメッセージの分かりやすい表示
### 3. 入力検証
- 日付フォーマットの検証YYYY-MM-DD
- 開始日 < 終了日の確認
- 銘柄シンボルの妥当性チェック
- 手数料・スリッページの範囲チェック0-1
## 受け入れ条件
- [ ] backtestコマンドが正しく実行できる
- [ ] 全ての必須パラメータの検証が機能する
- [ ] list/show/compareサブコマンドが動作する
- [ ] Richによる見やすい出力
- [ ] エラー時の適切なメッセージ表示
- [ ] --helpオプションの充実
## 依存関係
- clickCLIフレームワーク
- richTUIライブラリ
- 既存のcli.mainモジュール
## タスク
- [ ] backtest.pyコマンドモジュールの作成
- [ ] メインコマンドの実装(実行オプション)
- [ ] listサブコマンドの実装
- [ ] showサブコマンドの実装
- [ ] compareサブコマンドの実装
- [ ] 入力検証ロジックの実装
- [ ] Richを使った出力フォーマット
- [ ] ヘルプメッセージの作成
- [ ] 統合テスト

View File

@ -0,0 +1,124 @@
# チケット #003: シミュレーションエンジン実装
## 概要
バックテストのコアとなるシミュレーションエンジンの実装。日次での売買シミュレーションとポジション管理を行う
## 目的
- 期間中の営業日ループでの取引シミュレーション
- ポジション管理(ロング/ショート/フラット)
- 取引コストとスリッページの考慮
- 正確な損益計算
## 実装要件
### 1. クラス設計
```python
class BacktestEngine:
def __init__(self, initial_capital: float = 100000.0):
"""
Args:
initial_capital: 初期資金(デフォルト: $100,000
"""
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.position = None # 'long', 'short', None
self.trades = [] # 取引履歴
self.equity_curve = [] # 資産推移
def run(self,
ticker: str,
start_date: str,
end_date: str,
strategy: TAFlowStrategy,
fee_rate: float = 0.001,
slippage_rate: float = 0.0005) -> BacktestResult:
"""
バックテストを実行
Args:
ticker: 銘柄シンボル
start_date: 開始日
end_date: 終了日
strategy: 戦略オブジェクト
fee_rate: 取引手数料率
slippage_rate: スリッページ率
Returns:
BacktestResult: 結果オブジェクト
"""
pass
```
### 2. シミュレーションロジック
#### 日次ループ処理
1. 営業日リストの取得pandas_market_calendars使用
2. 各営業日について:
- 当日終値後にstrategy.decide(ticker, date)を実行
- シグナルを取得Buy/Sell/Hold
- 翌営業日始値で約定処理
#### ポジション管理ルール
```
現在ポジション | シグナル | アクション
-------------|---------|------------
None | Buy | ロング新規
None | Sell | ショート新規
Long | Sell | ロング決済→ショート新規
Short | Buy | ショート決済→ロング新規
Long | Buy | 保有継続
Short | Sell | 保有継続
Any | Hold | 保有継続
```
#### 約定処理
- 買い約定価格 = 始値 × (1 + slippage_rate)
- 売り約定価格 = 始値 × (1 - slippage_rate)
- 取引コスト = 約定金額 × fee_rate
- 全資産を100%投入レバレッジ1倍
### 3. データ管理
#### 価格データの取得
- YahooFinance APIを使用
- 必要なデータOHLCV
- キャッシュ機構の実装
#### 取引履歴の記録
```python
@dataclass
class Trade:
date: str
action: str # 'buy', 'sell'
price: float
shares: float
fee: float
slippage: float
capital_after: float
```
## 受け入れ条件
- [ ] 営業日ベースでの正確なループ処理
- [ ] ポジション遷移の正確な実装
- [ ] スリッページと手数料の正しい計算
- [ ] 取引履歴の完全な記録
- [ ] エッジケース(データ欠損等)の処理
- [ ] Buy & Hold戦略の同時計算
## 依存関係
- pandas
- pandas_market_calendars
- yfinance
- TAFlowStrategy
## タスク
- [ ] BacktestEngineクラスの基本実装
- [ ] 営業日カレンダーの統合
- [ ] 価格データ取得機能
- [ ] ポジション管理ロジック
- [ ] 約定処理と手数料計算
- [ ] 取引履歴の記録機能
- [ ] Buy & Hold比較実装
- [ ] エラーハンドリング
- [ ] ユニットテスト作成

View File

@ -0,0 +1,142 @@
# チケット #004: 評価指標計算実装
## 概要
バックテスト結果の評価指標を計算するモジュールの実装。標準的な金融指標を網羅的に計算
## 目的
- 戦略パフォーマンスの定量的評価
- Buy & Holdとの比較可能な指標提供
- 業界標準の指標による結果の信頼性確保
## 実装要件
### 1. 評価指標クラス
```python
@dataclass
class PerformanceMetrics:
# リターン関連
total_return: float # 累積リターン
annual_return: float # 年率リターン
# リスク関連
sharpe_ratio: float # シャープレシオ
max_drawdown: float # 最大ドローダウン
max_drawdown_duration: int # 最大DD期間日数
# 取引統計
win_rate: float # 勝率
profit_factor: float # プロフィットファクター
total_trades: int # 総取引回数
winning_trades: int # 勝ち取引数
losing_trades: int # 負け取引数
# その他
final_capital: float # 最終資産
trading_days: int # 取引日数
class MetricsCalculator:
def calculate(self,
equity_curve: List[float],
trades: List[Trade],
initial_capital: float,
start_date: str,
end_date: str) -> PerformanceMetrics:
"""
評価指標を計算
"""
pass
```
### 2. 指標計算の詳細
#### 累積リターン
```
累積リターン = (最終資産 / 初期資産 - 1) × 100%
```
#### 年率リターン
```
年率リターン = ((1 + 累積リターン) ^ (365 / 日数) - 1) × 100%
```
#### シャープレシオ
```
日次リターン = (今日の資産 / 昨日の資産) - 1
シャープレシオ = (日次平均リターン / 日次標準偏差) × √252
※無リスク金利 = 0と仮定
```
#### 最大ドローダウン
```python
def calculate_max_drawdown(equity_curve: List[float]) -> tuple[float, int]:
"""
Returns:
(max_drawdown_pct, max_duration_days)
"""
peak = equity_curve[0]
max_dd = 0
max_duration = 0
current_duration = 0
for value in equity_curve:
if value > peak:
peak = value
current_duration = 0
else:
drawdown = (peak - value) / peak
max_dd = max(max_dd, drawdown)
current_duration += 1
max_duration = max(max_duration, current_duration)
return max_dd * 100, max_duration
```
#### 勝率とプロフィットファクター
```
勝率 = 利益取引数 / 総取引数 × 100%
プロフィットファクター = 総利益 / 総損失
```
### 3. Buy & Hold との比較
```python
class ComparativeMetrics:
def compare(self,
strategy_metrics: PerformanceMetrics,
buy_hold_metrics: PerformanceMetrics) -> dict:
"""
戦略とBuy & Holdを比較
Returns:
{
'excess_return': float, # 超過リターン
'relative_sharpe': float, # シャープレシオの差
'relative_drawdown': float, # DD改善率
}
"""
pass
```
## 受け入れ条件
- [ ] 全指標の正確な計算
- [ ] エッジケース(取引なし、損失のみ等)の処理
- [ ] Buy & Hold指標の同時計算
- [ ] 比較指標の提供
- [ ] 計算精度のテスト(小数点以下の扱い)
- [ ] パフォーマンス(大量データでの計算速度)
## 依存関係
- numpy統計計算
- pandas時系列処理
- BacktestEngineの出力形式
## タスク
- [ ] PerformanceMetricsデータクラスの定義
- [ ] MetricsCalculatorクラスの基本実装
- [ ] 各指標の計算メソッド実装
- [ ] Buy & Hold計算機能
- [ ] 比較指標計算機能
- [ ] エラーハンドリング(ゼロ除算等)
- [ ] 単体テストの作成
- [ ] ドキュメント作成

View File

@ -0,0 +1,164 @@
# チケット #005: 出力機能実装
## 概要
バックテスト結果の可視化とレポート生成機能の実装。ターミナル出力とHTMLレポートの2形式で結果を提供
## 目的
- 結果の即座の確認(ターミナル)
- 詳細な分析用レポートHTML
- インタラクティブなグラフによる視覚的理解
## 実装要件
### 1. ターミナル出力
#### サマリーテーブル
```python
class TerminalReporter:
def display_summary(self,
metrics: PerformanceMetrics,
buy_hold_metrics: PerformanceMetrics):
"""
Richライブラリを使用した美しいテーブル表示
"""
# 例:
# ┌─────────────────────┬────────────┬────────────┐
# │ Metric │ Strategy │ Buy & Hold │
# ├─────────────────────┼────────────┼────────────┤
# │ Total Return │ +45.2% │ +32.1% │
# │ Annual Return │ +38.5% │ +27.8% │
# │ Sharpe Ratio │ 1.45 │ 1.12 │
# │ Max Drawdown │ -12.3% │ -18.5% │
# │ Win Rate │ 58.2% │ N/A │
# │ Total Trades │ 42 │ 1 │
# └─────────────────────┴────────────┴────────────┘
```
#### プログレスバー
```python
def show_progress(self, current: int, total: int, message: str):
"""
シミュレーション実行中の進捗表示
"""
# Rich.progress を使用
```
### 2. HTMLレポート生成
#### レポート構造
```html
<!DOCTYPE html>
<html>
<head>
<title>Backtest Report - {ticker} ({start} to {end})</title>
<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
<style>
/* レスポンシブデザイン */
</style>
</head>
<body>
<h1>バックテスト結果レポート</h1>
<!-- 1. サマリーセクション -->
<section id="summary">
<!-- 実行パラメータ -->
<!-- 主要指標の比較 -->
</section>
<!-- 2. グラフセクション -->
<section id="charts">
<!-- Equity Curve -->
<!-- Drawdown Chart -->
<!-- Returns Distribution -->
</section>
<!-- 3. 取引履歴 -->
<section id="trades">
<!-- ページネーション付きテーブル -->
</section>
</body>
</html>
```
#### Plotlyグラフ実装
```python
class ChartGenerator:
def create_equity_curve(self,
dates: List[str],
strategy_equity: List[float],
buyhold_equity: List[float]) -> str:
"""
戦略とBuy&Holdの資産推移グラフ
"""
pass
def create_drawdown_chart(self,
dates: List[str],
drawdown_pct: List[float]) -> str:
"""
ドローダウン推移グラフ
"""
pass
def create_returns_histogram(self,
daily_returns: List[float]) -> str:
"""
日次リターンの分布
"""
pass
```
### 3. レポート保存と管理
```python
class ReportManager:
def __init__(self, reports_dir: str = "reports/"):
self.reports_dir = reports_dir
def save_report(self,
html_content: str,
ticker: str,
start_date: str,
end_date: str) -> str:
"""
レポートを保存し、ファイルパスを返す
ファイル名形式:
{ticker}_{start}_{end}_{timestamp}.html
"""
pass
def open_in_browser(self, report_path: str):
"""
デフォルトブラウザでレポートを開く
"""
pass
```
## 受け入れ条件
- [ ] ターミナルでの見やすいサマリー表示
- [ ] インタラクティブなHTMLレポート
- [ ] 全グラフの正しい描画
- [ ] レスポンシブデザイン(モバイル対応)
- [ ] 取引履歴の完全な表示
- [ ] レポートの自動保存と管理
- [ ] ブラウザでの自動オープン機能
## 依存関係
- richターミナルUI
- plotlyグラフ描画
- jinja2HTMLテンプレート
- webbrowserブラウザ制御
## タスク
- [ ] TerminalReporterクラスの実装
- [ ] HTMLテンプレートの作成
- [ ] ChartGeneratorクラスの実装
- [ ] Equity Curveグラフ実装
- [ ] Drawdownグラフ実装
- [ ] リターン分布グラフ実装
- [ ] 取引履歴テーブル実装
- [ ] ReportManagerクラスの実装
- [ ] CSSスタイリング
- [ ] 統合テスト

View File

@ -0,0 +1,192 @@
# チケット #006: 永続化機能実装
## 概要
バックテスト結果をデータベースに保存し、過去の実行結果を管理・比較可能にする機能の実装
## 目的
- 実行結果の永続化による履歴管理
- 過去結果の検索と参照
- 複数結果の比較分析
- 結果の再現性確保
## 実装要件
### 1. データベース設計
#### SQLiteスキーマ
```sql
-- バックテスト実行記録
CREATE TABLE backtests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker VARCHAR(10) NOT NULL,
start_date DATE NOT NULL,
end_date DATE NOT NULL,
initial_capital DECIMAL(10,2) NOT NULL,
final_capital DECIMAL(10,2) NOT NULL,
fee_rate DECIMAL(5,4) NOT NULL,
slippage_rate DECIMAL(5,4) NOT NULL,
-- 評価指標
total_return DECIMAL(10,4),
annual_return DECIMAL(10,4),
sharpe_ratio DECIMAL(10,4),
max_drawdown DECIMAL(10,4),
win_rate DECIMAL(10,4),
profit_factor DECIMAL(10,4),
total_trades INTEGER,
-- Buy & Hold比較
buyhold_return DECIMAL(10,4),
buyhold_sharpe DECIMAL(10,4),
-- メタデータ
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
execution_time_sec DECIMAL(10,2),
llm_model VARCHAR(50),
config_json TEXT, -- 完全な設定のJSON
INDEX idx_ticker_date (ticker, start_date, end_date)
);
-- 個別取引記録
CREATE TABLE trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
backtest_id INTEGER NOT NULL,
trade_date DATE NOT NULL,
action VARCHAR(10) NOT NULL, -- 'buy', 'sell'
price DECIMAL(10,4) NOT NULL,
shares DECIMAL(10,4) NOT NULL,
fee DECIMAL(10,4) NOT NULL,
slippage DECIMAL(10,4) NOT NULL,
capital_after DECIMAL(10,2) NOT NULL,
FOREIGN KEY (backtest_id) REFERENCES backtests(id),
INDEX idx_backtest_id (backtest_id)
);
-- 日次パフォーマンス記録(オプション)
CREATE TABLE daily_performance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
backtest_id INTEGER NOT NULL,
date DATE NOT NULL,
equity DECIMAL(10,2) NOT NULL,
daily_return DECIMAL(10,6),
drawdown DECIMAL(10,6),
FOREIGN KEY (backtest_id) REFERENCES backtests(id),
INDEX idx_backtest_date (backtest_id, date)
);
```
### 2. データアクセス層
```python
class BacktestRepository:
def __init__(self, db_path: str = "results.sqlite"):
self.db_path = db_path
self._init_database()
def save_backtest(self,
result: BacktestResult,
config: dict) -> int:
"""
バックテスト結果を保存
Returns:
backtest_id: 保存されたレコードのID
"""
pass
def get_backtest(self, backtest_id: int) -> BacktestResult:
"""
IDから結果を取得
"""
pass
def list_backtests(self,
ticker: str = None,
start_date: str = None,
end_date: str = None,
limit: int = 100) -> List[BacktestSummary]:
"""
条件に合うバックテストの一覧
"""
pass
def compare_backtests(self,
id1: int,
id2: int) -> ComparisonResult:
"""
2つのバックテスト結果を比較
"""
pass
```
### 3. Parquetファイル形式サポートオプション
```python
class ParquetPersistence:
def save_to_parquet(self,
result: BacktestResult,
output_dir: str = "results/"):
"""
大規模データ向けのParquet形式保存
ファイル構造:
results/
├── metadata.parquet # バックテストメタデータ
├── trades/
│ └── {backtest_id}.parquet
└── daily_performance/
└── {backtest_id}.parquet
"""
pass
```
### 4. データ移行とバックアップ
```python
class DataMigration:
def export_to_csv(self, backtest_id: int, output_dir: str):
"""
特定のバックテスト結果をCSVエクスポート
"""
pass
def import_from_csv(self, csv_dir: str) -> int:
"""
CSVからインポート
"""
pass
def backup_database(self, backup_path: str):
"""
データベース全体のバックアップ
"""
pass
```
## 受け入れ条件
- [ ] SQLiteデータベースの自動初期化
- [ ] 結果の完全な保存(メトリクス、取引、設定)
- [ ] 高速な検索とフィルタリング
- [ ] データ整合性の保証(トランザクション)
- [ ] 既存結果の上書き防止
- [ ] エクスポート/インポート機能
- [ ] パフォーマンス1000件以上の結果でも高速
## 依存関係
- sqlite3標準ライブラリ
- pandasデータ操作
- pyarrowParquetサポート、オプション
## タスク
- [ ] データベーススキーマの作成
- [ ] BacktestRepositoryクラスの実装
- [ ] CRUD操作の実装
- [ ] 検索・フィルタリング機能
- [ ] トランザクション管理
- [ ] Parquetサポートオプション
- [ ] データ移行ツール
- [ ] インデックス最適化
- [ ] 統合テスト作成

View File

@ -0,0 +1,204 @@
# チケット #007: テスト実装
## 概要
バックテスト機能の品質保証のための包括的なテストスイートの実装
## 目的
- コードの信頼性確保
- リグレッション防止
- エッジケースの検証
- 計算精度の保証
## 実装要件
### 1. テスト構造
```
tests/
├── unit/
│ ├── test_ta_flow_strategy.py
│ ├── test_backtest_engine.py
│ ├── test_metrics_calculator.py
│ ├── test_persistence.py
│ └── test_output_features.py
├── integration/
│ ├── test_cli_commands.py
│ ├── test_full_backtest.py
│ └── test_data_flow.py
├── fixtures/
│ ├── sample_data.py
│ └── mock_responses.py
└── conftest.py # pytest設定
```
### 2. ユニットテスト
#### TAFlowStrategy テスト
```python
class TestTAFlowStrategy:
def test_initialization(self):
"""設定の正しい初期化"""
pass
def test_offline_mode_enforcement(self):
"""オフラインモードの強制確認"""
pass
def test_decide_returns_valid_signal(self):
"""Buy/Sell/Holdの返却確認"""
pass
def test_error_handling(self):
"""エラー時のフォールバック"""
pass
```
#### BacktestEngine テスト
```python
class TestBacktestEngine:
def test_position_transitions(self):
"""ポジション遷移の正確性"""
# None -> Long
# Long -> Short
# Short -> None
# etc.
pass
def test_fee_calculation(self):
"""手数料計算の検証"""
pass
def test_slippage_calculation(self):
"""スリッページ計算の検証"""
pass
def test_capital_management(self):
"""資金管理の正確性"""
pass
```
#### MetricsCalculator テスト
```python
class TestMetricsCalculator:
@pytest.mark.parametrize("equity_curve,expected_return", [
([100000, 110000], 0.10),
([100000, 90000], -0.10),
])
def test_total_return_calculation(self, equity_curve, expected_return):
"""累積リターン計算の検証"""
pass
def test_sharpe_ratio_calculation(self):
"""シャープレシオ計算の検証"""
pass
def test_max_drawdown_calculation(self):
"""最大ドローダウン計算の検証"""
pass
def test_edge_cases(self):
"""エッジケース(ゼロ除算等)"""
pass
```
### 3. 統合テスト
```python
class TestFullBacktest:
def test_end_to_end_backtest(self):
"""完全なバックテストフローの検証"""
# 1. モックデータの準備
# 2. バックテスト実行
# 3. 結果の検証
# 4. 永続化確認
pass
def test_cli_integration(self):
"""CLIコマンドの統合テスト"""
result = runner.invoke(cli, [
'backtest',
'--ticker', 'AAPL',
'--start', '2024-01-01',
'--end', '2024-03-31'
])
assert result.exit_code == 0
```
### 4. パフォーマンステスト
```python
class TestPerformance:
def test_large_dataset_performance(self):
"""大規模データでのパフォーマンス"""
# 10年分のデータでも1分以内
pass
def test_memory_usage(self):
"""メモリ使用量の監視"""
pass
```
### 5. モックとフィクスチャ
```python
# fixtures/sample_data.py
@pytest.fixture
def sample_price_data():
"""サンプル価格データ"""
return pd.DataFrame({
'Date': pd.date_range('2024-01-01', periods=100),
'Open': np.random.rand(100) * 100 + 100,
'High': np.random.rand(100) * 100 + 110,
'Low': np.random.rand(100) * 100 + 90,
'Close': np.random.rand(100) * 100 + 100,
'Volume': np.random.randint(1000000, 10000000, 100)
})
@pytest.fixture
def mock_ta_strategy():
"""モックストラテジー"""
strategy = Mock(spec=TAFlowStrategy)
strategy.decide.side_effect = cycle(['Buy', 'Hold', 'Sell'])
return strategy
```
### 6. カバレッジ目標
```yaml
# .coveragerc
[run]
source = tradingagents.backtest
omit =
*/tests/*
*/migrations/*
[report]
precision = 2
fail_under = 80 # 80%以上のカバレッジ必須
```
## 受け入れ条件
- [ ] 全モジュールのユニットテスト作成
- [ ] コードカバレッジ80%以上
- [ ] 統合テストの成功
- [ ] エッジケースの網羅
- [ ] CI/CDでの自動実行設定
- [ ] テストドキュメントの作成
## 依存関係
- pytest
- pytest-covカバレッジ
- pytest-mockモック
- pytest-benchmarkパフォーマンス
## タスク
- [ ] テストディレクトリ構造の作成
- [ ] pytest設定ファイル作成
- [ ] TAFlowStrategyのテスト実装
- [ ] BacktestEngineのテスト実装
- [ ] MetricsCalculatorのテスト実装
- [ ] 永続化機能のテスト実装
- [ ] 統合テストの実装
- [ ] フィクスチャとモックの作成
- [ ] カバレッジ設定
- [ ] CI設定GitHub Actions等

View File

@ -0,0 +1,142 @@
# チケット #008: Reddit prawクライアント実装
## 概要
Reddit API (praw) を使用してデータを取得するクライアントラッパーの段階的実装
## 目的
- Reddit APIへの認証と接続管理
- レート制限の適切な処理
- エラーハンドリングとリトライ機構
- 既存システムとの互換性維持
- オプショナル機能としての実装(既存システムと並行動作)
## 実装要件
### 1. 段階的実装アプローチ
```python
# tradingagents/config.py に追加
USE_PRAW_API = os.getenv("USE_PRAW_API", "false").lower() == "true"
```
### 2. クラス設計
```python
class RedditPrawClient:
def __init__(self, config: dict):
"""
Args:
config: Reddit API設定
- client_id: Reddit App ID
- client_secret: Reddit App Secret
- user_agent: User Agent文字列
- rate_limit_pause: レート制限時の待機秒数
"""
pass
def authenticate(self) -> bool:
"""
Reddit APIへの認証
Returns:
成功時True、失敗時False
"""
pass
def get_subreddit_posts(self,
subreddit: str,
sort: str = "hot",
limit: int = 100,
time_filter: str = "day") -> List[dict]:
"""
特定のsubredditから投稿を取得
Args:
subreddit: subreddit名
sort: ソート方法 (hot/top/new)
limit: 取得件数 (最大100)
time_filter: 期間フィルタ (day/week/month/year/all)
Returns:
投稿データのリスト
"""
pass
def search_posts(self,
query: str,
subreddit: str = None,
limit: int = 100,
sort: str = "relevance") -> List[dict]:
"""
キーワード検索で投稿を取得
Args:
query: 検索クエリ
subreddit: 特定のsubredditに限定オプション
limit: 取得件数
sort: ソート方法
Returns:
検索結果の投稿リスト
"""
pass
```
### 2. レート制限管理
```python
class RateLimiter:
def __init__(self, requests_per_minute: int = 80):
"""
Args:
requests_per_minute: 1分あたりの最大リクエスト数
"""
self.request_times = deque(maxlen=requests_per_minute)
self.requests_per_minute = requests_per_minute
def wait_if_needed(self):
"""必要に応じて待機"""
pass
```
### 3. エラーハンドリング
- 認証エラー: `RedditAuthenticationError`を発生
- レート制限エラー: 自動リトライ with exponential backoff
- ネットワークエラー: 最大3回リトライ
- 不正なsubreddit: `SubredditNotFoundError`を発生
### 4. 設定管理
```python
REDDIT_CLIENT_CONFIG = {
"user_agent": "TradingAgents/1.0 (by /u/your_username)",
"rate_limit_pause": 1.0,
"max_retries": 3,
"timeout": 30,
"requests_per_minute": 80 # 安全マージン20%
}
```
## 受け入れ条件
- [ ] prawライブラリを使用した認証が成功する
- [ ] 投稿データの取得が正しく動作する
- [ ] レート制限に違反しない
- [ ] エラー時の適切な例外処理
- [ ] 取得データが既存のJSONL形式と互換性がある
- [ ] 既存システムとの並行動作確認
- [ ] 単体テストの実装(モック使用)
- [ ] 環境変数での機能切り替えが動作
## 依存関係
- praw (Python Reddit API Wrapper)
- python-dotenv環境変数管理
- 既存のreddit_utils.pyとの連携
## タスク
- [ ] 単体テストの作成TDD
- [ ] RedditPrawClientクラスの基本実装
- [ ] 認証機能の実装
- [ ] subreddit投稿取得機能
- [ ] 検索機能の実装
- [ ] レート制限管理の実装
- [ ] エラーハンドリング実装
- [ ] 環境変数による機能切り替え
- [ ] 設定ファイルの作成
- [ ] 統合テスト作成
- [ ] ドキュメント作成

View File

@ -0,0 +1,198 @@
# チケット #009: Redditデータ取得ロジック実装
## 概要
RedditPrawClientを使用して、実際のデータ取得とフィルタリングを行うビジネスロジックの段階的実装
## 目的
- Global NewsとCompany Newsの効率的な取得
- 企業関連投稿の検索とフィルタリング
- 重複排除とデータ整形
- 日付指定での取得機能
- 既存システムとの互換性維持
## 実装要件
### 1. 段階的実装アプローチ
```python
# 既存システムとの並行動作をサポート
from tradingagents.config import USE_PRAW_API
if USE_PRAW_API:
# 新しいpraw実装を使用
fetcher = RedditDataFetcher(client, config)
else:
# 既存のファイルベースの実装を使用
return fetch_from_local_files()
```
### 2. クラス設計
```python
class RedditDataFetcher:
def __init__(self, client: RedditPrawClient, config: dict):
"""
Args:
client: RedditPrawClientインスタンス
config: 取得設定subredditリスト、取得件数等
"""
self.client = client
self.config = config
self.seen_post_ids = set() # 重複排除用
def fetch_global_news(self,
date: str,
limit_per_subreddit: int = 100) -> List[dict]:
"""
指定日のグローバルニュースを取得
Args:
date: 対象日付 (YYYY-MM-DD)
limit_per_subreddit: 各subredditからの取得件数
Returns:
ニュース投稿のリスト
"""
pass
def fetch_company_news(self,
ticker: str,
company_name: str,
date: str,
limit: int = 50) -> List[dict]:
"""
特定企業のニュースを取得
Args:
ticker: ティッカーシンボル
company_name: 企業名
date: 対象日付
limit: 取得件数
Returns:
企業関連投稿のリスト
"""
pass
```
### 2. 企業投稿の検索戦略
```python
def build_search_queries(ticker: str, company_name: str) -> List[str]:
"""
効果的な検索クエリを生成
Returns:
検索クエリのリスト
例: ["AAPL", "Apple", "$AAPL", "Apple Inc"]
"""
queries = [
f'"{ticker}"',
f'"{company_name}"',
f'${ticker}', # Cashtag
]
# 企業名のバリエーション対応
if " OR " in company_name:
for variant in company_name.split(" OR "):
queries.append(f'"{variant.strip()}"')
return queries
```
### 3. データフィルタリング
```python
def filter_posts_by_date(posts: List[dict], target_date: str) -> List[dict]:
"""
指定日付の投稿のみをフィルタリング
Args:
posts: 投稿リスト
target_date: 対象日付 (YYYY-MM-DD)
Returns:
フィルタリング後の投稿リスト
"""
pass
def filter_company_relevant_posts(posts: List[dict],
ticker: str,
company_name: str) -> List[dict]:
"""
企業に関連する投稿のみをフィルタリング
タイトルまたは本文に企業名/ティッカーが含まれるもの
"""
pass
```
### 4. データ整形
```python
def format_post_data(post: praw.models.Submission) -> dict:
"""
prawの投稿オブジェクトを統一形式に変換
Returns:
{
"id": str,
"title": str,
"selftext": str,
"url": str,
"ups": int,
"created_utc": int,
"subreddit": str,
"author": str,
"num_comments": int,
"ticker": str # 企業投稿の場合
}
"""
pass
```
### 5. バッチ処理
```python
def fetch_historical_data(self,
start_date: str,
end_date: str,
tickers: List[str] = None,
categories: List[str] = ["global_news", "company_news"]):
"""
過去データの一括取得
Args:
start_date: 開始日
end_date: 終了日
tickers: 対象ティッカーリスト
categories: 取得カテゴリ
Yields:
(date, category, data) のタプル
"""
pass
```
## 受け入れ条件
- [ ] 指定日付のデータのみを正確に取得
- [ ] 企業関連投稿の適切なフィルタリング
- [ ] 重複投稿の排除Reddit post ID使用
- [ ] データ形式が既存システムと互換
- [ ] エラー時の適切な処理
- [ ] 効率的なAPI使用最小限のリクエスト
- [ ] 単体テストの実装(モック使用)
- [ ] USE_PRAW_APIフラグでの切り替え動作
## 依存関係
- RedditPrawClientチケット#008
- 既存のreddit_utils.py
- デフォルトTickerリスト設定
## タスク
- [ ] 単体テストの作成TDD
- [ ] RedditDataFetcherクラスの基本実装
- [ ] Global News取得機能
- [ ] Company News取得機能
- [ ] 検索クエリ生成ロジック
- [ ] 日付フィルタリング機能
- [ ] 企業関連性フィルタリング
- [ ] データ整形機能
- [ ] 重複排除機能
- [ ] バッチ処理機能
- [ ] 段階的実装フラグのテスト
- [ ] 統合テスト作成

View File

@ -0,0 +1,222 @@
# チケット #010: Redditキャッシュ管理機能実装
## 概要
取得したRedditデータのキャッシュ管理とJSONLファイルへの保存機能の段階的実装
## 目的
- 取得データの永続化JSONL形式
- 既存データとの互換性維持
- 効率的なデータ読み込み
- 取得履歴の管理
- 既存システムとの互換性確保
## 実装要件
### 1. 段階的実装アプローチ
```python
# 既存のファイル構造と互換性を保つ
from tradingagents.config import USE_PRAW_API
if USE_PRAW_API:
# 新しいキャッシュマネージャーを使用
cache_manager = RedditCacheManager(base_dir)
else:
# 既存のファイルシステムを直接使用
pass
```
### 2. クラス設計
```python
class RedditCacheManager:
def __init__(self, base_dir: str):
"""
Args:
base_dir: データ保存のベースディレクトリ
/Users/y_sato/.../Datasource/reddit_data/
"""
self.base_dir = Path(base_dir)
self.ensure_directory_structure()
def ensure_directory_structure(self):
"""
必要なディレクトリ構造を作成
reddit_data/
├── global_news/
├── company_news/
└── metadata/
"""
pass
```
### 2. データ保存機能
```python
def save_posts(self,
posts: List[dict],
category: str,
date: str,
identifier: str = None) -> str:
"""
投稿データをJSONL形式で保存
Args:
posts: 投稿データのリスト
category: "global_news" or "company_news"
date: 日付 (YYYY-MM-DD)
identifier: subreddit名またはティッカー
Returns:
保存したファイルパス
Example:
global_news/r_worldnews_2024-01-01.jsonl
company_news/AAPL_2024-01-01.jsonl
"""
pass
def append_posts(self,
posts: List[dict],
file_path: str):
"""
既存ファイルに投稿を追加(重複チェック付き)
"""
pass
```
### 3. データ読み込み機能
```python
def load_posts(self,
category: str,
date: str,
identifier: str = None) -> List[dict]:
"""
キャッシュから投稿データを読み込み
Returns:
投稿データのリスト、存在しない場合は空リスト
"""
pass
def check_data_exists(self,
category: str,
date: str,
identifier: str = None) -> bool:
"""
指定データがキャッシュに存在するか確認
"""
pass
```
### 4. メタデータ管理
```python
def update_fetch_history(self,
category: str,
date: str,
identifiers: List[str],
fetch_timestamp: str,
post_count: int):
"""
取得履歴を記録
metadata/fetch_history.json に保存
{
"global_news": {
"2024-01-01": {
"timestamp": "2024-01-02T10:30:00Z",
"subreddits": ["worldnews", "news", ...],
"post_count": 250
}
},
"company_news": {
"AAPL": {
"2024-01-01": {
"timestamp": "2024-01-02T10:35:00Z",
"post_count": 50
}
}
}
}
"""
pass
def get_missing_dates(self,
start_date: str,
end_date: str,
category: str,
identifier: str = None) -> List[str]:
"""
指定期間で未取得の日付リストを返す
"""
pass
```
### 5. 重複管理
```python
class PostIDTracker:
"""
Reddit post IDを使用した重複管理
metadata/post_ids.db (SQLite) で管理
"""
def __init__(self, db_path: str):
self.db_path = db_path
self.init_database()
def is_duplicate(self, post_id: str) -> bool:
"""投稿が既に保存されているか確認"""
pass
def add_post_id(self, post_id: str, date: str, category: str):
"""投稿IDを記録"""
pass
```
### 6. データ検証
```python
def validate_cache_integrity(self,
start_date: str,
end_date: str) -> dict:
"""
キャッシュデータの完全性を検証
Returns:
{
"missing_dates": [...],
"corrupted_files": [...],
"statistics": {
"total_posts": int,
"by_category": {...}
}
}
"""
pass
```
## 受け入れ条件
- [ ] JSONL形式でのデータ保存
- [ ] 既存ファイル形式との完全な互換性
- [ ] 効率的な重複チェック
- [ ] データ取得履歴の管理
- [ ] 欠損データの検出機能
- [ ] データ検証機能
- [ ] 単体テストの実装(モック使用)
- [ ] USE_PRAW_APIフラグでの切り替え
## 依存関係
- Pathlibファイル操作
- JSON/JSONL処理
- SQLite重複管理用
## タスク
- [ ] 単体テストの作成TDD
- [ ] RedditCacheManagerクラスの基本実装
- [ ] ディレクトリ構造の自動作成
- [ ] JSONL保存機能
- [ ] JSONL読み込み機能
- [ ] 取得履歴管理機能
- [ ] PostIDTrackerクラス実装
- [ ] データ検証機能
- [ ] 欠損日付検出機能
- [ ] 段階的実装フラグのテスト
- [ ] 既存データとの互換性テスト
- [ ] 統合テスト作成

View File

@ -0,0 +1,233 @@
# チケット #011: Reddit CLIコマンド実装typer使用
## 概要
Reddit データ取得のためのCLIコマンドインターフェースのtyperを使用した実装
## 目的
- 対話形式での過去データ取得
- 日次更新コマンド
- データ状況の確認機能
- 既存CLIフレームワークtyperとの統合
## 実装要件
### 1. コマンド構造
```
cli/
├── main.py # 既存のtyperアプリ
└── commands/
└── reddit.py # Redditコマンド実装
```
### 2. typerコマンド統合
```python
# cli/main.py への統合
from cli.commands import reddit
# 既存のappにサブコマンドを追加
app.add_typer(reddit.app, name="reddit", help="Reddit data management")
```
### 3. Redditコマンド実装
```python
# cli/commands/reddit.py
import typer
from rich.console import Console
import questionary
app = typer.Typer()
console = Console()
@app.command()
def fetch_historical(
interactive: bool = typer.Option(True, "--interactive/--no-interactive",
help="Use interactive mode for configuration")
):
"""Fetch historical Reddit data"""
if interactive:
# 対話形式
config = interactive_configuration()
else:
# オプション指定
config = parse_command_options()
fetcher = create_reddit_fetcher(config)
fetcher.run()
```
### 4. 対話形式の実装questionary使用
```python
def interactive_configuration():
"""
対話形式でデータ取得設定を行う
既存のquestionaryを活用
"""
console = Console()
# カテゴリ選択
category = questionary.select(
"Which category?",
choices=["global_news", "company_news", "both"]
).ask()
# 期間選択(推奨期間の表示付き)
date_range = interactive_date_range_selection()
# ティッカー選択(プリセット対応)
if category in ["company_news", "both"]:
tickers = interactive_ticker_selection()
# 確認と実行
if confirm_execution(category, date_range, tickers):
return build_config(category, date_range, tickers)
```
### 4. ティッカー選択インターフェース
```python
def interactive_ticker_selection():
"""
デフォルトTickerリストから選択
"""
choice = questionary.select(
"Select ticker preset or enter custom:",
choices=[
"1. Popular Tech Stocks (15 tickers)",
"2. S&P 500 Top 20",
"3. Global Indices (22 ETFs)", # 日経225追加済み
"4. All Combined (50+ tickers)",
"5. Quick Test (5 tickers)",
"6. Custom (enter your own)"
]
).ask()
if choice.startswith("6"):
# カスタム入力
custom = questionary.text(
"Enter tickers (comma-separated):"
).ask()
return parse_custom_tickers(custom)
else:
# プリセット使用
return get_preset_tickers(choice)
```
### 5. 日次更新コマンド
```python
@app.command()
def update(
date: str = typer.Option('yesterday', help='Date to fetch (YYYY-MM-DD or "yesterday")'),
auto: bool = typer.Option(False, help='Run in automatic mode (no prompts)')
):
"""Update Reddit data for specific date"""
if date == 'yesterday':
target_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
else:
target_date = validate_date(date)
if auto:
# 設定ファイルから自動実行
config = load_auto_config()
else:
# 確認プロンプト表示
if not click.confirm(f"Fetch data for {target_date}?"):
return
execute_update(target_date, config)
```
### 6. ステータス確認コマンド
```python
@app.command()
def status(
start: str = typer.Option(None, help='Start date (YYYY-MM-DD)'),
end: str = typer.Option(None, help='End date (YYYY-MM-DD)'),
category: str = typer.Option('all', help='Category to check')
):
"""Check Reddit data cache status"""
cache_manager = RedditCacheManager(get_reddit_data_dir())
# データ可用性の表示
table = Table(title="Reddit Data Status")
table.add_column("Date", style="cyan")
table.add_column("Global News", style="green")
table.add_column("Company News", style="yellow")
table.add_column("Total Posts", style="magenta")
# 統計情報の表示
display_cache_statistics(cache_manager, start, end, category)
```
### 7. データ検証コマンド
```python
@app.command()
def verify(
start: str = typer.Argument(..., help='Start date'),
end: str = typer.Argument(..., help='End date'),
fix: bool = typer.Option(False, help='Attempt to fix issues')
):
"""Verify data integrity and completeness"""
validator = RedditDataValidator()
issues = validator.check_period(start, end)
if issues['missing_dates']:
console.print("[yellow]Missing dates found:[/yellow]")
for date in issues['missing_dates']:
console.print(f" - {date}")
if fix and issues['missing_dates']:
if click.confirm("Fetch missing data?"):
fetch_missing_data(issues['missing_dates'])
```
### 8. プログレス表示
```python
def show_fetch_progress(total_requests: int):
"""
取得進捗の表示
"""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
TimeRemainingColumn(),
) as progress:
task = progress.add_task("Fetching Reddit data...", total=total_requests)
yield progress, task
```
## 受け入れ条件
- [ ] 対話形式での直感的な操作
- [ ] デフォルトTickerリストの選択機能
- [ ] 日次更新の自動実行対応
- [ ] 進捗表示とエラーハンドリング
- [ ] データ状況の可視化
- [ ] typer CLIフレームワークとの統合
- [ ] 単体テストの実装(モック使用)
- [ ] USE_PRAW_APIフラグでの切り替え
## 依存関係
- typer既存CLIフレームワーク
- richTUI表示
- questionary対話形式入力
- RedditDataFetcherチケット#009
- RedditCacheManagerチケット#010
## タスク
- [ ] 単体テストの作成TDD
- [ ] reddit.pyコマンドモジュールの作成
- [ ] typerアプリへの統合
- [ ] fetch-historicalコマンド実装
- [ ] 対話形式インターフェース
- [ ] デフォルトTickerリスト選択機能
- [ ] updateコマンド実装
- [ ] statusコマンド実装
- [ ] verifyコマンド実装
- [ ] プログレス表示機能
- [ ] エラーハンドリング
- [ ] ヘルプメッセージの充実
- [ ] 段階的実装フラグのテスト
- [ ] 統合テスト

View File

@ -0,0 +1,232 @@
# チケット #012: Reddit Utils互換性レイヤー実装
## 概要
既存のreddit_utils.pyと新しいpraw実装の間の互換性レイヤーを実装
## 目的
- 既存システムへの影響を最小限に抑える
- 段階的な移行を可能にする
- 既存のインターフェースを維持
- オンライン/オフラインモードの切り替え
## 実装要件
### 1. 段階的実装アプローチ
```python
# USE_PRAW_APIフラグで新旧実装を切り替え
from tradingagents.config import USE_PRAW_API
if USE_PRAW_API:
# 新しいpraw実装を使用
from .reddit_praw_fetcher import fetch_data
else:
# 既存のファイルベース実装を使用
from .reddit_local_fetcher import fetch_data
```
### 2. 互換性インターフェース
```python
# tradingagents/dataflows/reddit_utils.py の改修
def fetch_top_from_category(
category: str,
date: str,
max_limit: int,
query: str = None,
data_path: str = "reddit_data",
use_cache: bool = True,
online_mode: bool = False
):
"""
既存インターフェースを維持しつつ、praw実装を使用
Args:
category: "global_news" or "company_news"
date: 対象日付 (YYYY-MM-DD)
max_limit: 最大取得件数
query: 企業検索用クエリ(ティッカー)
data_path: データディレクトリパス
use_cache: キャッシュを優先使用するか
online_mode: オンラインで新規取得するか
Returns:
既存と同じ形式の投稿リスト
"""
if use_cache and not online_mode:
# 既存のファイルベース実装を使用
return _fetch_from_local_files(category, date, max_limit, query, data_path)
else:
# 新しいpraw実装を使用
return _fetch_from_reddit_api(category, date, max_limit, query)
```
### 3. 設定に基づく切り替え
```python
class RedditDataSource:
"""
設定に基づいてデータソースを切り替え
"""
def __init__(self, config: dict):
self.config = config
self.online_mode = config.get("online_tools", False)
if self.online_mode:
# praw実装を初期化
self.fetcher = RedditDataFetcher(
RedditPrawClient(config),
config
)
# キャッシュマネージャーは常に初期化
self.cache_manager = RedditCacheManager(
config.get("reddit_data_dir", "reddit_data")
)
def get_data(self, category: str, date: str, **kwargs):
"""
統一インターフェースでデータ取得
"""
# まずキャッシュを確認
if self.cache_manager.check_data_exists(category, date):
return self.cache_manager.load_posts(category, date)
# オンラインモードの場合は新規取得
if self.online_mode:
posts = self.fetcher.fetch_data(category, date, **kwargs)
self.cache_manager.save_posts(posts, category, date)
return posts
# オフラインモードでキャッシュなし
return []
```
### 4. データ形式の変換
```python
def convert_praw_to_legacy_format(praw_posts: List[dict]) -> List[dict]:
"""
praw形式のデータを既存形式に変換
既存形式:
{
"title": str,
"content": str, # selftextから変換
"url": str,
"upvotes": int, # upsから変換
"posted_date": str # created_utcから変換
}
"""
legacy_posts = []
for post in praw_posts:
legacy_post = {
"title": post["title"],
"content": post["selftext"],
"url": post["url"],
"upvotes": post["ups"],
"posted_date": datetime.fromtimestamp(
post["created_utc"]
).strftime("%Y-%m-%d")
}
legacy_posts.append(legacy_post)
return legacy_posts
```
### 5. エラーハンドリング
```python
def safe_reddit_fetch(func):
"""
Reddit API エラーを既存システムに影響させないデコレータ
"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except RedditAPIError as e:
logger.warning(f"Reddit API error: {e}")
# フォールバック: キャッシュデータを返す
return _fallback_to_cache(*args, **kwargs)
except Exception as e:
logger.error(f"Unexpected error in Reddit fetch: {e}")
return [] # 空リストで安全に失敗
return wrapper
```
### 6. 移行支援機能
```python
class RedditDataMigrator:
"""
既存データから新形式への移行支援
"""
def migrate_existing_data(self,
old_data_dir: str,
new_data_dir: str):
"""
既存のJSONLファイルを新しいディレクトリ構造に移行
"""
pass
def validate_migration(self):
"""
移行データの検証
"""
pass
```
### 7. インターフェース統合
```python
# tradingagents/dataflows/interface.py の更新
def get_reddit_global_news(
start_date: str,
look_back_days: int,
**kwargs
) -> str:
"""
既存のインターフェースを維持
内部でRedditDataSourceを使用
"""
data_source = RedditDataSource(DEFAULT_CONFIG)
posts = []
for i in range(look_back_days):
date = calculate_date(start_date, -i)
daily_posts = data_source.get_data(
"global_news",
date,
**kwargs
)
posts.extend(daily_posts)
# 既存と同じ形式で返す
return format_posts_as_string(posts)
```
## 受け入れ条件
- [ ] 既存のインターフェースが変更なく動作
- [ ] USE_PRAW_APIフラグでの切り替え
- [ ] データ形式の正確な変換
- [ ] エラー時の適切なフォールバック
- [ ] パフォーマンスの劣化なし
- [ ] 既存テストがすべて通過
- [ ] 単体テストの実装(モック使用)
## 依存関係
- 既存のreddit_utils.py
- RedditDataFetcherチケット#009
- RedditCacheManagerチケット#010
- interface.py
## タスク
- [ ] 単体テストの作成TDD
- [ ] USE_PRAW_APIフラグの追加
- [ ] RedditDataSourceクラスの実装
- [ ] fetch_top_from_category の改修
- [ ] データ形式変換機能
- [ ] エラーハンドリングデコレータ
- [ ] 設定ベースの切り替え機能
- [ ] interface.py の更新
- [ ] 移行支援ツール
- [ ] 互換性テスト
- [ ] パフォーマンステスト
- [ ] ドキュメント更新

View File

@ -0,0 +1,282 @@
# チケット #013: Reddit自動更新機能実装
## 概要
Redditデータの定期的な自動更新機能とスケジューラー設定の実装
## 目的
- 日次での自動データ更新
- エラー時の通知とリトライ
- 実行ログの管理
- cron/スケジューラーとの統合
## 実装要件
### 1. 段階的実装アプローチ
```python
# USE_PRAW_APIフラグによる実装切り替え
from tradingagents.config import USE_PRAW_API
if USE_PRAW_API:
# 新しいpraw実装での自動更新
from .reddit_praw_updater import RedditDailyUpdater
else:
# 既存のファイルベース更新(何もしない)
class RedditDailyUpdater:
def run_daily_update(self, date):
print("Offline mode - no updates needed")
```
### 2. 自動更新スクリプト
```python
# scripts/reddit_daily_update.py
class RedditDailyUpdater:
def __init__(self, config_path: str = None):
"""
Args:
config_path: 設定ファイルパス
"""
self.config = self.load_config(config_path)
self.logger = self.setup_logging()
def run_daily_update(self, date: str = None):
"""
日次更新のメイン処理
Args:
date: 対象日付(デフォルト: 昨日)
"""
if not date:
date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
self.logger.info(f"Starting daily update for {date}")
try:
# データ取得
results = self.fetch_all_data(date)
# 結果の検証
self.validate_results(results)
# 成功通知
self.notify_success(date, results)
except Exception as e:
self.handle_error(date, e)
```
### 3. 設定ファイル
```yaml
# config/reddit_auto_update.yaml
daily_update:
# 取得対象
categories:
- global_news
- company_news
# 対象ティッカー
tickers:
preset: "sp500" # または具体的なリスト
# custom: ["AAPL", "MSFT", "NVDA"]
# 実行時間設定
schedule:
time: "02:00" # 午前2時実行
timezone: "America/New_York"
# リトライ設定
retry:
max_attempts: 3
delay_seconds: 300 # 5分間隔
# 通知設定
notifications:
on_error: true
on_success: false
methods:
- log_file
# - email
# - slack
# ログ設定
logging:
level: INFO
file: logs/reddit_update.log
max_size: 10MB
backup_count: 7
```
### 4. エラーハンドリングとリトライ
```python
def fetch_with_retry(self, fetch_func, *args, **kwargs):
"""
リトライ機能付きデータ取得
"""
max_attempts = self.config['retry']['max_attempts']
delay = self.config['retry']['delay_seconds']
for attempt in range(max_attempts):
try:
return fetch_func(*args, **kwargs)
except RedditAPIError as e:
if attempt < max_attempts - 1:
self.logger.warning(
f"Attempt {attempt + 1} failed: {e}. "
f"Retrying in {delay} seconds..."
)
time.sleep(delay)
else:
raise
```
### 5. 実行ログ管理
```python
class UpdateLogger:
"""
更新実行のログ管理
"""
def log_execution(self,
date: str,
status: str,
details: dict):
"""
実行結果をログに記録
ログ形式:
{
"timestamp": "2024-03-15T02:05:30Z",
"date": "2024-03-14",
"status": "success",
"categories": {
"global_news": {
"posts_fetched": 250,
"new_posts": 230
},
"company_news": {
"tickers_processed": 50,
"total_posts": 1250
}
},
"duration_seconds": 180,
"errors": []
}
"""
pass
```
### 6. Cron設定
```bash
# crontab設定例
# 毎日午前2時に実行米国東部時間
0 2 * * * cd /path/to/TradingAgents && /usr/bin/python3 -m scripts.reddit_daily_update >> logs/cron.log 2>&1
# 週次でデータ検証毎週月曜日午前9時
0 9 * * 1 cd /path/to/TradingAgents && /usr/bin/python3 -m cli.main reddit verify --days 7 >> logs/verify.log 2>&1
# 月次でストレージクリーンアップ毎月1日午前3時
0 3 1 * * cd /path/to/TradingAgents && /usr/bin/python3 -m scripts.reddit_cleanup --days 90 >> logs/cleanup.log 2>&1
```
### 7. systemdサービスAlternative
```ini
# /etc/systemd/system/reddit-updater.service
[Unit]
Description=Reddit Data Daily Updater
After=network.target
[Service]
Type=oneshot
User=tradingagents
WorkingDirectory=/path/to/TradingAgents
Environment="PATH=/usr/bin:/usr/local/bin"
ExecStart=/usr/bin/python3 -m scripts.reddit_daily_update
[Install]
WantedBy=multi-user.target
```
### 8. 監視とアラート
```python
class UpdateMonitor:
"""
更新の監視とアラート
"""
def check_last_update(self):
"""
最後の更新をチェック
24時間以上更新がない場合はアラート
"""
pass
def send_alert(self, message: str, level: str = "error"):
"""
設定に基づいてアラートを送信
"""
if self.config['notifications']['methods']:
for method in self.config['notifications']['methods']:
if method == "log_file":
self.logger.error(message)
# elif method == "email":
# self.send_email_alert(message)
# elif method == "slack":
# self.send_slack_alert(message)
```
### 9. データ整合性チェック
```python
def validate_daily_data(self, date: str) -> dict:
"""
日次データの整合性チェック
Returns:
{
"valid": bool,
"issues": [...],
"statistics": {...}
}
"""
validator = RedditDataValidator()
# チェック項目
checks = {
"minimum_posts": self.check_minimum_posts(date),
"duplicate_ratio": self.check_duplicate_ratio(date),
"data_freshness": self.check_data_freshness(date),
"file_integrity": self.check_file_integrity(date)
}
return validator.run_checks(checks)
```
## 受け入れ条件
- [ ] 日次自動更新の安定動作
- [ ] エラー時の適切なリトライ
- [ ] 実行ログの記録と管理
- [ ] Cron/systemdでの実行対応
- [ ] データ整合性の自動チェック
- [ ] 設定ファイルでの柔軟な制御
- [ ] USE_PRAW_APIフラグでの切り替え
- [ ] 単体テストの実装(モック使用)
## 依存関係
- RedditDataFetcherチケット#009
- RedditCacheManagerチケット#010
- CLI実装チケット#011
- システムのcron/systemd
## タスク
- [ ] 単体テストの作成TDD
- [ ] USE_PRAW_APIフラグの統合
- [ ] RedditDailyUpdaterクラスの実装
- [ ] 設定ファイル形式の定義
- [ ] リトライ機能の実装
- [ ] ログ管理機能
- [ ] Cron設定スクリプト
- [ ] systemdサービス定義
- [ ] 監視・アラート機能
- [ ] データ整合性チェック
- [ ] エラー通知機能
- [ ] 運用ドキュメント作成

View File

@ -0,0 +1,288 @@
# チケット #014: Redditテスト実装
## 概要
Reddit praw実装の包括的なテストスイートの実装
## 目的
- 各コンポーネントの単体テスト
- 統合テストの実装
- モックを使用したAPI呼び出しのテスト
- 既存システムとの互換性テスト
## 実装要件
### 1. テスト優先開発TDDアプローチ
```python
# テストを先に作成してから実装を行う
# 各モジュールの実装前にテストを定義
```
### 2. テスト構造
```
tests/
├── unit/
│ ├── test_reddit_praw_client.py
│ ├── test_reddit_data_fetcher.py
│ ├── test_reddit_cache_manager.py
│ └── test_reddit_utils_compatibility.py
├── integration/
│ ├── test_reddit_cli_commands.py
│ ├── test_reddit_full_flow.py
│ └── test_reddit_auto_update.py
├── fixtures/
│ ├── reddit_mock_data.py
│ └── reddit_test_config.py
└── conftest.py
```
### 3. RedditPrawClient テスト
```python
# tests/unit/test_reddit_praw_client.py
class TestRedditPrawClient:
@pytest.fixture
def mock_reddit(self):
"""prawのRedditオブジェクトをモック"""
with patch('praw.Reddit') as mock:
yield mock
def test_authentication_success(self, mock_reddit):
"""認証成功のテスト"""
client = RedditPrawClient(test_config)
assert client.authenticate() is True
def test_authentication_failure(self, mock_reddit):
"""認証失敗のテスト"""
mock_reddit.side_effect = Exception("Invalid credentials")
client = RedditPrawClient(invalid_config)
assert client.authenticate() is False
def test_get_subreddit_posts(self, mock_reddit):
"""subreddit投稿取得のテスト"""
# モックデータ設定
mock_posts = create_mock_posts(count=10)
mock_reddit.return_value.subreddit.return_value.hot.return_value = mock_posts
client = RedditPrawClient(test_config)
posts = client.get_subreddit_posts("worldnews", limit=10)
assert len(posts) == 10
assert all('title' in post for post in posts)
```
### 4. RedditDataFetcher テスト
```python
# tests/unit/test_reddit_data_fetcher.py
class TestRedditDataFetcher:
def test_fetch_global_news(self, mock_client):
"""グローバルニュース取得のテスト"""
fetcher = RedditDataFetcher(mock_client, test_config)
posts = fetcher.fetch_global_news("2024-01-01", limit_per_subreddit=5)
# 5 subreddits × 5 posts = 25 posts expected
assert len(posts) == 25
assert all(post['posted_date'] == "2024-01-01" for post in posts)
def test_company_news_filtering(self):
"""企業関連投稿のフィルタリングテスト"""
posts = [
{"title": "Apple announces new iPhone", "selftext": "..."},
{"title": "Random tech news", "selftext": "..."},
{"title": "AAPL stock rises", "selftext": "..."},
]
filtered = filter_company_relevant_posts(posts, "AAPL", "Apple")
assert len(filtered) == 2
def test_duplicate_removal(self):
"""重複排除のテスト"""
fetcher = RedditDataFetcher(mock_client, test_config)
posts_with_duplicates = [
{"id": "abc123", "title": "Post 1"},
{"id": "def456", "title": "Post 2"},
{"id": "abc123", "title": "Post 1"}, # 重複
]
unique_posts = fetcher.remove_duplicates(posts_with_duplicates)
assert len(unique_posts) == 2
```
### 5. RedditCacheManager テスト
```python
# tests/unit/test_reddit_cache_manager.py
class TestRedditCacheManager:
@pytest.fixture
def temp_cache_dir(self, tmp_path):
"""テスト用の一時ディレクトリ"""
return tmp_path / "reddit_data"
def test_save_and_load_posts(self, temp_cache_dir):
"""投稿の保存と読み込みテスト"""
manager = RedditCacheManager(str(temp_cache_dir))
test_posts = [
{"id": "1", "title": "Test Post 1"},
{"id": "2", "title": "Test Post 2"},
]
# 保存
file_path = manager.save_posts(
test_posts, "global_news", "2024-01-01", "worldnews"
)
assert Path(file_path).exists()
# 読み込み
loaded_posts = manager.load_posts(
"global_news", "2024-01-01", "worldnews"
)
assert len(loaded_posts) == 2
assert loaded_posts[0]["title"] == "Test Post 1"
def test_fetch_history_tracking(self, temp_cache_dir):
"""取得履歴の記録テスト"""
manager = RedditCacheManager(str(temp_cache_dir))
manager.update_fetch_history(
"global_news",
"2024-01-01",
["worldnews", "news"],
"2024-01-02T10:00:00Z",
100
)
history = manager.get_fetch_history()
assert "global_news" in history
assert history["global_news"]["2024-01-01"]["post_count"] == 100
```
### 6. 統合テスト
```python
# tests/integration/test_reddit_full_flow.py
@pytest.mark.integration
class TestRedditFullFlow:
def test_end_to_end_data_fetch(self):
"""完全なデータ取得フローのテスト"""
# 実際のAPIは使わず、モックで代替
with patch('praw.Reddit') as mock_reddit:
setup_mock_reddit_responses(mock_reddit)
# CLI実行
result = runner.invoke(cli, [
'reddit', 'fetch-historical',
'--no-interactive',
'--start', '2024-01-01',
'--end', '2024-01-01',
'--category', 'global_news'
])
assert result.exit_code == 0
# データが保存されたか確認
cache_manager = RedditCacheManager(test_data_dir)
posts = cache_manager.load_posts("global_news", "2024-01-01")
assert len(posts) > 0
```
### 7. モックデータ生成
```python
# tests/fixtures/reddit_mock_data.py
def create_mock_post(post_id: str = None, **kwargs):
"""モック投稿データの生成"""
post = {
"id": post_id or str(uuid.uuid4()),
"title": kwargs.get("title", "Test Post Title"),
"selftext": kwargs.get("selftext", "Test post content"),
"url": kwargs.get("url", "https://reddit.com/test"),
"ups": kwargs.get("ups", random.randint(1, 1000)),
"created_utc": kwargs.get("created_utc", int(time.time())),
"subreddit": kwargs.get("subreddit", "test"),
"author": kwargs.get("author", "test_user"),
"num_comments": kwargs.get("num_comments", random.randint(0, 100))
}
return post
def create_mock_posts(count: int = 10, **kwargs):
"""複数のモック投稿を生成"""
return [create_mock_post(f"post_{i}", **kwargs) for i in range(count)]
```
### 8. 互換性テスト
```python
# tests/unit/test_reddit_utils_compatibility.py
class TestRedditUtilsCompatibility:
def test_legacy_interface_maintained(self):
"""既存インターフェースが維持されているか"""
# 既存の関数シグネチャでの呼び出し
result = fetch_top_from_category(
"global_news",
"2024-01-01",
100,
data_path="test_data"
)
assert isinstance(result, list)
assert all(isinstance(post, dict) for post in result)
def test_data_format_conversion(self):
"""データ形式の変換テスト"""
praw_posts = create_mock_posts(5)
legacy_posts = convert_praw_to_legacy_format(praw_posts)
# 既存形式のフィールドが存在するか
for post in legacy_posts:
assert "content" in post # selftextから変換
assert "upvotes" in post # upsから変換
assert "posted_date" in post # created_utcから変換
```
### 9. パフォーマンステスト
```python
@pytest.mark.performance
def test_large_data_handling():
"""大量データ処理のパフォーマンステスト"""
start_time = time.time()
# 1000件のモックデータで処理
large_dataset = create_mock_posts(1000)
manager = RedditCacheManager(test_dir)
manager.save_posts(large_dataset, "test", "2024-01-01")
duration = time.time() - start_time
assert duration < 5.0 # 5秒以内に完了
```
## 受け入れ条件
- [ ] テスト優先開発TDDの実践
- [ ] 全モジュールの単体テスト実装
- [ ] 統合テストの成功
- [ ] コードカバレッジ80%以上
- [ ] モックを使用したAPI非依存テスト
- [ ] 既存システムとの互換性確認
- [ ] パフォーマンステストの合格
- [ ] USE_PRAW_APIフラグのテスト
## 依存関係
- pytest
- pytest-mock
- pytest-cov
- 全Reddit実装モジュール
## タスク
- [ ] テストディレクトリ構造の作成
- [ ] モックデータ生成ユーティリティの作成
- [ ] RedditPrawClientのテスト作成
- [ ] RedditDataFetcherのテスト作成
- [ ] RedditCacheManagerのテスト作成
- [ ] CLIコマンドのテスト作成
- [ ] 統合テストの実装
- [ ] 互換性テスト
- [ ] 段階的実装フラグのテスト
- [ ] パフォーマンステスト
- [ ] CI/CD設定GitHub Actions

View File

@ -0,0 +1,197 @@
# チケット #015: Reddit単一APIキー最適化実装
## 概要
単一のReddit APIキーで最大限のパフォーマンスを実現する最適化実装基本機能実装後に検討
## 目的
- API呼び出し回数の最小化
- 待機時間の有効活用
- データ処理の並列化
- レート制限内での最速処理
## 注意
この最適化は基本機能が安定動作した後に実装を検討する
## 実装要件
### 1. バッチ取得の最適化
```python
class OptimizedRedditFetcher:
def fetch_multiple_subreddits(self,
subreddits: List[str],
limit: int = 100) -> Dict[str, List[dict]]:
"""
複数subredditを1回のAPI呼び出しで取得
Args:
subreddits: subredditのリスト
limit: 取得件数最大100
Example:
# 5つのsubredditから各20件ずつ取得したい場合
# 通常: 5回のAPI呼び出し
# 最適化: 1回のAPI呼び出し
combined = "+".join(subreddits)
posts = reddit.subreddit(combined).hot(limit=100)
"""
pass
```
### 2. 非同期処理の活用
```python
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
class AsyncRedditProcessor:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=4)
async def process_batch(self, tickers: List[str], date: str):
"""
API呼び出しと並列処理可能なタスクを効率的に実行
"""
tasks = []
# API呼び出しレート制限あり
for ticker in tickers:
api_data = await self.fetch_with_rate_limit(ticker, date)
# データ処理は並列実行
tasks.extend([
self.executor.submit(self.sentiment_analysis, api_data),
self.executor.submit(self.save_to_cache, api_data),
self.executor.submit(self.extract_metrics, api_data)
])
# 並列処理の完了を待つ
await asyncio.gather(*tasks)
```
### 3. インテリジェントキャッシング
```python
class SmartCache:
def __init__(self):
self.memory_cache = {} # 頻繁にアクセスされるデータ
self.disk_cache = RedditCacheManager()
self.access_patterns = {} # アクセスパターンの追跡
def get_with_prediction(self, ticker: str, date: str):
"""
アクセスパターンを学習して先読みキャッシング
"""
# よく一緒にアクセスされる銘柄を予測
related_tickers = self.predict_related_tickers(ticker)
# バックグラウンドでプリフェッチ
self.prefetch_async(related_tickers, date)
```
### 4. API呼び出しの最適化戦略
```python
class APICallOptimizer:
def __init__(self):
self.call_queue = PriorityQueue()
self.rate_limiter = RateLimiter(calls_per_minute=80)
def optimize_call_order(self, requests: List[APIRequest]) -> List[APIRequest]:
"""
API呼び出しの順序を最適化
優先順位:
1. 複数銘柄を含むバッチリクエスト
2. 人気銘柄
3. キャッシュミスのデータ
4. その他
"""
# バッチ可能なリクエストをグループ化
batched = self.group_batchable_requests(requests)
# 優先度順にソート
return self.sort_by_priority(batched)
```
### 5. 処理パイプライン
```python
class ProcessingPipeline:
"""
データ取得から保存までのパイプライン処理
"""
def __init__(self):
self.stages = [
self.fetch_stage, # API呼び出し直列
self.filter_stage, # フィルタリング(並列可)
self.transform_stage, # データ変換(並列可)
self.analyze_stage, # 分析処理(並列可)
self.save_stage # 保存(並列可)
]
async def run_pipeline(self, input_data):
"""
パイプライン実行
"""
data = input_data
for stage in self.stages:
if stage.can_parallelize:
# 並列実行
data = await self.run_parallel(stage, data)
else:
# 直列実行API呼び出しなど
data = await stage(data)
return data
```
### 6. レート制限の賢い管理
```python
class SmartRateLimiter:
def __init__(self, calls_per_minute: int = 80):
self.calls_per_minute = calls_per_minute
self.call_history = deque(maxlen=calls_per_minute)
def adaptive_wait(self):
"""
現在のレート使用状況に応じて待機時間を調整
"""
usage_rate = self.get_current_usage_rate()
if usage_rate > 0.9: # 90%以上使用
# 安全のため長めに待機
wait_time = 1.0
elif usage_rate > 0.7: # 70-90%使用
# 標準的な待機
wait_time = 0.75
else: # 70%未満
# 積極的に使用
wait_time = 0.6
return wait_time
```
## 受け入れ条件
- [ ] 基本機能が安定動作していること
- [ ] 単一APIキーで現在より30%以上の高速化
- [ ] レート制限違反ゼロ
- [ ] API呼び出し数の最小化を実証
- [ ] 並列処理による効率化の測定
- [ ] エラー時の適切なフォールバック
## 依存関係
- asyncio非同期処理
- aiohttp非同期HTTP
- concurrent.futures並列処理
- 既存のReddit実装モジュール
## タスク(基本機能実装後)
- [ ] 基本機能の安定性確認
- [ ] OptimizedRedditFetcherクラスの実装
- [ ] 非同期処理フレームワークの構築
- [ ] バッチ取得ロジックの実装
- [ ] インテリジェントキャッシングシステム
- [ ] API呼び出し最適化アルゴリズム
- [ ] 処理パイプラインの実装
- [ ] 適応的レート制限管理
- [ ] パフォーマンステスト
- [ ] 最適化効果の測定ツール
- [ ] ドキュメント作成

View File

@ -0,0 +1,420 @@
# チケット #016: Reddit時間制御とデータ整合性実装
## 概要
未来日のデータ取得を防止し、市場時間を考慮したデータ取得時間制御の実装(初期実装は米国市場のみ)
## 目的
- バックテストでの未来情報リーク防止
- 市場開始前のデータのみを使用
- タイムゾーンの適切な処理(初期実装: 米国東部時間のみ)
- データの時間的整合性の保証
## 実装要件
### 1. 時間検証クラス(初期実装: 米国市場のみ)
```python
from datetime import datetime, time, timezone, timedelta
import pytz
import pandas_market_calendars as mcal
class TemporalDataValidator:
def __init__(self):
# 初期実装: 米国市場のみ
self.market_timezone = pytz.timezone('America/New_York')
self.market_open = time(9, 30)
self.data_cutoff_time = time(9, 0) # 30分前
self.calendar = mcal.get_calendar('NYSE')
# 将来の拡張用(コメントアウト)
# self.market_configs = {
# 'US': {...},
# 'JP': {...},
# 'EU': {...},
# 'DE': {...}
# }
def validate_date_not_future(self, target_date: str) -> bool:
"""
対象日が未来日でないことを確認
Args:
target_date: YYYY-MM-DD形式の日付
Returns:
有効な場合True、未来日の場合False
Raises:
ValueError: 未来日を指定した場合
"""
target = datetime.strptime(target_date, "%Y-%m-%d").date()
today = datetime.now(self.market_timezone).date()
if target > today:
raise ValueError(
f"Cannot fetch data for future date: {target_date}. "
f"Today is {today} (EST)."
)
return True
def is_market_open_day(self, date: str) -> bool:
"""
指定日が米国市場の営業日かどうかをチェック
Args:
date: YYYY-MM-DD形式の日付
Returns:
営業日の場合True、休場日の場合False
"""
# 指定日が営業日かチェック
schedule = self.calendar.schedule(start_date=date, end_date=date)
return len(schedule) > 0
def get_next_market_day(self, date: str) -> str:
"""
次の営業日を取得(週末・祝日をスキップ)
"""
date_obj = datetime.strptime(date, "%Y-%m-%d")
# 次の営業日を探す
for i in range(1, 10): # 最大10日先まで検索
next_date = date_obj + timedelta(days=i)
next_date_str = next_date.strftime("%Y-%m-%d")
if self.is_market_open_day(next_date_str):
return next_date_str
raise ValueError(f"No market open day found within 10 days after {date}")
def get_data_cutoff_timestamp(self, target_date: str) -> int:
"""
米国市場の開始30分前のタイムスタンプを取得
Args:
target_date: YYYY-MM-DD形式の日付
Returns:
Unix timestamp (UTC)
"""
# 営業日チェック
if not self.is_market_open_day(target_date):
raise ValueError(
f"{target_date} is not a market open day. "
f"Next open day is {self.get_next_market_day(target_date)}"
)
date_obj = datetime.strptime(target_date, "%Y-%m-%d").date()
# タイムゾーンを考慮してカットオフ時刻を作成
# pytzは自動的にDSTサマータイムを処理
cutoff_datetime = self.market_timezone.localize(
datetime.combine(date_obj, self.data_cutoff_time)
)
# UTCに変換してUnixタイムスタンプを返す
return int(cutoff_datetime.timestamp())
```
### 2. 時間フィルタリング機能
```python
class TemporalDataFilter:
def __init__(self, validator: TemporalDataValidator):
self.validator = validator
def filter_posts_by_cutoff(self,
posts: List[dict],
target_date: str) -> List[dict]:
"""
米国市場開始30分前までの投稿のみをフィルタリング
Args:
posts: Reddit投稿のリスト
target_date: 対象日付
Returns:
フィルタリング後の投稿リスト
"""
try:
cutoff_timestamp = self.validator.get_data_cutoff_timestamp(target_date)
except ValueError as e:
# 休場日の場合は空リストを返す
logging.info(str(e))
return []
filtered_posts = []
for post in posts:
post_timestamp = post.get('created_utc', 0)
# カットオフ時刻より前の投稿のみを含める
if post_timestamp < cutoff_timestamp:
filtered_posts.append(post)
else:
logging.debug(
f"Excluded post '{post.get('title')}' - "
f"posted after cutoff time"
)
logging.info(
f"Filtered {len(posts) - len(filtered_posts)} posts "
f"posted after {target_date} 09:00 EST"
)
return filtered_posts
```
### 3. 日付範囲の検証
```python
class DateRangeValidator:
def validate_date_range(self, start_date: str, end_date: str) -> dict:
"""
日付範囲の妥当性を検証
Returns:
{
"valid": bool,
"adjustments": {
"original_end": str,
"adjusted_end": str
},
"warnings": List[str]
}
"""
validator = TemporalDataValidator()
warnings = []
adjustments = {}
# 終了日が今日以降の場合は昨日に調整
today = datetime.now(validator.market_timezone).date()
end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
if end_date_obj >= today:
yesterday = today - timedelta(days=1)
adjustments["original_end"] = end_date
adjustments["adjusted_end"] = yesterday.strftime("%Y-%m-%d")
warnings.append(
f"End date adjusted from {end_date} to {adjustments['adjusted_end']} "
f"to prevent future data access"
)
end_date = adjustments["adjusted_end"]
# 当日のデータ取得に関する警告
if end_date_obj == today:
current_time = datetime.now(validator.market_timezone).time()
if current_time < validator.data_cutoff_time:
warnings.append(
f"Today's data is not yet available. "
f"Data becomes available after 09:00 EST."
)
return {
"valid": True,
"adjustments": adjustments,
"warnings": warnings
}
```
### 4. 今後の拡張: マルチマーケット対応
```python
# 将来の実装予定: 複数市場のサポート
# 現在は米国市場のみをサポート
#
# class MultiMarketBatchProcessor:
# """
# 複数市場のティッカーを効率的に処理
# """
# pass
```
### 5. Reddit API取得時の時間制御
```python
class TimeAwareRedditFetcher(RedditDataFetcher):
def __init__(self, client, config):
super().__init__(client, config)
self.temporal_validator = TemporalDataValidator()
self.temporal_filter = TemporalDataFilter(self.temporal_validator)
def fetch_company_news(self,
ticker: str,
company_name: str,
date: str,
limit: int = 50) -> List[dict]:
"""
時間制御付きの企業ニュース取得(米国市場対応)
"""
# 未来日チェック
self.temporal_validator.validate_date_not_future(date)
# 市場営業日チェック
if not self.temporal_validator.is_market_open_day(date):
logging.info(
f"{date} is not a trading day. Skipping."
)
return []
# 通常の取得処理
posts = super().fetch_company_news(ticker, company_name, date, limit)
# 時間フィルタリング
filtered_posts = self.temporal_filter.filter_posts_by_cutoff(
posts, date
)
return filtered_posts
```
### 5. バックテスト実行時の検証
```python
class BacktestTimeValidator:
"""
バックテスト実行時の時間整合性チェック
"""
def validate_backtest_date(self, test_date: str) -> dict:
"""
バックテスト日付の妥当性検証
Returns:
{
"can_run": bool,
"reason": str,
"next_available_time": datetime
}
"""
validator = TemporalDataValidator()
now = datetime.now(validator.market_timezone)
# 今日の日付でバックテストする場合
if test_date == now.strftime("%Y-%m-%d"):
if now.time() < validator.data_cutoff_time:
next_available = datetime.combine(
now.date(),
validator.data_cutoff_time,
tzinfo=validator.market_timezone
)
return {
"can_run": False,
"reason": f"Today's data not yet available. Available after 09:00 EST.",
"next_available_time": next_available
}
return {
"can_run": True,
"reason": "Date is valid for backtesting",
"next_available_time": None
}
```
### 6. CLI統合
```python
# cli/commands/reddit.py への追加
@reddit.command()
@click.option('--force', is_flag=True, help='Force fetch without time validation')
def fetch_historical(force):
"""Fetch historical Reddit data with temporal validation"""
if not force:
# 日付範囲の検証
validator = DateRangeValidator()
validation_result = validator.validate_date_range(start_date, end_date)
if validation_result["warnings"]:
for warning in validation_result["warnings"]:
console.print(f"[yellow]Warning: {warning}[/yellow]")
if validation_result["adjustments"]:
console.print(
f"[cyan]Date range adjusted: "
f"{validation_result['adjustments']['original_end']} → "
f"{validation_result['adjustments']['adjusted_end']}[/cyan]"
)
if not click.confirm("Continue with adjusted dates?"):
return
```
### 7. 設定オプション
```python
# config/reddit_temporal_config.yaml
temporal_settings:
# タイムゾーン設定
market_timezone: "America/New_York"
# 市場時間
market_open: "09:30"
market_close: "16:00"
# データ取得カットオフ(市場開始何分前か)
data_cutoff_minutes_before_open: 30
# 検証設定
validation:
prevent_future_data: true
enforce_cutoff_time: true
allow_override: false # --forceオプションの許可
# 警告設定
warnings:
show_timezone_info: true
show_next_available_time: true
```
### 8. ログとモニタリング
```python
class TemporalAuditLogger:
"""
時間制御に関する監査ログ
"""
def log_temporal_filtering(self,
date: str,
total_posts: int,
filtered_posts: int,
cutoff_time: str):
"""
時間フィルタリングの結果を記録
"""
audit_entry = {
"timestamp": datetime.now().isoformat(),
"target_date": date,
"cutoff_time_est": cutoff_time,
"total_posts_fetched": total_posts,
"posts_after_cutoff": total_posts - filtered_posts,
"posts_included": filtered_posts,
"filter_ratio": (total_posts - filtered_posts) / total_posts if total_posts > 0 else 0
}
self.write_audit_log(audit_entry)
```
## 受け入れ条件
- [ ] 未来日のデータ取得が確実に防止される
- [ ] 米国市場の開始30分前09:00 EST/EDTでデータが切られる
- [ ] 週末・祝日が自動的にスキップされる
- [ ] サマータイムが自動的に処理される
- [ ] タイムゾーンが正しく処理される
- [ ] バックテストでの時間的整合性が保証される
- [ ] 適切な警告とログが出力される
- [ ] 既存システムとの互換性維持
- [ ] 単体テストの実装(モック使用)
## 依存関係
- pytzタイムゾーン処理、DST対応
- pandas_market_calendars米国市場の営業日カレンダー
- 既存のRedditDataFetcher
- CLI実装
## タスク
- [ ] 単体テストの作成TDD
- [ ] TemporalDataValidatorクラスの実装米国市場のみ
- [ ] TemporalDataFilterクラスの実装
- [ ] DateRangeValidatorの実装
- [ ] TimeAwareRedditFetcherの実装
- [ ] 米国市場カレンダーの統合NYSE
- [ ] バックテスト時間検証機能
- [ ] CLI統合警告表示
- [ ] 設定ファイルの追加
- [ ] 監査ログ機能
- [ ] タイムゾーン・DST処理のテスト
- [ ] 米国市場の休場日テスト
- [ ] 統合テスト
- [ ] ドキュメント更新

272
docs/API_REFERENCE.md Normal file
View File

@ -0,0 +1,272 @@
# TradingAgents Backtest API Reference
## Overview
The TradingAgents backtest module provides a comprehensive framework for backtesting trading strategies using the multi-agent LLM system.
## Core Classes
### TAFlowStrategy
Wrapper class for TradingAgentsGraph to use in backtesting.
```python
from tradingagents.backtest.ta_flow_strategy import TAFlowStrategy
strategy = TAFlowStrategy(config=None, debug=False)
```
#### Parameters
- `config` (dict, optional): Configuration dictionary for TradingAgentsGraph
- `debug` (bool): Enable debug mode for detailed logging
#### Methods
##### decide(ticker: str, date: str) -> str
Get trading decision for a given ticker and date.
**Parameters:**
- `ticker`: Stock symbol (e.g., "AAPL")
- `date`: Date in YYYY-MM-DD format
**Returns:**
- One of "Buy", "Sell", or "Hold"
**Example:**
```python
signal = strategy.decide("AAPL", "2024-01-01")
# Returns: "Buy"
```
### BacktestEngine
Main backtest simulation engine.
```python
from tradingagents.backtest.engine import BacktestEngine
engine = BacktestEngine(initial_capital=100000.0)
```
#### Parameters
- `initial_capital` (float): Starting capital for the simulation
#### Methods
##### run(ticker, start_date, end_date, strategy, fee_rate=0.001, slippage_rate=0.0005, progress_callback=None) -> BacktestResult
Run the backtest simulation.
**Parameters:**
- `ticker` (str): Stock symbol
- `start_date` (str): Start date (YYYY-MM-DD)
- `end_date` (str): End date (YYYY-MM-DD)
- `strategy` (TAFlowStrategy): Strategy object to generate signals
- `fee_rate` (float): Trading fee as percentage of trade value
- `slippage_rate` (float): Slippage as percentage of price
- `progress_callback` (callable, optional): Callback for progress updates
**Returns:**
- `BacktestResult` object with all simulation data
**Example:**
```python
result = engine.run(
ticker="NVDA",
start_date="2024-01-01",
end_date="2024-03-31",
strategy=strategy,
fee_rate=0.001,
slippage_rate=0.0005
)
```
### BacktestResult
Data class containing complete backtest results.
#### Attributes
- `ticker` (str): Stock symbol
- `start_date` (str): Backtest start date
- `end_date` (str): Backtest end date
- `initial_capital` (float): Starting capital
- `final_capital` (float): Ending capital
- `trades` (List[Trade]): List of all trades executed
- `equity_curve` (List[float]): Daily portfolio values
- `daily_returns` (List[float]): Daily return percentages
- `metrics` (PerformanceMetrics): Strategy performance metrics
- `buy_hold_metrics` (PerformanceMetrics): Buy & hold comparison metrics
- `dates` (List[str]): Trading dates
### PerformanceMetrics
Container for all performance metrics.
#### Attributes
- `total_return` (float): Cumulative return
- `annual_return` (float): Annualized return
- `sharpe_ratio` (float): Sharpe ratio (risk-free rate = 0)
- `max_drawdown` (float): Maximum drawdown percentage
- `max_drawdown_duration` (int): Max drawdown duration in days
- `win_rate` (float): Percentage of winning trades
- `profit_factor` (float): Total profits / Total losses
- `total_trades` (int): Total number of trades
- `volatility` (float): Annualized volatility
### BacktestRepository
Repository for storing and retrieving backtest results.
```python
from tradingagents.backtest.persistence import BacktestRepository
repository = BacktestRepository(db_path="results.sqlite")
```
#### Methods
##### save_backtest(result: BacktestResult, config: dict) -> int
Save backtest result to database.
**Returns:**
- ID of saved backtest
##### get_backtest(backtest_id: int) -> dict
Retrieve backtest by ID.
##### list_backtests(ticker=None, start_date=None, end_date=None, limit=100) -> List[BacktestSummary]
List backtests with optional filters.
### ReportManager
Manage HTML report generation and storage.
```python
from tradingagents.backtest.output import ReportManager
manager = ReportManager(reports_dir="reports/")
```
#### Methods
##### save_report(result: BacktestResult, ticker: str, start_date: str, end_date: str) -> Path
Generate and save HTML report.
**Returns:**
- Path to saved report
## Complete Example
```python
from tradingagents.backtest.ta_flow_strategy import TAFlowStrategy
from tradingagents.backtest.engine import BacktestEngine
from tradingagents.backtest.persistence import BacktestRepository
from tradingagents.backtest.output import ReportManager, TerminalReporter
from tradingagents.default_config import DEFAULT_CONFIG
# Configure strategy
config = DEFAULT_CONFIG.copy()
config["online_tools"] = False # Force offline mode
config["max_debate_rounds"] = 1 # Reduce for faster testing
# Initialize components
strategy = TAFlowStrategy(config=config, debug=True)
engine = BacktestEngine(initial_capital=100000)
# Run backtest
result = engine.run(
ticker="AAPL",
start_date="2024-01-01",
end_date="2024-03-31",
strategy=strategy,
fee_rate=0.001,
slippage_rate=0.0005
)
# Display results
reporter = TerminalReporter()
reporter.display_summary(result.metrics, result.buy_hold_metrics)
# Save to database
repository = BacktestRepository()
backtest_id = repository.save_backtest(result, config)
print(f"Saved with ID: {backtest_id}")
# Generate HTML report
manager = ReportManager()
report_path = manager.save_report(result, "AAPL", "2024-01-01", "2024-03-31")
print(f"Report saved to: {report_path}")
```
## Configuration Options
### Strategy Configuration
```python
config = {
"online_tools": False, # Always False for backtesting
"deep_think_llm": "gpt-4", # LLM for complex decisions
"quick_think_llm": "gpt-4o-mini", # LLM for quick decisions
"max_debate_rounds": 3, # Number of debate rounds
"llm_provider": "openai", # LLM provider
"backend_url": None, # Custom backend URL
}
```
### Position Management Rules
The backtest engine follows these position management rules:
1. **No Position → Buy Signal**: Open long position
2. **No Position → Sell Signal**: Open short position
3. **Long Position → Sell Signal**: Close long, open short
4. **Short Position → Buy Signal**: Close short, open long
5. **Any Position → Hold Signal**: Maintain current position
### Fee and Slippage Calculation
- **Buy Price**: `market_price × (1 + slippage_rate)`
- **Sell Price**: `market_price × (1 - slippage_rate)`
- **Trading Fee**: `trade_value × fee_rate`
## Error Handling
The system includes comprehensive error handling:
- **Strategy Errors**: Return "Hold" signal on error
- **Data Errors**: Raise ValueError with descriptive message
- **Persistence Errors**: Log error and re-raise
## Performance Considerations
- **LLM Calls**: Each trading day requires one LLM call
- **Caching**: Use offline mode to leverage cached data
- **Batch Processing**: Process multiple tickers sequentially
- **Memory**: Results are stored in memory during simulation
## Extending the Framework
### Custom Strategy
```python
class CustomStrategy:
def decide(self, ticker: str, date: str) -> str:
# Your logic here
return "Buy" # or "Sell" or "Hold"
# Use with BacktestEngine
engine = BacktestEngine()
result = engine.run(ticker="AAPL", strategy=CustomStrategy(), ...)
```
### Custom Metrics
```python
from tradingagents.backtest.metrics import MetricsCalculator
class CustomMetricsCalculator(MetricsCalculator):
def calculate(self, equity_curve, trades, initial_capital, start_date, end_date):
metrics = super().calculate(...)
# Add custom calculations
metrics.custom_metric = self._calculate_custom_metric(...)
return metrics
```

168
docs/BACKTEST_README.md Normal file
View File

@ -0,0 +1,168 @@
# TradingAgents バックテスト機能
## 概要
TradingAgentsのマルチエージェントシステムを使用した株式取引戦略のバックテスト機能です。過去の市場データに対して、実際のエージェントフローアナリスト→リサーチャー→トレーダー→リスク管理→ポートフォリオマネージャーを実行し、パフォーマンスを評価します。
## インストール
```bash
# 依存関係のインストール
pip install -r requirements.txt
```
## 使用方法
### 1. バックテストの実行
```bash
python -m cli.main backtest run \
--ticker AAPL \
--start 2024-01-01 \
--end 2024-03-31 \
--fee 0.001 \
--slippage 0.0005
```
#### パラメータ
- `--ticker`: 銘柄シンボル(必須)
- `--start`: 開始日 YYYY-MM-DD必須
- `--end`: 終了日 YYYY-MM-DD必須
- `--fee`: 取引手数料率(デフォルト: 0.1%
- `--slippage`: スリッページ率(デフォルト: 0.05%
- `--initial-capital`: 初期資金(デフォルト: $100,000
### 2. 過去の結果一覧
```bash
# すべての結果を表示
python -m cli.main backtest list
# 特定の銘柄でフィルタ
python -m cli.main backtest list --ticker AAPL
# 表示数を制限
python -m cli.main backtest list --limit 10
```
### 3. 結果の詳細表示
```bash
# ID 5の結果を表示HTMLレポートを開く
python -m cli.main backtest show 5
```
### 4. 結果の比較
```bash
# ID 3とID 7の結果を比較
python -m cli.main backtest compare 3 7
```
## シミュレーション仕様
### ポジション管理
- 未保有で Buy → ロング新規
- 未保有で Sell → ショート新規
- ロング中に Sell → ロング決済+ショート新規
- ショート中に Buy → ショート決済+ロング新規
- Hold → 現在のポジションを維持
### 取引ルール
- 全資産を100%投入レバレッジ1倍
- 取引コスト = 取引金額 × fee_rate
- スリッページ:
- 買い: 価格 × (1 + slippage_rate)
- 売り: 価格 × (1 - slippage_rate)
## 評価指標
| 指標 | 説明 |
|------|------|
| Total Return | 累積リターン |
| Annual Return | 年率換算リターン |
| Sharpe Ratio | リスク調整後リターン |
| Max Drawdown | 最大ドローダウン |
| Win Rate | 勝率 |
| Profit Factor | 総利益 ÷ 総損失 |
## 出力
### 1. ターミナル出力
美しいテーブル形式で戦略とBuy & Holdの比較結果を表示
### 2. HTMLレポート
- インタラクティブなグラフPlotly
- 資産推移曲線
- ドローダウンチャート
- リターン分布
- 取引履歴テーブル
レポートは `reports/` ディレクトリに保存されます。
### 3. データベース永続化
結果は `results.sqlite` に保存され、後から参照・比較が可能です。
## プログラム例
```python
from tradingagents.backtest.ta_flow_strategy import TAFlowStrategy
from tradingagents.backtest.engine import BacktestEngine
from tradingagents.default_config import DEFAULT_CONFIG
# 設定
config = DEFAULT_CONFIG.copy()
config["online_tools"] = False # オフラインモード
# 戦略の初期化
strategy = TAFlowStrategy(config=config)
# バックテストエンジンの初期化
engine = BacktestEngine(initial_capital=100000)
# バックテスト実行
result = engine.run(
ticker="NVDA",
start_date="2024-01-01",
end_date="2024-03-31",
strategy=strategy,
fee_rate=0.001,
slippage_rate=0.0005
)
print(f"Total Return: {result.metrics.total_return:.2%}")
print(f"Sharpe Ratio: {result.metrics.sharpe_ratio:.2f}")
print(f"Max Drawdown: {result.metrics.max_drawdown:.2%}")
```
## 注意事項
1. **APIキー設定**: 環境変数に以下を設定してください
```bash
export OPENAI_API_KEY=your_api_key
export FINNHUB_API_KEY=your_api_key
```
2. **オフラインモード**: バックテストは自動的にオフラインモードで実行され、キャッシュされたデータのみを使用します
3. **データの先読み防止**: 各日の判断は、その日の終値データまでしか使用しません
4. **計算負荷**: LLMを使用するため、長期間のバックテストは時間がかかります
## トラブルシューティング
### エラー: "No price data available"
- インターネット接続を確認してください
- 銘柄シンボルが正しいか確認してください
- 指定期間に市場が開いていたか確認してください
### エラー: "Strategy error"
- APIキーが正しく設定されているか確認してください
- LLMのレート制限に達していないか確認してください
## 拡張予定
- 複数銘柄の同時シミュレーション
- ポートフォリオ最適化
- リアルタイム取引との統合
- より詳細なリスク分析

386
docs/DEVELOPMENT_GUIDE.md Normal file
View File

@ -0,0 +1,386 @@
# TradingAgents Backtest Development Guide
## Project Structure
```
TradingAgents/
├── tradingagents/
│ └── backtest/
│ ├── __init__.py
│ ├── ta_flow_strategy.py # Strategy wrapper
│ ├── engine.py # Simulation engine
│ ├── metrics.py # Performance metrics
│ ├── output.py # Output and reporting
│ └── persistence.py # Database storage
├── cli/
│ └── commands/
│ └── backtest.py # CLI commands
├── tests/
│ ├── unit/ # Unit tests
│ ├── integration/ # Integration tests
│ └── conftest.py # Test fixtures
├── docs/
│ ├── 001-007_*.md # Implementation tickets
│ ├── API_REFERENCE.md # API documentation
│ └── BACKTEST_README.md # User guide
└── requirements.txt # Dependencies
```
## Development Setup
### Environment Setup
```bash
# Create virtual environment
conda create -n tradingagents python=3.11
conda activate tradingagents
# Install dependencies
pip install -r requirements.txt
# Install in development mode
pip install -e .
```
### Required Environment Variables
```bash
export OPENAI_API_KEY=your_openai_api_key
export FINNHUB_API_KEY=your_finnhub_api_key
```
## Running Tests
### Unit Tests
```bash
# Run all unit tests
pytest tests/unit -v
# Run specific test file
pytest tests/unit/test_ta_flow_strategy.py -v
# Run with coverage
pytest tests/unit --cov=tradingagents.backtest --cov-report=html
```
### Integration Tests
```bash
# Run integration tests
pytest tests/integration -v -m integration
# Skip slow tests
pytest -m "not slow"
```
### Test Coverage
```bash
# Generate coverage report
pytest --cov=tradingagents.backtest --cov-report=html --cov-report=term-missing
# View HTML report
open htmlcov/index.html
```
## Code Style Guide
### Python Style
- Follow PEP 8
- Use type hints for all function signatures
- Maximum line length: 100 characters
- Use docstrings for all public methods
### Example:
```python
from typing import List, Dict, Optional
def calculate_metrics(
equity_curve: List[float],
trades: List[Trade],
initial_capital: float
) -> PerformanceMetrics:
"""Calculate performance metrics from backtest results.
Args:
equity_curve: List of portfolio values over time
trades: List of executed trades
initial_capital: Starting capital amount
Returns:
PerformanceMetrics object with calculated metrics
Raises:
ValueError: If equity_curve is empty
"""
if not equity_curve:
raise ValueError("Equity curve cannot be empty")
# Implementation
return metrics
```
### Import Order
1. Standard library imports
2. Third-party imports
3. Local application imports
```python
import logging
from dataclasses import dataclass
from typing import List, Optional
import pandas as pd
import numpy as np
from rich.console import Console
from tradingagents.backtest.metrics import PerformanceMetrics
from tradingagents.default_config import DEFAULT_CONFIG
```
## Adding New Features
### 1. Create Feature Branch
```bash
git checkout -b feature/new-indicator
```
### 2. Implement Feature
Follow the existing patterns:
```python
# In metrics.py
def _calculate_new_indicator(self, data: List[float]) -> float:
"""Calculate new indicator.
Args:
data: Input data series
Returns:
Calculated indicator value
"""
# Implementation
return result
```
### 3. Add Tests
```python
# In test_metrics_calculator.py
def test_new_indicator_calculation(self):
"""Test new indicator calculation."""
calculator = MetricsCalculator()
result = calculator._calculate_new_indicator([1, 2, 3, 4, 5])
assert result == expected_value
```
### 4. Update Documentation
- Add to API_REFERENCE.md
- Update relevant ticket documentation
- Add usage example to BACKTEST_README.md
## Debugging
### Enable Debug Logging
```python
import logging
logging.basicConfig(level=logging.DEBUG)
# Or for specific module
logger = logging.getLogger('tradingagents.backtest')
logger.setLevel(logging.DEBUG)
```
### Common Issues
#### Issue: "No price data available"
**Solution:** Check internet connection and ticker symbol validity
#### Issue: LLM timeout
**Solution:** Increase timeout or use faster model
```python
config = {
"quick_think_llm": "gpt-4o-mini", # Faster model
"max_debate_rounds": 1, # Reduce rounds
}
```
#### Issue: Memory error with large backtests
**Solution:** Process in chunks or reduce date range
## Performance Optimization
### 1. Use Offline Mode
Always set `online_tools = False` for backtesting to use cached data.
### 2. Optimize LLM Usage
```python
config = {
"quick_think_llm": "gpt-4o-mini", # Use faster model
"max_debate_rounds": 1, # Reduce debate rounds
}
```
### 3. Batch Processing
Process multiple backtests efficiently:
```python
tickers = ["AAPL", "GOOGL", "MSFT"]
results = []
for ticker in tickers:
result = engine.run(ticker=ticker, ...)
results.append(result)
# Save periodically to avoid memory issues
if len(results) % 10 == 0:
save_results(results)
results.clear()
```
## Database Schema
### backtests table
```sql
CREATE TABLE backtests (
id INTEGER PRIMARY KEY,
ticker VARCHAR(10),
start_date DATE,
end_date DATE,
initial_capital DECIMAL(10,2),
final_capital DECIMAL(10,2),
total_return DECIMAL(10,4),
sharpe_ratio DECIMAL(10,4),
-- ... other metrics
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
### trades table
```sql
CREATE TABLE trades (
id INTEGER PRIMARY KEY,
backtest_id INTEGER,
trade_date DATE,
action VARCHAR(10),
price DECIMAL(10,4),
shares DECIMAL(10,4),
fee DECIMAL(10,4),
FOREIGN KEY (backtest_id) REFERENCES backtests(id)
);
```
## Contributing
### Pull Request Process
1. Create feature branch from `main`
2. Write tests for new functionality
3. Ensure all tests pass
4. Update documentation
5. Submit PR with clear description
### Commit Messages
Follow conventional commits:
```
feat: add new performance metric
fix: correct slippage calculation
docs: update API reference
test: add integration tests for persistence
refactor: simplify metrics calculation
```
## Release Process
1. Update version in `setup.py`
2. Update CHANGELOG.md
3. Run full test suite
4. Create release tag
5. Build and publish package
```bash
# Build package
python setup.py sdist bdist_wheel
# Upload to PyPI (if applicable)
twine upload dist/*
```
## Monitoring and Maintenance
### Log Files
Logs are stored in:
- `logs/backtest_*.log` - Backtest execution logs
- `results_dir/*/message_tool.log` - Agent communication logs
### Database Maintenance
```python
# Backup database
from tradingagents.backtest.persistence import DataMigration
DataMigration.backup_database("results.sqlite", "backup/results_backup.sqlite")
# Export to CSV
repository = BacktestRepository()
DataMigration.export_to_csv(repository, backtest_id=1, output_dir="exports/")
```
### Performance Monitoring
Monitor key metrics:
- Execution time per backtest
- Memory usage
- Database size
- API call counts
## Future Enhancements
### Planned Features
1. **Multi-asset Portfolio Backtesting**
- Support multiple positions
- Portfolio rebalancing
- Correlation analysis
2. **Advanced Risk Metrics**
- VaR (Value at Risk)
- CVaR (Conditional VaR)
- Sortino ratio
3. **Real-time Integration**
- Live trading connection
- Paper trading mode
- Alert system
4. **Performance Optimization**
- Parallel processing
- GPU acceleration for metrics
- Distributed backtesting
### Extension Points
The framework is designed to be extensible:
1. **Custom Strategies**: Implement the `decide()` method
2. **Custom Metrics**: Extend `MetricsCalculator`
3. **Custom Output Formats**: Extend `ReportManager`
4. **Custom Data Sources**: Implement data fetcher interface

View File

@ -0,0 +1,155 @@
# Reddit API レート制限の分析と最適なデータ収集戦略
## 調査結果サマリー
### Reddit API の現在のレート制限2024年
1. **認証済みOAuthクライアント**: 100 queries per minute (QPM)
2. **非認証クライアント**: 10 QPM実質使用不可
3. **バッチサイズ**: 1リクエストで最大100アイテム取得可能
4. **レート計算方法**: 10分間の平均で計算バースト対応
5. **制限単位**: OAuth Client ID ごと(ユーザーごとではない)
### 重要な制約
- **1000件制限**: 各検索・リストで最大1000件までしか取得できない
- **過去データ制限**: 6ヶ月以上前の投稿は取得が困難
- **料金**: 無料枠を超えると $0.24 per 1,000 API calls
## データ収集量の最適化案
### 現実的なデータ収集設定
#### 1. 少量運用(推奨:初期導入・テスト用)
```
対象銘柄: 10銘柄
Global News: 各subreddit 50件/日 × 5 = 250件
Company News: 各銘柄 50件/日 × 10 = 500件
日次取得時間: 約1分
月次バックフィル30日: 約5分
```
#### 2. 中規模運用(推奨:通常運用)
```
対象銘柄: 50銘柄
Global News: 各subreddit 100件/日 × 5 = 500件
Company News: 各銘柄 50件/日 × 50 = 2,500件
日次取得時間: 約2分
月次バックフィル30日: 約21分
```
#### 3. 大規模運用S&P500等
```
対象銘柄: 500銘柄
Global News: 各subreddit 100件/日 × 5 = 500件
Company News: 各銘柄 20件/日 × 500 = 10,000件
日次取得時間: 約7分
月次バックフィル30日: 約3.5時間
```
## レート制限を考慮した実装戦略
### 1. リクエスト間隔の設定
```python
# 安全な設定
SAFE_REQUEST_INTERVAL = 0.75 # 秒80 requests/minute
# アグレッシブな設定(モニタリング必須)
AGGRESSIVE_REQUEST_INTERVAL = 0.6 # 秒100 requests/minute
# 推奨:段階的アプローチ
if total_requests < 100:
interval = 0.6 # 短時間なら高速
elif total_requests < 1000:
interval = 0.75 # 中規模は安全に
else:
interval = 1.0 # 大規模は慎重に
```
### 2. 効率的なデータ取得パターン
#### A. 日次更新(推奨)
```python
# 毎日深夜に前日分のデータを取得
# メリット: API負荷分散、安定運用
# デメリット: リアルタイム性なし
schedule.every().day.at("02:00").do(fetch_yesterday_data)
```
#### B. 複数回更新
```python
# 1日3回更新市場開始前、昼、終了後
# メリット: 準リアルタイム
# デメリット: API使用量3倍
schedule.every().day.at("08:00").do(fetch_recent_data)
schedule.every().day.at("13:00").do(fetch_recent_data)
schedule.every().day.at("17:00").do(fetch_recent_data)
```
#### C. 優先度ベース
```python
# 重要銘柄は頻繁に、その他は日次
HIGH_PRIORITY = ["AAPL", "MSFT", "NVDA", "TSLA"]
fetch_interval = {
"high": 4, # 4時間ごと
"medium": 12, # 12時間ごと
"low": 24 # 24時間ごと
}
```
### 3. エラーハンドリングとレート制限対策
```python
class RedditRateLimiter:
def __init__(self):
self.request_times = deque(maxlen=100)
self.minute_window = 60
def wait_if_needed(self):
now = time.time()
# 直近100リクエストをチェック
if len(self.request_times) == 100:
oldest = self.request_times[0]
if now - oldest < self.minute_window:
sleep_time = self.minute_window - (now - oldest) + 1
time.sleep(sleep_time)
self.request_times.append(now)
```
## 推奨構成
### 初期導入時
1. **対象**: TOP 10銘柄 + Global News
2. **頻度**: 日次更新深夜2時
3. **データ量**: 各50-100件/日
4. **所要時間**: 約1-2分/日
5. **月間API使用量**: 約1,650 calls
### 本番運用時
1. **対象**: 50銘柄 + Global News
2. **頻度**:
- 重要10銘柄: 6時間ごと
- その他40銘柄: 日次
3. **データ量**:
- 重要銘柄: 100件/回
- その他: 50件/日
4. **所要時間**: 約10分/日(分散実行)
5. **月間API使用量**: 約15,000 calls
### スケーラビリティ考慮事項
- 500銘柄以上は複数のClient IDを使用検討
- 時間帯分散(米国市場時間外を活用)
- キャッシュ活用でAPI呼び出し削減
- 増分更新(新規投稿のみ取得)
## まとめ
Reddit APIのレート制限100 QPMを考慮すると、中規模運用50銘柄程度が最もバランスが良い。データ収集量は各投稿50-100件/日が現実的で、これにより:
- 十分な市場センチメント把握が可能
- API制限内で安定運用
- 日次更新が数分で完了
- 月次バックフィルも30分以内
大規模運用500銘柄以上の場合は、優先度設定や時間分散などの工夫が必要。

View File

@ -0,0 +1,260 @@
# Reddit API (praw) 実装要件定義書
## 1. 概要
既存のローカルファイルベースのRedditデータ取得を、Reddit API (praw)を使用したリアルタイム取得に変更し、過去データの収集とキャッシュ機能を実装する。
## 2. 機能要件
### 2.1 データ取得範囲
#### Global News カテゴリ
以下のsubredditから金融・経済関連のグローバルニュースを取得
- r/worldnews - 世界ニュース
- r/news - 一般ニュース
- r/economics - 経済ニュース
- r/finance - 金融ニュース
- r/business - ビジネスニュース
#### Company News カテゴリ
以下のsubredditから企業・株式関連情報を取得
- r/stocks - 株式投資全般
- r/StockMarket - 株式市場動向
- r/wallstreetbets - トレーダーコミュニティ
- r/investing - 投資戦略
- r/SecurityAnalysis - 企業分析
### 2.2 企業関連投稿の検索戦略(推奨)
**ハイブリッドアプローチ**を採用:
1. **直接検索**: Reddit検索APIで企業名/ティッカーを検索API効率的
2. **ストリーム監視**: 新着投稿をリアルタイムで取得し、企業名/ティッカーでフィルタリング
3. **人気投稿スキャン**: 各subredditのtop/hot投稿を定期取得してフィルタリング
```python
# 検索例
def search_company_posts(ticker: str, company_name: str):
# 1. Reddit検索API
search_queries = [
f'"{ticker}"',
f'"{company_name}"',
f'${ticker}', # Cashtag
]
# 2. タイトルと本文でのマッチング
# 3. 重複排除post ID使用
```
### 2.3 データ収集粒度API制限を考慮した推奨設定
**Reddit API制限**
- 認証済みアプリ: 100リクエスト/分QPM
- 1リクエストで最大100アイテム取得可能
- 10分間の平均でレート制限を計算バースト対応
**最適化されたデータ収集設定**
- Global News:
- 各subredditから上位50-100件/日1リクエストで取得可能
- 5 subreddits × 1リクエスト = 5リクエスト/日
- Company News:
- 各企業につき上位50件/日1リクエストで取得
- 人気10銘柄の場合: 10リクエスト/日
- 追加銘柄: バッチ処理で効率化
**バッチ処理戦略**
- 1分あたり最大80リクエストに制限バッファ20%
- 大量銘柄の場合は時間分散:
- 50銘柄 = 約1分で完了
- 500銘柄 = 約7分で完了レート制限考慮
- ソート基準: Hot → Top (24h) → New の優先順位
### 2.4 重複排除
- Reddit post IDを使用してグローバルに重複を排除
- 同一投稿が複数subredditに投稿された場合、最初に取得したものを保持
## 3. CLI インターフェース
### 3.1 過去データ取得(対話形式)
```bash
python -m cli.main reddit fetch-historical
# 対話形式のプロンプト
> Which category? (global_news/company_news/both): both
> Start date (YYYY-MM-DD): 2024-01-01
> End date (YYYY-MM-DD) [default: today]: 2024-03-31
> For company news, select tickers:
1. Popular Tech Stocks (15 tickers)
2. S&P 500 Top 20
3. Global Indices (20 ETFs)
4. All Combined (50+ tickers)
5. Quick Test (5 tickers)
6. Custom (enter your own)
> Select option (1-6): 1
> Confirm fetch? This may take several minutes. (y/n): y
```
### 3.2 日次更新
```bash
# 昨日のデータを取得
python -m cli.main reddit update --date yesterday
# 特定日のデータを取得
python -m cli.main reddit update --date 2024-03-15
# 自動実行用(エラー時はログ記録)
python -m cli.main reddit update --auto
```
### 3.3 データ検証
```bash
# キャッシュ状況確認
python -m cli.main reddit status
# 特定期間のデータ完全性チェック
python -m cli.main reddit verify --start 2024-01-01 --end 2024-03-31
```
## 4. キャッシュ設計
### 4.1 ディレクトリ構造
```
/Users/y_sato/Library/Mobile Documents/com~apple~CloudDocs/curosur/API疎通確認ずみ/APIテスト完了済み/TradingAgents/Datasource/
└── reddit_data/
├── global_news/
│ ├── r_worldnews_2024-01-01.jsonl
│ ├── r_economics_2024-01-01.jsonl
│ └── ...
├── company_news/
│ ├── AAPL_2024-01-01.jsonl
│ ├── MSFT_2024-01-01.jsonl
│ └── ...
└── metadata/
├── fetch_history.json # 取得履歴
├── post_ids.db # 重複チェック用
└── ticker_presets.json # デフォルトTickerリスト
```
### 4.2 JSONL形式既存互換
```json
{
"id": "1a2b3c4",
"title": "Apple announces new product",
"selftext": "Content here...",
"url": "https://reddit.com/...",
"ups": 1234,
"created_utc": 1704067200,
"subreddit": "r/stocks",
"author": "username",
"num_comments": 56,
"ticker": "AAPL" // 企業投稿の場合
}
```
## 5. 実装アーキテクチャ
### 5.1 モジュール構成
```python
tradingagents/dataflows/
├── reddit_praw_client.py # prawクライアントラッパー
├── reddit_fetcher.py # データ取得ロジック
├── reddit_cache_manager.py # キャッシュ管理
└── reddit_utils.py # 既存互換性レイヤー
```
### 5.2 エラーハンドリング
- API接続エラー: 上位層に例外を伝播
- Rate limit エラー: 自動リトライ with exponential backoff
- 認証エラー: 即座に例外を発生させる
### 5.3 設定管理
```python
# tradingagents/config.py に追加
REDDIT_CONFIG = {
"user_agent": "TradingAgents/1.0",
"rate_limit_pause": 1.0, # 秒
"max_retries": 3,
"data_base_dir": "/Users/y_sato/Library/Mobile Documents/com~apple~CloudDocs/curosur/API疎通確認ずみ/APIテスト完了済み/TradingAgents/Datasource",
"cache_dir": "reddit_data",
"global_news_subreddits": [
"worldnews", "news", "economics", "finance", "business"
],
"company_news_subreddits": [
"stocks", "StockMarket", "wallstreetbets", "investing", "SecurityAnalysis"
],
"daily_post_limits": {
"global_news": 100,
"company_news": 50
},
"default_ticker_presets": {
"tech": ["AAPL", "MSFT", "NVDA", "GOOGL", "META", "AMZN", "TSLA", "AMD", "INTC", "NFLX", "AVGO", "ORCL", "ADBE", "CRM", "QCOM"],
"sp500": ["AAPL", "MSFT", "NVDA", "AMZN", "META", "GOOGL", "GOOG", "BRK.B", "LLY", "AVGO", "JPM", "TSLA", "V", "UNH", "XOM", "MA", "JNJ", "WMT", "PG", "HD"],
"indices": ["SPY", "QQQ", "DIA", "IWM", "EWJ", "DXJ", "EWG", "EWQ", "EWU", "FEZ", "EEM", "FXI", "INDA", "EWZ", "XLK", "XLF", "XLE", "XLV"],
"quick": ["AAPL", "MSFT", "NVDA", "TSLA", "SPY"]
}
}
```
## 6. 自動実行設定
### 6.1 Cron設定例
```bash
# 毎日午前6時に前日データを取得
0 6 * * * cd /path/to/TradingAgents && python -m cli.main reddit update --auto >> logs/reddit_update.log 2>&1
# 毎週月曜日に先週のデータ完全性をチェック
0 9 * * 1 cd /path/to/TradingAgents && python -m cli.main reddit verify --days 7 >> logs/reddit_verify.log 2>&1
```
### 6.2 エラー通知
- ログファイルにエラー記録
- 重要なエラー(認証失敗等)はメール/Slack通知オプション
## 7. 移行計画
### Phase 1: 基盤実装
1. praw クライアントラッパー実装
2. 基本的なデータ取得機能
3. キャッシュ管理機能
### Phase 2: CLI実装
1. 対話形式の過去データ取得
2. 日次更新コマンド
3. ステータス確認機能
### Phase 3: 自動化
1. エラーハンドリング強化
2. 自動実行対応
3. 既存システムとの統合テスト
## 8. 性能要件
### API制限に基づく現実的な性能目標
- **レート制限**: 100 requests/minute認証済み
- **バッチサイズ**: 100 items/request
- **安全マージン**: 80 requests/minute で運用
### データ取得時間の見積もり
**1日分のデータ**:
- Global News: 5 subreddits = 5リクエスト
- Company News (50銘柄): 50リクエスト
- 合計: 55リクエスト = **約1分で完了**
**1ヶ月分30日の過去データ**:
- 各日付ごとに処理が必要
- Global News: 5 × 30 = 150リクエスト
- Company News (50銘柄): 50 × 30 = 1,500リクエスト
- 合計: 1,650リクエスト = **約21分で完了**
**1年分365日の過去データ**:
- Global News: 5 × 365 = 1,825リクエスト
- Company News (50銘柄): 50 × 365 = 18,250リクエスト
- 合計: 20,075リクエスト = **約4.2時間で完了**
### 最適化のポイント
- 並列処理は避ける(レート制限違反のリスク)
- 2秒間隔でリクエストを送信30 requests/minuteで安定運用
- エラー時の再試行は exponential backoff を使用
## 9. セキュリティ
- Reddit認証情報は環境変数で管理
- キャッシュデータへのアクセス制限
- ログに認証情報を含めない

View File

@ -0,0 +1,250 @@
# Reddit データ取得期間の対話形式ヒアリング設計
## 対話フロー例
### 1. 基本的な対話フロー
```python
def interactive_date_range_prompt():
"""
ユーザーと対話してデータ取得期間を決定する
"""
print("Reddit Historical Data Fetcher")
print("="*40)
# Step 1: 取得目的の確認
print("\nWhat is the purpose of this data fetch?")
print("1. Backtesting preparation")
print("2. Historical analysis")
print("3. Initial data setup")
print("4. Gap filling (missing dates)")
purpose = input("Select purpose (1-4): ")
# Step 2: 期間の提案
if purpose == "1": # Backtesting
print("\nFor backtesting, we recommend:")
print("- Short-term: Last 3 months")
print("- Medium-term: Last 6 months")
print("- Long-term: Last 1 year")
elif purpose == "2": # Historical analysis
print("\nFor historical analysis:")
print("- Recent events: Last 1 month")
print("- Quarterly analysis: Last 3 months")
print("- Annual trends: Last 1 year")
# Step 3: 期間選択
print("\nHow would you like to specify the date range?")
print("1. Use predefined period (1 week/1 month/3 months/6 months/1 year)")
print("2. Specify exact dates")
print("3. Relative dates (e.g., 'last 30 days')")
choice = input("Select option (1-3): ")
if choice == "1":
return handle_predefined_period()
elif choice == "2":
return handle_exact_dates()
else:
return handle_relative_dates()
```
### 2. Reddit API制限を考慮した警告表示
```python
def validate_date_range(start_date, end_date):
"""
選択された期間の妥当性を確認し、必要に応じて警告
"""
days_diff = (end_date - start_date).days
# 警告レベルの判定
if days_diff > 365:
print(f"\n⚠ WARNING: Large date range ({days_diff} days)")
print("This will result in:")
print(f"- Approximately {days_diff * 5} API calls")
print(f"- Estimated time: {days_diff * 0.5:.1f} minutes")
print("- Reddit API may not return complete historical data beyond 1000 posts per subreddit")
if not confirm("Do you want to continue with this large range?"):
return False
elif days_diff > 180:
print(f"\n📊 Date range: {days_diff} days")
print(f"- Estimated time: {days_diff * 0.5:.1f} minutes")
print("- This is a reasonable range for comprehensive analysis")
return True
```
### 3. 段階的な期間設定(推奨実装)
```python
def smart_date_range_selector():
"""
ユーザーの経験レベルに応じた期間設定
"""
print("\nLet's determine the best date range for your needs.")
# Step 1: データ利用頻度の確認
print("\nHow often will you run backtests?")
print("1. Daily (active trading)")
print("2. Weekly (regular monitoring)")
print("3. Monthly (periodic review)")
print("4. One-time analysis")
frequency = input("Select frequency (1-4): ")
# Step 2: 推奨期間の提示
recommendations = {
"1": {
"initial": "1 month",
"update": "daily",
"reason": "Recent data is most relevant for active trading"
},
"2": {
"initial": "3 months",
"update": "weekly",
"reason": "Balanced between data volume and relevance"
},
"3": {
"initial": "6 months",
"update": "monthly",
"reason": "Good for trend analysis and strategy validation"
},
"4": {
"initial": "1 year",
"update": "as needed",
"reason": "Comprehensive historical data for research"
}
}
rec = recommendations[frequency]
print(f"\n💡 Recommendation based on your usage:")
print(f"- Initial fetch: {rec['initial']} of historical data")
print(f"- Update schedule: {rec['update']}")
print(f"- Reason: {rec['reason']}")
# Step 3: カスタマイズオプション
print("\nWould you like to:")
print("1. Accept recommendation")
print("2. Modify the period")
print("3. See data availability preview")
return handle_user_choice()
```
### 4. データ可用性のプレビュー
```python
def preview_data_availability(start_date, end_date, tickers=None):
"""
指定期間のデータ可用性をプレビュー表示
"""
print("\n📈 Data Availability Preview")
print("="*50)
# Reddit API制限の説明
print("\nReddit API Limitations:")
print("- Posts older than 6 months may be incomplete")
print("- Maximum 1000 posts per subreddit search")
print("- Rate limit: 60 requests per minute")
# 期間別の推定データ量
days = (end_date - start_date).days
print(f"\nEstimated data volume for {days} days:")
print(f"- Global news: ~{days * 100} posts from 5 subreddits")
if tickers:
print(f"- Company news for {len(tickers)} tickers:")
for ticker in tickers[:5]: # 最初の5つを表示
print(f" - {ticker}: ~{days * 20} posts")
if len(tickers) > 5:
print(f" - ... and {len(tickers)-5} more tickers")
# 取得時間の見積もり
total_requests = estimate_api_requests(days, tickers)
estimated_time = (total_requests / 60) * 1.2 # 20%のバッファ
print(f"\n⏱ Estimated fetch time: {estimated_time:.1f} minutes")
print(f"📊 Total API requests: ~{total_requests}")
return True
```
### 5. 実装例CLI統合
```python
# cli/commands/reddit.py
@click.command()
@click.option('--interactive/--no-interactive', default=True,
help='Use interactive mode for date selection')
def fetch_historical(interactive):
"""Fetch historical Reddit data with smart date selection"""
if interactive:
# 対話形式で期間を決定
date_range = smart_date_range_selector()
# プレビュー表示
if preview_data_availability(date_range.start, date_range.end):
if click.confirm("Proceed with data fetch?"):
fetch_reddit_data(date_range)
else:
# 非対話形式(自動実行用)
fetch_reddit_data(get_default_date_range())
```
## 使用例
### 初回セットアップ時
```
$ python -m cli.main reddit fetch-historical
Reddit Historical Data Fetcher
========================================
What is the purpose of this data fetch?
1. Backtesting preparation
2. Historical analysis
3. Initial data setup
4. Gap filling (missing dates)
Select purpose (1-4): 3
For initial setup, we recommend starting with recent data.
How much historical data do you need?
1. Last 1 month (recommended for testing)
2. Last 3 months (good for short-term analysis)
3. Last 6 months (balanced approach)
4. Last 1 year (comprehensive but time-consuming)
5. Custom range
Select option (1-5): 2
📊 You selected: Last 3 months (2024-01-01 to 2024-03-31)
Which tickers would you like to track?
1. Top 10 most discussed (TSLA, AAPL, GME, AMC, NVDA, ...)
2. S&P 500 leaders
3. Custom list
4. All available (not recommended)
Select option (1-4): 1
📈 Data Availability Preview
==================================================
Period: 2024-01-01 to 2024-03-31 (90 days)
Tickers: TSLA, AAPL, GME, AMC, NVDA, MSFT, AMZN, META, GOOGL, SPY
Estimated data volume:
- Global news: ~9,000 posts from 5 subreddits
- Company news: ~18,000 posts for 10 tickers
⏱️ Estimated fetch time: 15.0 minutes
📊 Total API requests: ~750
Proceed with data fetch? [y/N]: y
```
この設計により、ユーザーは自分のニーズに合った期間を選択でき、同時にAPI制限やデータ量についても理解した上で実行できます。

View File

@ -0,0 +1,109 @@
# デフォルトTickerリスト定義
## 1. 人気テック銘柄
```python
POPULAR_TECH_TICKERS = [
"AAPL", # Apple
"MSFT", # Microsoft
"NVDA", # NVIDIA
"GOOGL", # Alphabet Class A
"META", # Meta Platforms
"AMZN", # Amazon
"TSLA", # Tesla
"AMD", # Advanced Micro Devices
"INTC", # Intel
"NFLX", # Netflix
"AVGO", # Broadcom
"ORCL", # Oracle
"ADBE", # Adobe
"CRM", # Salesforce
"QCOM" # Qualcomm
]
```
## 2. S&P500上位銘柄
```python
SP500_TOP_TICKERS = [
"AAPL", # Apple
"MSFT", # Microsoft
"NVDA", # NVIDIA
"AMZN", # Amazon
"META", # Meta Platforms
"GOOGL", # Alphabet Class A
"GOOG", # Alphabet Class C
"BRK.B", # Berkshire Hathaway Class B
"LLY", # Eli Lilly
"AVGO", # Broadcom
"JPM", # JPMorgan Chase
"TSLA", # Tesla
"V", # Visa
"UNH", # UnitedHealth
"XOM", # Exxon Mobil
"MA", # Mastercard
"JNJ", # Johnson & Johnson
"WMT", # Walmart
"PG", # Procter & Gamble
"HD" # Home Depot
]
```
## 3. 世界の主要株価指数
```python
GLOBAL_INDICES = [
# 米国
"SPY", # S&P 500 ETF
"QQQ", # NASDAQ 100 ETF
"DIA", # Dow Jones ETF
"IWM", # Russell 2000 ETF
# 日本
"EWJ", # iShares MSCI Japan ETF
"NKY", # Nikkei 225 (symbol varies by platform)
"DXJ", # WisdomTree Japan Hedged Equity
"^N225", # Nikkei 225 Index (Yahoo Finance)
"NIKKEI", # Nikkei 225 (alternative symbol)
# ヨーロッパ
"EWG", # iShares MSCI Germany ETF
"EWQ", # iShares MSCI France ETF
"EWU", # iShares MSCI UK ETF
"FEZ", # SPDR EURO STOXX 50 ETF
# 新興国
"EEM", # iShares MSCI Emerging Markets ETF
"FXI", # iShares China Large-Cap ETF
"INDA", # iShares MSCI India ETF
"EWZ", # iShares MSCI Brazil ETF
# セクター別
"XLK", # Technology Select Sector SPDR
"XLF", # Financial Select Sector SPDR
"XLE", # Energy Select Sector SPDR
"XLV" # Health Care Select Sector SPDR
]
```
## 4. 統合デフォルトリスト
```python
DEFAULT_TICKER_PRESETS = {
"tech": POPULAR_TECH_TICKERS,
"sp500": SP500_TOP_TICKERS,
"indices": GLOBAL_INDICES,
"all": list(set(POPULAR_TECH_TICKERS + SP500_TOP_TICKERS + GLOBAL_INDICES)),
"quick": ["AAPL", "MSFT", "NVDA", "TSLA", "SPY"], # クイックテスト用
}
```
## 使用例
```bash
# CLIでの選択
> Select ticker preset or enter custom:
1. Popular Tech Stocks (15 tickers)
2. S&P 500 Top 20
3. Global Indices (20 ETFs)
4. All Combined (50+ unique tickers)
5. Quick Test (5 tickers)
6. Custom (enter your own)
Select option (1-6):
```

View File

@ -24,3 +24,4 @@ rich
questionary
langchain_anthropic
langchain-google-genai
python-dotenv

60
tradingagents/config.py Normal file
View File

@ -0,0 +1,60 @@
"""
Configuration management for TradingAgents.
Loads configuration from environment variables and .env file.
"""
import os
from pathlib import Path
from dotenv import load_dotenv
# Load .env file from project root
project_root = Path(__file__).parent.parent
env_path = project_root / '.env'
load_dotenv(env_path)
def get_config():
"""Get configuration with environment variable overrides."""
config = {
# Project directories
"project_dir": str(project_root / "tradingagents"),
"results_dir": os.getenv("TRADINGAGENTS_RESULTS_DIR", "./results"),
"data_dir": os.getenv("TRADINGAGENTS_DATA_DIR", "/Users/yluo/Documents/Code/ScAI/FR1-data"),
"data_cache_dir": str(project_root / "tradingagents" / "dataflows" / "data_cache"),
# LLM settings
"llm_provider": os.getenv("LLM_PROVIDER", "openai"),
"deep_think_llm": os.getenv("DEEP_THINK_LLM", "o4-mini"),
"quick_think_llm": os.getenv("QUICK_THINK_LLM", "gpt-4o-mini"),
"backend_url": os.getenv("BACKEND_URL", "https://api.openai.com/v1"),
# Debate and discussion settings
"max_debate_rounds": int(os.getenv("MAX_DEBATE_ROUNDS", "1")),
"max_risk_discuss_rounds": int(os.getenv("MAX_RISK_DISCUSS_ROUNDS", "1")),
"max_recur_limit": int(os.getenv("MAX_RECUR_LIMIT", "100")),
# Tool settings
"online_tools": os.getenv("ONLINE_TOOLS", "true").lower() == "true",
# API Keys (loaded from environment)
"openai_api_key": os.getenv("OPENAI_API_KEY"),
"finnhub_api_key": os.getenv("FINNHUB_API_KEY"),
"anthropic_api_key": os.getenv("ANTHROPIC_API_KEY"),
"google_api_key": os.getenv("GOOGLE_API_KEY"),
"reddit_client_id": os.getenv("REDDIT_CLIENT_ID"),
"reddit_client_secret": os.getenv("REDDIT_CLIENT_SECRET"),
}
# Validate required API keys based on provider
if config["llm_provider"] == "openai" and not config["openai_api_key"]:
raise ValueError("OPENAI_API_KEY is required when using OpenAI provider")
elif config["llm_provider"] == "anthropic" and not config["anthropic_api_key"]:
raise ValueError("ANTHROPIC_API_KEY is required when using Anthropic provider")
elif config["llm_provider"] == "google" and not config["google_api_key"]:
raise ValueError("GOOGLE_API_KEY is required when using Google provider")
if not config["finnhub_api_key"]:
print("Warning: FINNHUB_API_KEY not set. Some financial data features may be limited.")
return config
# Export default configuration
DEFAULT_CONFIG = get_config()

View File

@ -4,6 +4,11 @@ from bs4 import BeautifulSoup
from datetime import datetime
import time
import random
import logging
from urllib.parse import quote_plus
logger = logging.getLogger(__name__)
from tenacity import (
retry,
stop_after_attempt,
@ -14,20 +19,24 @@ from tenacity import (
def is_rate_limited(response):
"""Check if the response indicates rate limiting (status code 429)"""
return response.status_code == 429
"""Check if the response indicates we should back off (rate-limited or temporarily unavailable)."""
return response.status_code in (429, 403, 503)
def _add_jitter(retry_state):
# Add small random jitter before each retry to avoid detection patterns
time.sleep(random.uniform(1, 3))
@retry(
retry=(retry_if_result(is_rate_limited)),
wait=wait_exponential(multiplier=1, min=4, max=60),
before_sleep=_add_jitter,
stop=stop_after_attempt(5),
)
def make_request(url, headers):
"""Make a request with retry logic for rate limiting"""
# Random delay before each request to avoid detection
time.sleep(random.uniform(2, 6))
response = requests.get(url, headers=headers)
# The retry decorator already applies exponential backoff with jitter
response = requests.get(url, headers=headers, timeout=(5, 20))
return response
@ -57,8 +66,9 @@ def getNewsData(query, start_date, end_date):
page = 0
while True:
offset = page * 10
encoded_query = quote_plus(query)
url = (
f"https://www.google.com/search?q={query}"
f"https://www.google.com/search?q={encoded_query}"
f"&tbs=cdr:1,cd_min:{start_date},cd_max:{end_date}"
f"&tbm=nws&start={offset}"
)
@ -73,25 +83,31 @@ def getNewsData(query, start_date, end_date):
for el in results_on_page:
try:
link = el.find("a")["href"]
title = el.select_one("div.MBeuO").get_text()
snippet = el.select_one(".GI74Re").get_text()
date = el.select_one(".LfVVr").get_text()
source = el.select_one(".NUnG9d span").get_text()
link_tag = el.find("a")
title_el = el.select_one("div.MBeuO")
if not link_tag or not title_el:
# Skip if required elements are missing
continue
link = link_tag.get("href")
title = title_el.get_text(strip=True)
snippet_el = el.select_one(".GI74Re")
date_el = el.select_one(".LfVVr")
source_el = el.select_one(".NUnG9d span")
news_results.append(
{
"link": link,
"title": title,
"snippet": snippet,
"date": date,
"source": source,
"snippet": snippet_el.get_text(strip=True) if snippet_el else "",
"date": date_el.get_text(strip=True) if date_el else "",
"source": source_el.get_text(strip=True) if source_el else "",
}
)
except Exception as e:
print(f"Error processing result: {e}")
logger.warning("Error processing result: %s", e)
# If one of the fields is not found, skip this result
continue
# Update the progress bar with the current count of results scraped
# Check for the "Next" link (pagination)
@ -102,7 +118,7 @@ def getNewsData(query, start_date, end_date):
page += 1
except Exception as e:
print(f"Failed after multiple retries: {e}")
logger.error("Failed after multiple retries: %s", e)
break
return news_results

View File

@ -287,7 +287,7 @@ def get_google_news(
curr_date: Annotated[str, "Curr date in yyyy-mm-dd format"],
look_back_days: Annotated[int, "how many days to look back"],
) -> str:
query = query.replace(" ", "+")
# URL encoding is handled inside googlenews_utils.getNewsData
start_date = datetime.strptime(curr_date, "%Y-%m-%d")
before = start_date - relativedelta(days=look_back_days)

View File

@ -1,22 +1,5 @@
import os
# Legacy compatibility - import from new config module
from .config import DEFAULT_CONFIG
DEFAULT_CONFIG = {
"project_dir": os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
"results_dir": os.getenv("TRADINGAGENTS_RESULTS_DIR", "./results"),
"data_dir": "/Users/yluo/Documents/Code/ScAI/FR1-data",
"data_cache_dir": os.path.join(
os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
"dataflows/data_cache",
),
# LLM settings
"llm_provider": "openai",
"deep_think_llm": "o4-mini",
"quick_think_llm": "gpt-4o-mini",
"backend_url": "https://api.openai.com/v1",
# Debate and discussion settings
"max_debate_rounds": 1,
"max_risk_discuss_rounds": 1,
"max_recur_limit": 100,
# Tool settings
"online_tools": True,
}
# Re-export for backward compatibility
__all__ = ["DEFAULT_CONFIG"]