Merge branch 'korean' into dev/web

This commit is contained in:
kimheesu 2025-07-01 16:38:31 +09:00
commit dcaa02f26c
32 changed files with 8331 additions and 3111 deletions

120
.gitignore vendored
View File

@ -1,123 +1,3 @@
# 환경 변수 파일
.env
web/backend/.env
*.env
env_local.txt
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Django
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
/staticfiles/
/media/
# Virtual Environment
venv/
env/
ENV/
env.bak/
venv.bak/
# IDEs
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Node.js (React)
web/frontend/node_modules/
web/frontend/build/
web/frontend/dist/
npm-debug.log*
yarn-debug.log*
yarn-error.log*
# Docker
docker-compose.override.yml
# Logs
logs/
*.log
# Coverage reports
htmlcov/
.coverage
.coverage.*
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Trading specific
trading_data/
analysis_results/
temp_files/
env/
__pycache__/
.DS_Store

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.10

302
README.md
View File

@ -80,7 +80,7 @@ Our framework decomposes complex trading tasks into specialized roles. This ensu
- Composes reports from the analysts and researchers to make informed trading decisions. It determines the timing and magnitude of trades based on comprehensive market insights.
<p align="center">
<img src="assets/risk.png" width="70%" style="display: inline-block; margin: 0 2%;">
<img src="assets/trader.png" width="70%" style="display: inline-block; margin: 0 2%;">
</p>
### Risk Management and Portfolio Manager
@ -88,7 +88,7 @@ Our framework decomposes complex trading tasks into specialized roles. This ensu
- The Portfolio Manager approves/rejects the transaction proposal. If approved, the order will be sent to the simulated exchange and executed.
<p align="center">
<img src="assets/trader.png" width="70%" style="display: inline-block; margin: 0 2%;">
<img src="assets/risk.png" width="70%" style="display: inline-block; margin: 0 2%;">
</p>
## Installation and CLI
@ -119,9 +119,10 @@ You will also need the FinnHub API for financial data. All of our code is implem
export FINNHUB_API_KEY=$YOUR_FINNHUB_API_KEY
```
You will need the OpenAI API for all the agents.
You will need the OpenAI API or GEMINI API for all the agents.
```bash
export OPENAI_API_KEY=$YOUR_OPENAI_API_KEY
export GEMINI_API_KEY=$YOUR_GEMINI_API_KEY
```
### CLI Usage
@ -211,298 +212,3 @@ Please reference our work if you find *TradingAgents* provides you with some hel
url={https://arxiv.org/abs/2412.20138},
}
```
# TradingAgents Web Application
CLI 기능을 웹에서 사용할 수 있는 React + Django 웹 애플리케이션입니다.
## 주요 기능
1. **사용자 인증**
- JWT 기반 로그인/회원가입
- OpenAI API 키 관리 (암호화 저장)
- 개발자 기본 키 fallback
2. **거래 분석**
- CLI의 모든 분석 기능을 웹에서 사용
- 실시간 분석 진행 상황 (WebSocket)
- 분석 기록 관리
3. **사용자 경험**
- 현대적인 React UI (Ant Design)
- 반응형 디자인
- 실시간 업데이트
## 기술 스택
### 백엔드
- **Django 4.2** - 웹 프레임워크
- **Django REST Framework** - API 개발
- **Django Channels** - WebSocket 지원
- **MySQL 8.0** - 데이터베이스 (Docker)
- **Redis 7** - WebSocket 메시지 브로커 (Docker)
- **JWT** - 인증
### 프론트엔드
- **React 18** - UI 라이브러리
- **Ant Design** - UI 컴포넌트
- **Styled Components** - 스타일링
- **Axios** - HTTP 클라이언트
- **WebSocket** - 실시간 통신
## 설치 및 실행
### 1. 환경 설정
```bash
# 가상환경 생성 및 활성화
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# Python 의존성 설치
pip install -r requirements.txt
# Node.js 의존성 설치
cd web/frontend
npm install
cd ../..
```
### 2. 데이터베이스 및 Redis 설정 (Docker)
Docker와 Docker Compose를 이용해 MySQL과 Redis를 실행합니다.
```bash
# Docker 및 Docker Compose 설치 확인
docker --version
docker-compose --version
# 편의 스크립트 사용 (권장)
chmod +x scripts/docker-commands.sh
./scripts/docker-commands.sh start
# 또는 직접 Docker Compose 명령 사용
docker-compose up -d mysql redis
# phpMyAdmin도 함께 시작 (데이터베이스 관리용)
./scripts/docker-commands.sh start-all
# 컨테이너 상태 확인
./scripts/docker-commands.sh status
```
### 3. 환경 변수 설정
`web/backend/.env` 파일을 생성합니다. `env_example.txt`를 참고하여 설정하세요:
```bash
# 예시 파일을 복사하여 시작
cp web/backend/env_example.txt web/backend/.env
# .env 파일을 편집하여 실제 값들로 변경
nano web/backend/.env # 또는 다른 텍스트 에디터 사용
```
주요 설정값들:
```env
# Django 설정
SECRET_KEY=your-secret-key-here-change-this-to-a-random-string
DEBUG=True
ALLOWED_HOSTS=localhost,127.0.0.1
# MySQL 데이터베이스 설정 (Docker)
DB_NAME=tradingagents_web
DB_USER=root
DB_PASSWORD=your-mysql-password-here
DB_HOST=127.0.0.1
DB_PORT=3306
# Redis 설정 (Docker)
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
# OpenAI API 키 (개발자 기본 키)
OPENAI_API_KEY=your-openai-api-key-here
```
### 4. 데이터베이스 마이그레이션
```bash
cd web/backend
python manage.py makemigrations
python manage.py migrate
python manage.py createsuperuser # 관리자 계정 생성
```
### 5. 개발 서버 실행
**터미널 1 - Docker 컨테이너 (MySQL + Redis):**
```bash
# 백그라운드에서 실행
docker-compose up -d mysql redis
# 또는 포그라운드에서 로그 확인
docker-compose up mysql redis
```
**터미널 2 - Django 백엔드:**
```bash
cd web/backend
python manage.py runserver
```
**터미널 3 - React 프론트엔드:**
```bash
cd web/frontend
npm start
```
## 접속 정보
- **프론트엔드**: http://localhost:3000
- **백엔드 API**: http://localhost:8000
- **Django Admin**: http://localhost:8000/admin
- **phpMyAdmin** (선택사항): http://localhost:8080
## API 엔드포인트
### 인증
- `POST /api/auth/register/` - 회원가입
- `POST /api/auth/login/` - 로그인
- `GET /api/auth/user/` - 사용자 정보
- `PUT /api/auth/profile/` - 프로필 수정
- `POST /api/auth/check-api-key/` - API 키 검증
### 거래 분석
- `GET /api/trading/config/` - 분석 설정 정보
- `POST /api/trading/start/` - 분석 시작
- `GET /api/trading/status/{id}/` - 분석 상태 조회
- `GET /api/trading/history/` - 분석 기록
- `GET /api/trading/report/{id}/` - 분석 보고서
### WebSocket
- `ws://localhost:8000/ws/trading-analysis/` - 실시간 분석 업데이트
## OpenAI API 키 관리
1. **사용자 개별 키**: 사용자가 프로필에서 설정한 개인 OpenAI API 키
2. **개발자 기본 키**: `.env` 파일의 `OPENAI_API_KEY` (사용자 키가 없을 때 사용)
3. **보안**: 사용자 키는 암호화되어 데이터베이스에 저장
## 프로젝트 구조
```
├── cli/ # 기존 CLI 코드
├── web/
│ ├── backend/ # Django 백엔드
│ │ ├── tradingagents_web/ # 프로젝트 설정
│ │ └── apps/ # Django 앱들
│ │ ├── authentication/ # 사용자 인증
│ │ ├── trading_api/ # 거래 분석 API
│ │ └── websocket/ # WebSocket 처리
│ └── frontend/ # React 프론트엔드
│ ├── public/
│ └── src/
│ ├── components/ # 재사용 컴포넌트
│ ├── contexts/ # React Context
│ ├── pages/ # 페이지 컴포넌트
│ ├── services/ # API 서비스
│ └── styles/ # 스타일 관련
└── requirements.txt # Python 의존성
```
## 개발 가이드
### 새로운 분석 기능 추가
1. `apps/trading_api/services.py`에 새로운 서비스 추가
2. `apps/trading_api/views.py`에 새로운 뷰 추가
3. `apps/trading_api/urls.py`에 URL 패턴 추가
4. 프론트엔드에서 해당 API 호출
### 새로운 페이지 추가
1. `src/pages/` 디렉토리에 새 페이지 컴포넌트 생성
2. `src/App.js`에 라우트 추가
3. 필요한 경우 레이아웃의 메뉴에 추가
## 배포
### Docker Compose (권장)
```bash
# 모든 서비스를 한 번에 시작 (개발 환경)
docker-compose up -d
# 특정 서비스만 시작
docker-compose up -d mysql redis
# 프로덕션 환경에서는 별도의 docker-compose.prod.yml 사용 권장
docker-compose -f docker-compose.prod.yml up -d
```
### 수동 배포
1. **프론트엔드 빌드**:
```bash
cd web/frontend
npm run build
```
2. **Django 정적 파일 수집**:
```bash
cd web/backend
python manage.py collectstatic
```
3. **프로덕션 서버 설정** (Nginx + Gunicorn + Daphne)
## 문제 해결
### 일반적인 문제
1. **Docker 컨테이너 관련**
```bash
# 컨테이너 상태 확인
docker-compose ps
# 컨테이너 로그 확인
docker-compose logs mysql
docker-compose logs redis
# 컨테이너 재시작
docker-compose restart mysql redis
```
2. **WebSocket 연결 실패**
- Redis 컨테이너가 실행 중인지 확인: `docker-compose ps`
- 방화벽 설정 확인
3. **API 키 관련 오류**
- `.env` 파일의 `OPENAI_API_KEY` 확인
- 사용자 프로필에서 API 키 재설정
4. **데이터베이스 연결 오류**
- MySQL 컨테이너 상태 확인: `docker-compose logs mysql`
- `.env` 파일의 데이터베이스 연결 정보 확인
- 컨테이너 포트 충돌 확인: `docker port tradingagents_mysql`
5. **MySQL 컨테이너 초기화 문제**
```bash
# 볼륨 삭제 후 재시작 (데이터 손실 주의!)
docker-compose down -v
docker-compose up -d mysql redis
```
## 라이선스
이 프로젝트는 기존 TradingAgents 프로젝트의 라이선스를 따릅니다.
## 기여
1. Fork the Project
2. Create your Feature Branch (`git checkout -b feature/AmazingFeature`)
3. Commit your Changes (`git commit -m 'Add some AmazingFeature'`)
4. Push to the Branch (`git push origin feature/AmazingFeature`)
5. Open a Pull Request

File diff suppressed because it is too large Load Diff

View File

@ -1,195 +1,277 @@
import questionary
from typing import List, Optional, Tuple, Dict
from cli.models import AnalystType
ANALYST_ORDER = [
("Market Analyst", AnalystType.MARKET),
("Social Media Analyst", AnalystType.SOCIAL),
("News Analyst", AnalystType.NEWS),
("Fundamentals Analyst", AnalystType.FUNDAMENTALS),
]
def get_ticker() -> str:
"""Prompt the user to enter a ticker symbol."""
ticker = questionary.text(
"Enter the ticker symbol to analyze:",
validate=lambda x: len(x.strip()) > 0 or "Please enter a valid ticker symbol.",
style=questionary.Style(
[
("text", "fg:green"),
("highlighted", "noinherit"),
]
),
).ask()
if not ticker:
console.print("\n[red]No ticker symbol provided. Exiting...[/red]")
exit(1)
return ticker.strip().upper()
def get_analysis_date() -> str:
"""Prompt the user to enter a date in YYYY-MM-DD format."""
import re
from datetime import datetime
def validate_date(date_str: str) -> bool:
if not re.match(r"^\d{4}-\d{2}-\d{2}$", date_str):
return False
try:
datetime.strptime(date_str, "%Y-%m-%d")
return True
except ValueError:
return False
date = questionary.text(
"Enter the analysis date (YYYY-MM-DD):",
validate=lambda x: validate_date(x.strip())
or "Please enter a valid date in YYYY-MM-DD format.",
style=questionary.Style(
[
("text", "fg:green"),
("highlighted", "noinherit"),
]
),
).ask()
if not date:
console.print("\n[red]No date provided. Exiting...[/red]")
exit(1)
return date.strip()
def select_analysts() -> List[AnalystType]:
"""Select analysts using an interactive checkbox."""
choices = questionary.checkbox(
"Select Your [Analysts Team]:",
choices=[
questionary.Choice(display, value=value) for display, value in ANALYST_ORDER
],
instruction="\n- Press Space to select/unselect analysts\n- Press 'a' to select/unselect all\n- Press Enter when done",
validate=lambda x: len(x) > 0 or "You must select at least one analyst.",
style=questionary.Style(
[
("checkbox-selected", "fg:green"),
("selected", "fg:green noinherit"),
("highlighted", "noinherit"),
("pointer", "noinherit"),
]
),
).ask()
if not choices:
console.print("\n[red]No analysts selected. Exiting...[/red]")
exit(1)
return choices
def select_research_depth() -> int:
"""Select research depth using an interactive selection."""
# Define research depth options with their corresponding values
DEPTH_OPTIONS = [
("Shallow - Quick research, few debate and strategy discussion rounds", 1),
("Medium - Middle ground, moderate debate rounds and strategy discussion", 3),
("Deep - Comprehensive research, in depth debate and strategy discussion", 5),
]
choice = questionary.select(
"Select Your [Research Depth]:",
choices=[
questionary.Choice(display, value=value) for display, value in DEPTH_OPTIONS
],
instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
style=questionary.Style(
[
("selected", "fg:yellow noinherit"),
("highlighted", "fg:yellow noinherit"),
("pointer", "fg:yellow noinherit"),
]
),
).ask()
if choice is None:
console.print("\n[red]No research depth selected. Exiting...[/red]")
exit(1)
return choice
def select_shallow_thinking_agent() -> str:
"""Select shallow thinking llm engine using an interactive selection."""
# Define shallow thinking llm engine options with their corresponding model names
SHALLOW_AGENT_OPTIONS = [
("GPT-4o-mini - Fast and efficient for quick tasks", "gpt-4o-mini"),
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
]
choice = questionary.select(
"Select Your [Quick-Thinking LLM Engine]:",
choices=[
questionary.Choice(display, value=value)
for display, value in SHALLOW_AGENT_OPTIONS
],
instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
style=questionary.Style(
[
("selected", "fg:magenta noinherit"),
("highlighted", "fg:magenta noinherit"),
("pointer", "fg:magenta noinherit"),
]
),
).ask()
if choice is None:
console.print(
"\n[red]No shallow thinking llm engine selected. Exiting...[/red]"
)
exit(1)
return choice
def select_deep_thinking_agent() -> str:
"""Select deep thinking llm engine using an interactive selection."""
# Define deep thinking llm engine options with their corresponding model names
DEEP_AGENT_OPTIONS = [
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
("o4-mini - Specialized reasoning model (compact)", "o4-mini"),
("o3-mini - Advanced reasoning model (lightweight)", "o3-mini"),
("o3 - Full advanced reasoning model", "o3"),
("o1 - Premier reasoning and problem-solving model", "o1"),
]
choice = questionary.select(
"Select Your [Deep-Thinking LLM Engine]:",
choices=[
questionary.Choice(display, value=value)
for display, value in DEEP_AGENT_OPTIONS
],
instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
style=questionary.Style(
[
("selected", "fg:magenta noinherit"),
("highlighted", "fg:magenta noinherit"),
("pointer", "fg:magenta noinherit"),
]
),
).ask()
if choice is None:
console.print("\n[red]No deep thinking llm engine selected. Exiting...[/red]")
exit(1)
return choice
import questionary
from typing import List, Optional, Tuple, Dict
from cli.models import AnalystType
ANALYST_ORDER = [
("Market Analyst", AnalystType.MARKET),
("Social Media Analyst", AnalystType.SOCIAL),
("News Analyst", AnalystType.NEWS),
("Fundamentals Analyst", AnalystType.FUNDAMENTALS),
]
def get_ticker() -> str:
"""Prompt the user to enter a ticker symbol."""
ticker = questionary.text(
"Enter the ticker symbol to analyze:",
validate=lambda x: len(x.strip()) > 0 or "Please enter a valid ticker symbol.",
style=questionary.Style(
[
("text", "fg:green"),
("highlighted", "noinherit"),
]
),
).ask()
if not ticker:
console.print("\n[red]No ticker symbol provided. Exiting...[/red]")
exit(1)
return ticker.strip().upper()
def get_analysis_date() -> str:
"""Prompt the user to enter a date in YYYY-MM-DD format."""
import re
from datetime import datetime
def validate_date(date_str: str) -> bool:
if not re.match(r"^\d{4}-\d{2}-\d{2}$", date_str):
return False
try:
datetime.strptime(date_str, "%Y-%m-%d")
return True
except ValueError:
return False
date = questionary.text(
"Enter the analysis date (YYYY-MM-DD):",
validate=lambda x: validate_date(x.strip())
or "Please enter a valid date in YYYY-MM-DD format.",
style=questionary.Style(
[
("text", "fg:green"),
("highlighted", "noinherit"),
]
),
).ask()
if not date:
console.print("\n[red]No date provided. Exiting...[/red]")
exit(1)
return date.strip()
def select_analysts() -> List[AnalystType]:
"""Select analysts using an interactive checkbox."""
choices = questionary.checkbox(
"Select Your [Analysts Team]:",
choices=[
questionary.Choice(display, value=value) for display, value in ANALYST_ORDER
],
instruction="\n- Press Space to select/unselect analysts\n- Press 'a' to select/unselect all\n- Press Enter when done",
validate=lambda x: len(x) > 0 or "You must select at least one analyst.",
style=questionary.Style(
[
("checkbox-selected", "fg:green"),
("selected", "fg:green noinherit"),
("highlighted", "noinherit"),
("pointer", "noinherit"),
]
),
).ask()
if not choices:
console.print("\n[red]No analysts selected. Exiting...[/red]")
exit(1)
return choices
def select_research_depth() -> int:
"""Select research depth using an interactive selection."""
# Define research depth options with their corresponding values
DEPTH_OPTIONS = [
("Shallow - Quick research, few debate and strategy discussion rounds", 1),
("Medium - Middle ground, moderate debate rounds and strategy discussion", 3),
("Deep - Comprehensive research, in depth debate and strategy discussion", 5),
]
choice = questionary.select(
"Select Your [Research Depth]:",
choices=[
questionary.Choice(display, value=value) for display, value in DEPTH_OPTIONS
],
instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
style=questionary.Style(
[
("selected", "fg:yellow noinherit"),
("highlighted", "fg:yellow noinherit"),
("pointer", "fg:yellow noinherit"),
]
),
).ask()
if choice is None:
console.print("\n[red]No research depth selected. Exiting...[/red]")
exit(1)
return choice
def select_shallow_thinking_agent(provider) -> str:
"""Select shallow thinking llm engine using an interactive selection."""
# Define shallow thinking llm engine options with their corresponding model names
SHALLOW_AGENT_OPTIONS = {
"openai": [
("GPT-4o-mini - Fast and efficient for quick tasks", "gpt-4o-mini"),
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
],
"anthropic": [
("Claude Haiku 3.5 - Fast inference and standard capabilities", "claude-3-5-haiku-latest"),
("Claude Sonnet 3.5 - Highly capable standard model", "claude-3-5-sonnet-latest"),
("Claude Sonnet 3.7 - Exceptional hybrid reasoning and agentic capabilities", "claude-3-7-sonnet-latest"),
("Claude Sonnet 4 - High performance and excellent reasoning", "claude-sonnet-4-0"),
],
"google": [
("Gemini 2.0 Flash - Next generation features, speed, and thinking", "gemini-2.0-flash"),
("Gemini 2.5 Flash-Lite - Cost efficiency and low latency", "gemini-2.5-flash-lite-preview-06-17"),
("Gemini 2.5 Flash - Adaptive thinking, cost efficiency", "gemini-2.5-flash"),
],
"openrouter": [
("Meta: Llama 4 Scout", "meta-llama/llama-4-scout:free"),
("Meta: Llama 3.3 8B Instruct - A lightweight and ultra-fast variant of Llama 3.3 70B", "meta-llama/llama-3.3-8b-instruct:free"),
("google/gemini-2.0-flash-exp:free - Gemini Flash 2.0 offers a significantly faster time to first token", "google/gemini-2.0-flash-exp:free"),
],
"ollama": [
("llama3.1 local", "llama3.1"),
("llama3.2 local", "llama3.2"),
]
}
choice = questionary.select(
"Select Your [Quick-Thinking LLM Engine]:",
choices=[
questionary.Choice(display, value=value)
for display, value in SHALLOW_AGENT_OPTIONS[provider.lower()]
],
instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
style=questionary.Style(
[
("selected", "fg:magenta noinherit"),
("highlighted", "fg:magenta noinherit"),
("pointer", "fg:magenta noinherit"),
]
),
).ask()
if choice is None:
console.print(
"\n[red]No shallow thinking llm engine selected. Exiting...[/red]"
)
exit(1)
return choice
def select_deep_thinking_agent(provider) -> str:
"""Select deep thinking llm engine using an interactive selection."""
# Define deep thinking llm engine options with their corresponding model names
DEEP_AGENT_OPTIONS = {
"openai": [
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
("o4-mini - Specialized reasoning model (compact)", "o4-mini"),
("o3-mini - Advanced reasoning model (lightweight)", "o3-mini"),
("o3 - Full advanced reasoning model", "o3"),
("o1 - Premier reasoning and problem-solving model", "o1"),
],
"anthropic": [
("Claude Haiku 3.5 - Fast inference and standard capabilities", "claude-3-5-haiku-latest"),
("Claude Sonnet 3.5 - Highly capable standard model", "claude-3-5-sonnet-latest"),
("Claude Sonnet 3.7 - Exceptional hybrid reasoning and agentic capabilities", "claude-3-7-sonnet-latest"),
("Claude Sonnet 4 - High performance and excellent reasoning", "claude-sonnet-4-0"),
("Claude Opus 4 - Most powerful Anthropic model", " claude-opus-4-0"),
],
"google": [
("Gemini 2.0 Flash - Next generation features, speed, and thinking", "gemini-2.0-flash"),
("Gemini 2.5 Flash-Lite - Cost efficiency and low latency", "gemini-2.5-flash-lite-preview-06-17"),
("Gemini 2.5 Flash - Adaptive thinking, cost efficiency", "gemini-2.5-flash"),
("Gemini 2.5 Pro - Most powerful Gemini model", "gemini-2.5-pro"),
],
"openrouter": [
("DeepSeek V3 - a 685B-parameter, mixture-of-experts model", "deepseek/deepseek-chat-v3-0324:free"),
("Deepseek - latest iteration of the flagship chat model family from the DeepSeek team.", "deepseek/deepseek-chat-v3-0324:free"),
],
"ollama": [
("llama3.1 local", "llama3.1"),
("qwen3", "qwen3"),
]
}
choice = questionary.select(
"Select Your [Deep-Thinking LLM Engine]:",
choices=[
questionary.Choice(display, value=value)
for display, value in DEEP_AGENT_OPTIONS[provider.lower()]
],
instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
style=questionary.Style(
[
("selected", "fg:magenta noinherit"),
("highlighted", "fg:magenta noinherit"),
("pointer", "fg:magenta noinherit"),
]
),
).ask()
if choice is None:
console.print("\n[red]No deep thinking llm engine selected. Exiting...[/red]")
exit(1)
return choice
def select_llm_provider() -> tuple[str, str]:
"""Select the OpenAI api url using interactive selection."""
# Define OpenAI api options with their corresponding endpoints
BASE_URLS = [
("OpenAI", "https://api.openai.com/v1"),
("Anthropic", "https://api.anthropic.com/"),
("Google", "https://generativelanguage.googleapis.com/v1"),
("Openrouter", "https://openrouter.ai/api/v1"),
("Ollama", "http://localhost:11434/v1"),
]
choice = questionary.select(
"Select your LLM Provider:",
choices=[
questionary.Choice(display, value=(display, value))
for display, value in BASE_URLS
],
instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
style=questionary.Style(
[
("selected", "fg:magenta noinherit"),
("highlighted", "fg:magenta noinherit"),
("pointer", "fg:magenta noinherit"),
]
),
).ask()
if choice is None:
console.print("\n[red]no OpenAI backend selected. Exiting...[/red]")
exit(1)
display_name, url = choice
print(f"You selected: {display_name}\tURL: {url}")
return display_name, url

40
main.py
View File

@ -1,19 +1,21 @@
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
# Create a custom config
config = DEFAULT_CONFIG.copy()
config["deep_think_llm"] = "gpt-4.1-nano" # Use a different model
config["quick_think_llm"] = "gpt-4.1-nano" # Use a different model
config["max_debate_rounds"] = 1 # Increase debate rounds
config["online_tools"] = True # Increase debate rounds
# Initialize with custom config
ta = TradingAgentsGraph(debug=True, config=config)
# forward propagate
_, decision = ta.propagate("NVDA", "2024-05-10")
print(decision)
# Memorize mistakes and reflect
# ta.reflect_and_remember(1000) # parameter is the position returns
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
# Create a custom config
config = DEFAULT_CONFIG.copy()
config["llm_provider"] = "google" # Use a different model
config["backend_url"] = "https://generativelanguage.googleapis.com/v1" # Use a different backend
config["deep_think_llm"] = "gemini-2.5-pro" # Use a different model
config["quick_think_llm"] = "gemini-2.5-flash-lite-preview-06-17" # Use a different model
config["max_debate_rounds"] = 1 # Increase debate rounds
config["online_tools"] = True # Increase debate rounds
# Initialize with custom config
ta = TradingAgentsGraph(debug=True, config=config)
# forward propagate
_, decision = ta.propagate("NVDA", "2024-05-10")
print(decision)
# Memorize mistakes and reflect
# ta.reflect_and_remember(1000) # parameter is the position returns

34
pyproject.toml Normal file
View File

@ -0,0 +1,34 @@
[project]
name = "tradingagents"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"akshare>=1.16.98",
"backtrader>=1.9.78.123",
"chainlit>=2.5.5",
"chromadb>=1.0.12",
"eodhd>=1.0.32",
"feedparser>=6.0.11",
"finnhub-python>=2.4.23",
"langchain-anthropic>=0.3.15",
"langchain-experimental>=0.3.4",
"langchain-google-genai>=2.1.5",
"langchain-openai>=0.3.23",
"langgraph>=0.4.8",
"pandas>=2.3.0",
"parsel>=1.10.0",
"praw>=7.8.1",
"pytz>=2025.2",
"questionary>=2.1.0",
"redis>=6.2.0",
"requests>=2.32.4",
"rich>=14.0.0",
"setuptools>=80.9.0",
"stockstats>=0.6.5",
"tqdm>=4.67.1",
"tushare>=1.4.21",
"typing-extensions>=4.14.0",
"yfinance>=0.2.63",
]

View File

@ -1,22 +1,3 @@
# Backend dependencies - Django
Django==4.2.7
django-cors-headers==4.3.1
django-rest-framework==0.1.0
djangorestframework==3.14.0
djangorestframework-simplejwt==5.3.0
python-decouple==3.8
cryptography==41.0.7
mysqlclient==2.2.0
channels==4.0.0
channels-redis
# Existing CLI dependencies
typer
questionary
pydantic
# OpenAI and other AI dependencies
openai
typing-extensions
langchain-openai
langchain-experimental
@ -26,6 +7,7 @@ praw
feedparser
stockstats
eodhd
langgraph
chromadb
setuptools
backtrader
@ -40,5 +22,6 @@ redis
chainlit
rich
questionary
langgraph==0.4.8
daphne
langchain_anthropic
langchain-google-genai
google-genai

View File

@ -10,7 +10,7 @@ def create_fundamentals_analyst(llm, toolkit):
company_name = state["company_of_interest"]
if toolkit.config["online_tools"]:
tools = [toolkit.get_fundamentals_openai]
tools = [toolkit.get_fundamentals]
else:
tools = [
toolkit.get_finnhub_company_insider_sentiment,
@ -21,40 +21,8 @@ def create_fundamentals_analyst(llm, toolkit):
]
system_message = (
"""You are a fundamental analyst. Your task is to provide a comprehensive report on a given company by analyzing its financial documents, company profile, financial history, insider sentiment, and transactions.
You must output your findings in a structured JSON format. Do not add any text outside the JSON structure.
The JSON object must contain the following keys:
1. `company_overview`: A string with a summary of the company's business and market position.
2. `financial_performance`: An array of objects, each with `metric` and `value` keys (e.g., {"metric": "Earnings Per Share (EPS)", "value": "Increased by 354%"}).
3. `stock_market_info`: An array of objects, each with `metric` and `value` keys (e.g., {"metric": "Current Stock Price", "value": "$380.58"}).
4. `analyst_forecasts`: An array of objects, each with `metric` and `value` keys (e.g., {"metric": "Median Price Target", "value": "$538.00"}).
5. `insider_sentiment`: A string summarizing insider trading activity and sentiment.
6. `summary`: A string providing a final, overall conclusion based on all the fundamental data.
Here is an example of the expected JSON output format:
```json
{
"company_overview": "Applovin Corporation (APP)은 모바일 앱 개발 및 수익화에 특화된 기술 회사입니다. 지난 한 해 동안 괄목할 만한 재무 성과를 보여주며 시장에서 강력한 입지를 나타냈습니다.",
"financial_performance": [
{"metric": "주당 순이익 (EPS)", "value": "지난 1년간 354% 증가"},
{"metric": "매출 성장률", "value": "전년 대비 43.44% 성장"}
],
"stock_market_info": [
{"metric": "현재 주가", "value": "$380.58"},
{"metric": "전일 대비 변동", "value": "-0.74% 감소"}
],
"analyst_forecasts": [
{"metric": "중간 목표 주가", "value": "$538.00 (현재가 대비 약 75.4% 상승 가능성)"}
],
"insider_sentiment": "제공된 데이터에서는 구체적인 내부자 거래 내역이 자세히 설명되지 않았지만, 임원 및 이사회 구성원의 신뢰도에 대한 통찰력을 제공할 수 있습니다.",
"summary": "전반적인 재무 건전성은 긍정적이나, 주가 변동성을 고려할 때 신중한 접근이 필요합니다."
}
```
Please ensure all text content within the JSON is written in Korean.
"""
"**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)\n\nYou are a researcher tasked with analyzing fundamental information over the past week about a company. Please write a comprehensive report of the company's fundamental information such as financial documents, company profile, basic company financials, company financial history, insider sentiment and insider transactions to gain a full view of the company's fundamental information to inform traders. Make sure to include as much detail as possible. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ " Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read.",
)
prompt = ChatPromptTemplate.from_messages(
@ -83,9 +51,14 @@ Please ensure all text content within the JSON is written in Korean.
result = chain.invoke(state["messages"])
report = ""
if len(result.tool_calls) == 0:
report = result.content
return {
"messages": [result],
"fundamentals_report": result.content,
"fundamentals_report": report,
}
return fundamentals_analyst_node

View File

@ -22,65 +22,34 @@ def create_market_analyst(llm, toolkit):
]
system_message = (
"""You are a trading assistant tasked with analyzing financial markets. Your role is to select the **most relevant indicators** for a given market condition or trading strategy from the following list. The goal is to choose up to **8 indicators** that provide complementary insights without redundancy.
First, call `get_YFin_data` to retrieve the necessary stock data. Then, use `get_stockstats_indicators_report` with the selected indicators.
"""**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
After analyzing the results, you must output your findings in a structured JSON format. Do not add any text outside the JSON structure.
You are a trading assistant tasked with analyzing financial markets. Your role is to select the **most relevant indicators** for a given market condition or trading strategy from the following list. The goal is to choose up to **8 indicators** that provide complementary insights without redundancy. Categories and each category's indicators are:
The JSON object must contain the following keys:
1. `price_summary`: A string containing a detailed analysis of the stock's price movement (최고가, 최저가, 최근 동향 등).
2. `indicator_analysis`: An array of objects, where each object represents a technical indicator and has the following keys:
- `indicator`: The name of the indicator (e.g., "50 SMA").
- `value`: The calculated value of the indicator.
- `interpretation`: A detailed interpretation of what the indicator's value means in the current market context.
3. `overall_conclusion`: A string providing a comprehensive conclusion based on the combined analysis of price trends and technical indicators.
Here is an example of the expected JSON output format:
```json
{
"price_summary": "APP의 최근 주가는 2025년 6월 12일 기준으로 380.58 달러로 마감하였으며, 최고가는 428.99 달러(2025년 6월 5일), 최저가는 276.8 달러(2025년 5월 1일)입니다. 5월 초에 비해 급격히 상승하였으나, 최근에는 약간의 조정세를 보이고 있습니다.",
"indicator_analysis": [
{
"indicator": "50 SMA",
"value": "319.97",
"interpretation": "중기 추세 지표로, 현재 주가가 이 지표를 상회하고 있어 상승 추세를 나타냅니다."
},
{
"indicator": "MACD",
"value": "18.33",
"interpretation": "모멘텀 지표로, 양수 값을 유지하고 있어 상승 모멘텀을 나타냅니다."
}
],
"overall_conclusion": "APP의 주가는 현재 강한 상승세를 보이고 있으나, 단기 조정 가능성이 존재합니다. 따라서, 투자자들은 시장의 변동성을 고려하여 신중한 접근이 필요합니다."
}
```
Available indicators:
Moving Averages:
- close_50_sma: 50 SMA
- close_200_sma: 200 SMA
- close_10_ema: 10 EMA
- close_50_sma: 50 SMA: A medium-term trend indicator. Usage: Identify trend direction and serve as dynamic support/resistance. Tips: It lags price; combine with faster indicators for timely signals.
- close_200_sma: 200 SMA: A long-term trend benchmark. Usage: Confirm overall market trend and identify golden/death cross setups. Tips: It reacts slowly; best for strategic trend confirmation rather than frequent trading entries.
- close_10_ema: 10 EMA: A responsive short-term average. Usage: Capture quick shifts in momentum and potential entry points. Tips: Prone to noise in choppy markets; use alongside longer averages for filtering false signals.
MACD Related:
- macd: MACD
- macds: MACD Signal
- macdh: MACD Histogram
- macd: MACD: Computes momentum via differences of EMAs. Usage: Look for crossovers and divergence as signals of trend changes. Tips: Confirm with other indicators in low-volatility or sideways markets.
- macds: MACD Signal: An EMA smoothing of the MACD line. Usage: Use crossovers with the MACD line to trigger trades. Tips: Should be part of a broader strategy to avoid false positives.
- macdh: MACD Histogram: Shows the gap between the MACD line and its signal. Usage: Visualize momentum strength and spot divergence early. Tips: Can be volatile; complement with additional filters in fast-moving markets.
Momentum Indicators:
- rsi: RSI
- rsi: RSI: Measures momentum to flag overbought/oversold conditions. Usage: Apply 70/30 thresholds and watch for divergence to signal reversals. Tips: In strong trends, RSI may remain extreme; always cross-check with trend analysis.
Volatility Indicators:
- boll: Bollinger Middle
- boll_ub: Bollinger Upper Band
- boll_lb: Bollinger Lower Band
- atr: ATR
- boll: Bollinger Middle: A 20 SMA serving as the basis for Bollinger Bands. Usage: Acts as a dynamic benchmark for price movement. Tips: Combine with the upper and lower bands to effectively spot breakouts or reversals.
- boll_ub: Bollinger Upper Band: Typically 2 standard deviations above the middle line. Usage: Signals potential overbought conditions and breakout zones. Tips: Confirm signals with other tools; prices may ride the band in strong trends.
- boll_lb: Bollinger Lower Band: Typically 2 standard deviations below the middle line. Usage: Indicates potential oversold conditions. Tips: Use additional analysis to avoid false reversal signals.
- atr: ATR: Averages true range to measure volatility. Usage: Set stop-loss levels and adjust position sizes based on current market volatility. Tips: It's a reactive measure, so use it as part of a broader risk management strategy.
Volume-Based Indicators:
- vwma: VWMA
- vwma: VWMA: A moving average weighted by volume. Usage: Confirm trends by integrating price action with volume data. Tips: Watch for skewed results from volume spikes; use in combination with other volume analyses.
Please write all text content within the JSON in Korean.
"""
- Select indicators that provide diverse and complementary information. Avoid redundancy (e.g., do not select both rsi and stochrsi). Also briefly explain why they are suitable for the given market context. When you tool call, please use the exact name of the indicators provided above as they are defined parameters, otherwise your call will fail. Please make sure to call get_YFin_data first to retrieve the CSV that is needed to generate indicators. Write a very detailed and nuanced report of the trends you observe. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."""
+ """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."""
)
prompt = ChatPromptTemplate.from_messages(
@ -109,9 +78,14 @@ Please write all text content within the JSON in Korean.
result = chain.invoke(state["messages"])
report = ""
if len(result.tool_calls) == 0:
report = result.content
return {
"messages": [result],
"market_report": result.content,
"market_report": report,
}
return market_analyst_node

View File

@ -1,55 +1,60 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time
import json
def create_news_analyst(llm, toolkit):
def news_analyst_node(state):
current_date = state["trade_date"]
ticker = state["company_of_interest"]
if toolkit.config["online_tools"]:
tools = [toolkit.get_global_news_openai, toolkit.get_google_news]
else:
tools = [
toolkit.get_finnhub_news,
toolkit.get_reddit_news,
toolkit.get_google_news,
]
system_message = (
"You are a news researcher tasked with analyzing recent news and trends over the past week. Please write a comprehensive report of the current state of the world that is relevant for trading and macroeconomics. Look at news from EODHD, and finnhub to be comprehensive. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Makrdown table at the end of the report to organize key points in the report, organized and easy to read. Please write all responses in Korean."""
)
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful AI assistant, collaborating with other assistants."
" Use the provided tools to progress towards answering the question."
" If you are unable to fully answer, that's OK; another assistant with different tools"
" will help where you left off. Execute what you can to make progress."
" If you or any other assistant has the FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** or deliverable,"
" prefix your response with FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** so the team knows to stop."
" You have access to the following tools: {tool_names}.\n{system_message}"
"For your reference, the current date is {current_date}. We are looking at the company {ticker}",
),
MessagesPlaceholder(variable_name="messages"),
]
)
prompt = prompt.partial(system_message=system_message)
prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
prompt = prompt.partial(current_date=current_date)
prompt = prompt.partial(ticker=ticker)
chain = prompt | llm.bind_tools(tools)
result = chain.invoke(state["messages"])
return {
"messages": [result],
"news_report": result.content,
}
return news_analyst_node
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time
import json
def create_news_analyst(llm, toolkit):
def news_analyst_node(state):
current_date = state["trade_date"]
ticker = state["company_of_interest"]
if toolkit.config["online_tools"]:
tools = [toolkit.get_global_news, toolkit.get_google_news]
else:
tools = [
toolkit.get_finnhub_news,
toolkit.get_reddit_news,
toolkit.get_google_news,
]
system_message = (
"**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)\n\nYou are a news researcher tasked with analyzing recent news and trends over the past week. Please write a comprehensive report of the current state of the world that is relevant for trading and macroeconomics. Look at news from EODHD, and finnhub to be comprehensive. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Makrdown table at the end of the report to organize key points in the report, organized and easy to read."""
)
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful AI assistant, collaborating with other assistants."
" Use the provided tools to progress towards answering the question."
" If you are unable to fully answer, that's OK; another assistant with different tools"
" will help where you left off. Execute what you can to make progress."
" If you or any other assistant has the FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** or deliverable,"
" prefix your response with FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** so the team knows to stop."
" You have access to the following tools: {tool_names}.\n{system_message}"
"For your reference, the current date is {current_date}. We are looking at the company {ticker}",
),
MessagesPlaceholder(variable_name="messages"),
]
)
prompt = prompt.partial(system_message=system_message)
prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
prompt = prompt.partial(current_date=current_date)
prompt = prompt.partial(ticker=ticker)
chain = prompt | llm.bind_tools(tools)
result = chain.invoke(state["messages"])
report = ""
if len(result.tool_calls) == 0:
report = result.content
return {
"messages": [result],
"news_report": report,
}
return news_analyst_node

View File

@ -1,55 +1,60 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time
import json
def create_social_media_analyst(llm, toolkit):
def social_media_analyst_node(state):
current_date = state["trade_date"]
ticker = state["company_of_interest"]
company_name = state["company_of_interest"]
if toolkit.config["online_tools"]:
tools = [toolkit.get_stock_news_openai]
else:
tools = [
toolkit.get_reddit_stock_info,
]
system_message = (
"You are a social media and company specific news researcher/analyst tasked with analyzing social media posts, recent company news, and public sentiment for a specific company over the past week. You will be given a company's name your objective is to write a comprehensive long report detailing your analysis, insights, and implications for traders and investors on this company's current state after looking at social media and what people are saying about that company, analyzing sentiment data of what people feel each day about the company, and looking at recent company news. Try to look at all sources possible from social media to sentiment to news. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Makrdown table at the end of the report to organize key points in the report, organized and easy to read.""",
)
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful AI assistant, collaborating with other assistants."
" Use the provided tools to progress towards answering the question."
" If you are unable to fully answer, that's OK; another assistant with different tools"
" will help where you left off. Execute what you can to make progress."
" If you or any other assistant has the FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** or deliverable,"
" prefix your response with FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** so the team knows to stop."
" You have access to the following tools: {tool_names}.\n{system_message}"
"For your reference, the current date is {current_date}. The current company we want to analyze is {ticker}",
),
MessagesPlaceholder(variable_name="messages"),
]
)
prompt = prompt.partial(system_message=system_message)
prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
prompt = prompt.partial(current_date=current_date)
prompt = prompt.partial(ticker=ticker)
chain = prompt | llm.bind_tools(tools)
result = chain.invoke(state["messages"])
return {
"messages": [result],
"sentiment_report": result.content,
}
return social_media_analyst_node
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time
import json
def create_social_media_analyst(llm, toolkit):
def social_media_analyst_node(state):
current_date = state["trade_date"]
ticker = state["company_of_interest"]
company_name = state["company_of_interest"]
if toolkit.config["online_tools"]:
tools = [toolkit.get_stock_news]
else:
tools = [
toolkit.get_reddit_stock_info,
]
system_message = (
"**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)\n\nYou are a social media and company specific news researcher/analyst tasked with analyzing social media posts, recent company news, and public sentiment for a specific company over the past week. You will be given a company's name your objective is to write a comprehensive long report detailing your analysis, insights, and implications for traders and investors on this company's current state after looking at social media and what people are saying about that company, analyzing sentiment data of what people feel each day about the company, and looking at recent company news. Try to look at all sources possible from social media to sentiment to news. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Makrdown table at the end of the report to organize key points in the report, organized and easy to read.""",
)
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful AI assistant, collaborating with other assistants."
" Use the provided tools to progress towards answering the question."
" If you are unable to fully answer, that's OK; another assistant with different tools"
" will help where you left off. Execute what you can to make progress."
" If you or any other assistant has the FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** or deliverable,"
" prefix your response with FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** so the team knows to stop."
" You have access to the following tools: {tool_names}.\n{system_message}"
"For your reference, the current date is {current_date}. The current company we want to analyze is {ticker}",
),
MessagesPlaceholder(variable_name="messages"),
]
)
prompt = prompt.partial(system_message=system_message)
prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
prompt = prompt.partial(current_date=current_date)
prompt = prompt.partial(ticker=ticker)
chain = prompt | llm.bind_tools(tools)
result = chain.invoke(state["messages"])
report = ""
if len(result.tool_calls) == 0:
report = result.content
return {
"messages": [result],
"sentiment_report": report,
}
return social_media_analyst_node

View File

@ -1,57 +1,57 @@
import time
import json
def create_research_manager(llm, memory):
def research_manager_node(state) -> dict:
history = state["investment_debate_state"].get("history", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
investment_debate_state = state["investment_debate_state"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
prompt = f"""As the portfolio manager and debate facilitator, your role is to critically evaluate this round of debate and make a definitive decision: align with the bear analyst, the bull analyst, or choose Hold only if it is strongly justified based on the arguments presented.
Summarize the key points from both sides concisely, focusing on the most compelling evidence or reasoning. Your recommendationBuy, Sell, or Holdmust be clear and actionable. Avoid defaulting to Hold simply because both sides have valid points; commit to a stance grounded in the debate's strongest arguments.
Additionally, develop a detailed investment plan for the trader. This should include:
Your Recommendation: A decisive stance supported by the most convincing arguments.
Rationale: An explanation of why these arguments lead to your conclusion.
Strategic Actions: Concrete steps for implementing the recommendation.
Take into account your past mistakes on similar situations. Use these insights to refine your decision-making and ensure you are learning and improving. Present your analysis conversationally, as if speaking naturally, without special formatting.
Here are your past reflections on mistakes:
\"{past_memory_str}\"
Here is the debate:
Debate History:
{history}
Please write all responses in Korean."""
response = llm.invoke(prompt)
new_investment_debate_state = {
"judge_decision": response.content,
"history": investment_debate_state.get("history", ""),
"bear_history": investment_debate_state.get("bear_history", ""),
"bull_history": investment_debate_state.get("bull_history", ""),
"current_response": response.content,
"count": investment_debate_state["count"],
}
return {
"investment_debate_state": new_investment_debate_state,
"investment_plan": response.content,
}
return research_manager_node
import time
import json
def create_research_manager(llm, memory):
def research_manager_node(state) -> dict:
history = state["investment_debate_state"].get("history", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
investment_debate_state = state["investment_debate_state"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
prompt = f"""**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
As the portfolio manager and debate facilitator, your role is to critically evaluate this round of debate and make a definitive decision: align with the bear analyst, the bull analyst, or choose Hold only if it is strongly justified based on the arguments presented.
Summarize the key points from both sides concisely, focusing on the most compelling evidence or reasoning. Your recommendationBuy, Sell, or Holdmust be clear and actionable. Avoid defaulting to Hold simply because both sides have valid points; commit to a stance grounded in the debate's strongest arguments.
Additionally, develop a detailed investment plan for the trader. This should include:
Your Recommendation: A decisive stance supported by the most convincing arguments.
Rationale: An explanation of why these arguments lead to your conclusion.
Strategic Actions: Concrete steps for implementing the recommendation.
Take into account your past mistakes on similar situations. Use these insights to refine your decision-making and ensure you are learning and improving. Present your analysis conversationally, as if speaking naturally, without special formatting.
Here are your past reflections on mistakes:
\"{past_memory_str}\"
Here is the debate:
Debate History:
{history}"""
response = llm.invoke(prompt)
new_investment_debate_state = {
"judge_decision": response.content,
"history": investment_debate_state.get("history", ""),
"bear_history": investment_debate_state.get("bear_history", ""),
"bull_history": investment_debate_state.get("bull_history", ""),
"current_response": response.content,
"count": investment_debate_state["count"],
}
return {
"investment_debate_state": new_investment_debate_state,
"investment_plan": response.content,
}
return research_manager_node

View File

@ -1,66 +1,68 @@
import time
import json
def create_risk_manager(llm, memory):
def risk_manager_node(state) -> dict:
company_name = state["company_of_interest"]
history = state["risk_debate_state"]["history"]
risk_debate_state = state["risk_debate_state"]
market_research_report = state["market_report"]
news_report = state["news_report"]
fundamentals_report = state["news_report"]
sentiment_report = state["sentiment_report"]
trader_plan = state["investment_plan"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
prompt = f"""As the Risk Management Judge and Debate Facilitator, your goal is to evaluate the debate between three risk analysts—Risky, Neutral, and Safe/Conservative—and determine the best course of action for the trader. Your decision must result in a clear recommendation: Buy, Sell, or Hold. Choose Hold only if strongly justified by specific arguments, not as a fallback when all sides seem valid. Strive for clarity and decisiveness.
Guidelines for Decision-Making:
1. **Summarize Key Arguments**: Extract the strongest points from each analyst, focusing on relevance to the context.
2. **Provide Rationale**: Support your recommendation with direct quotes and counterarguments from the debate.
3. **Refine the Trader's Plan**: Start with the trader's original plan, **{trader_plan}**, and adjust it based on the analysts' insights.
4. **Learn from Past Mistakes**: Use lessons from **{past_memory_str}** to address prior misjudgments and improve the decision you are making now to make sure you don't make a wrong BUY/SELL/HOLD call that loses money.
Deliverables:
- A clear and actionable recommendation: Buy, Sell, or Hold.
- Detailed reasoning anchored in the debate and past reflections.
---
**Analysts Debate History:**
{history}
---
Focus on actionable insights and continuous improvement. Build on past lessons, critically evaluate all perspectives, and ensure each decision advances better outcomes. Please write all responses in Korean."""
response = llm.invoke(prompt)
new_risk_debate_state = {
"judge_decision": response.content,
"history": risk_debate_state["history"],
"risky_history": risk_debate_state["risky_history"],
"safe_history": risk_debate_state["safe_history"],
"neutral_history": risk_debate_state["neutral_history"],
"latest_speaker": "Judge",
"current_risky_response": risk_debate_state["current_risky_response"],
"current_safe_response": risk_debate_state["current_safe_response"],
"current_neutral_response": risk_debate_state["current_neutral_response"],
"count": risk_debate_state["count"],
}
return {
"risk_debate_state": new_risk_debate_state,
"final_trade_decision": response.content,
}
return risk_manager_node
import time
import json
def create_risk_manager(llm, memory):
def risk_manager_node(state) -> dict:
company_name = state["company_of_interest"]
history = state["risk_debate_state"]["history"]
risk_debate_state = state["risk_debate_state"]
market_research_report = state["market_report"]
news_report = state["news_report"]
fundamentals_report = state["news_report"]
sentiment_report = state["sentiment_report"]
trader_plan = state["investment_plan"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
prompt = f"""**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
As the Risk Management Judge and Debate Facilitator, your goal is to evaluate the debate between three risk analystsRisky, Neutral, and Safe/Conservativeand determine the best course of action for the trader. Your decision must result in a clear recommendation: Buy, Sell, or Hold. Choose Hold only if strongly justified by specific arguments, not as a fallback when all sides seem valid. Strive for clarity and decisiveness.
Guidelines for Decision-Making:
1. **Summarize Key Arguments**: Extract the strongest points from each analyst, focusing on relevance to the context.
2. **Provide Rationale**: Support your recommendation with direct quotes and counterarguments from the debate.
3. **Refine the Trader's Plan**: Start with the trader's original plan, **{trader_plan}**, and adjust it based on the analysts' insights.
4. **Learn from Past Mistakes**: Use lessons from **{past_memory_str}** to address prior misjudgments and improve the decision you are making now to make sure you don't make a wrong BUY/SELL/HOLD call that loses money.
Deliverables:
- A clear and actionable recommendation: Buy, Sell, or Hold.
- Detailed reasoning anchored in the debate and past reflections.
---
**Analysts Debate History:**
{history}
---
Focus on actionable insights and continuous improvement. Build on past lessons, critically evaluate all perspectives, and ensure each decision advances better outcomes."""
response = llm.invoke(prompt)
new_risk_debate_state = {
"judge_decision": response.content,
"history": risk_debate_state["history"],
"risky_history": risk_debate_state["risky_history"],
"safe_history": risk_debate_state["safe_history"],
"neutral_history": risk_debate_state["neutral_history"],
"latest_speaker": "Judge",
"current_risky_response": risk_debate_state["current_risky_response"],
"current_safe_response": risk_debate_state["current_safe_response"],
"current_neutral_response": risk_debate_state["current_neutral_response"],
"count": risk_debate_state["count"],
}
return {
"risk_debate_state": new_risk_debate_state,
"final_trade_decision": response.content,
}
return risk_manager_node

View File

@ -1,61 +1,63 @@
from langchain_core.messages import AIMessage
import time
import json
def create_bear_researcher(llm, memory):
def bear_node(state) -> dict:
investment_debate_state = state["investment_debate_state"]
history = investment_debate_state.get("history", "")
bear_history = investment_debate_state.get("bear_history", "")
current_response = investment_debate_state.get("current_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
prompt = f"""You are a Bear Analyst making the case against investing in the stock. Your goal is to present a well-reasoned argument emphasizing risks, challenges, and negative indicators. Leverage the provided research and data to highlight potential downsides and counter bullish arguments effectively.
Key points to focus on:
- Risks and Challenges: Highlight factors like market saturation, financial instability, or macroeconomic threats that could hinder the stock's performance.
- Competitive Weaknesses: Emphasize vulnerabilities such as weaker market positioning, declining innovation, or threats from competitors.
- Negative Indicators: Use evidence from financial data, market trends, or recent adverse news to support your position.
- Bull Counterpoints: Critically analyze the bull argument with specific data and sound reasoning, exposing weaknesses or over-optimistic assumptions.
- Engagement: Present your argument in a conversational style, directly engaging with the bull analyst's points and debating effectively rather than simply listing facts.
Resources available:
Market research report: {market_research_report}
Social media sentiment report: {sentiment_report}
Latest world affairs news: {news_report}
Company fundamentals report: {fundamentals_report}
Conversation history of the debate: {history}
Last bull argument: {current_response}
Reflections from similar situations and lessons learned: {past_memory_str}
Use this information to deliver a compelling bear argument, refute the bull's claims, and engage in a dynamic debate that demonstrates the risks and weaknesses of investing in the stock. You must also address reflections and learn from lessons and mistakes you made in the past. Please write all responses in Korean.
"""
response = llm.invoke(prompt)
argument = f"Bear Analyst: {response.content}"
new_investment_debate_state = {
"history": history + "\n" + argument,
"bear_history": bear_history + "\n" + argument,
"bull_history": investment_debate_state.get("bull_history", ""),
"current_response": argument,
"count": investment_debate_state["count"] + 1,
}
return {"investment_debate_state": new_investment_debate_state}
return bear_node
from langchain_core.messages import AIMessage
import time
import json
def create_bear_researcher(llm, memory):
def bear_node(state) -> dict:
investment_debate_state = state["investment_debate_state"]
history = investment_debate_state.get("history", "")
bear_history = investment_debate_state.get("bear_history", "")
current_response = investment_debate_state.get("current_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
prompt = f"""**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
You are a Bear Analyst making the case against investing in the stock. Your goal is to present a well-reasoned argument emphasizing risks, challenges, and negative indicators. Leverage the provided research and data to highlight potential downsides and counter bullish arguments effectively.
Key points to focus on:
- Risks and Challenges: Highlight factors like market saturation, financial instability, or macroeconomic threats that could hinder the stock's performance.
- Competitive Weaknesses: Emphasize vulnerabilities such as weaker market positioning, declining innovation, or threats from competitors.
- Negative Indicators: Use evidence from financial data, market trends, or recent adverse news to support your position.
- Bull Counterpoints: Critically analyze the bull argument with specific data and sound reasoning, exposing weaknesses or over-optimistic assumptions.
- Engagement: Present your argument in a conversational style, directly engaging with the bull analyst's points and debating effectively rather than simply listing facts.
Resources available:
Market research report: {market_research_report}
Social media sentiment report: {sentiment_report}
Latest world affairs news: {news_report}
Company fundamentals report: {fundamentals_report}
Conversation history of the debate: {history}
Last bull argument: {current_response}
Reflections from similar situations and lessons learned: {past_memory_str}
Use this information to deliver a compelling bear argument, refute the bull's claims, and engage in a dynamic debate that demonstrates the risks and weaknesses of investing in the stock. You must also address reflections and learn from lessons and mistakes you made in the past.
"""
response = llm.invoke(prompt)
argument = f"Bear Analyst: {response.content}"
new_investment_debate_state = {
"history": history + "\n" + argument,
"bear_history": bear_history + "\n" + argument,
"bull_history": investment_debate_state.get("bull_history", ""),
"current_response": argument,
"count": investment_debate_state["count"] + 1,
}
return {"investment_debate_state": new_investment_debate_state}
return bear_node

View File

@ -1,59 +1,61 @@
from langchain_core.messages import AIMessage
import time
import json
def create_bull_researcher(llm, memory):
def bull_node(state) -> dict:
investment_debate_state = state["investment_debate_state"]
history = investment_debate_state.get("history", "")
bull_history = investment_debate_state.get("bull_history", "")
current_response = investment_debate_state.get("current_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
prompt = f"""You are a Bull Analyst advocating for investing in the stock. Your task is to build a strong, evidence-based case emphasizing growth potential, competitive advantages, and positive market indicators. Leverage the provided research and data to address concerns and counter bearish arguments effectively.
Key points to focus on:
- Growth Potential: Highlight the company's market opportunities, revenue projections, and scalability.
- Competitive Advantages: Emphasize factors like unique products, strong branding, or dominant market positioning.
- Positive Indicators: Use financial health, industry trends, and recent positive news as evidence.
- Bear Counterpoints: Critically analyze the bear argument with specific data and sound reasoning, addressing concerns thoroughly and showing why the bull perspective holds stronger merit.
- Engagement: Present your argument in a conversational style, engaging directly with the bear analyst's points and debating effectively rather than just listing data.
Resources available:
Market research report: {market_research_report}
Social media sentiment report: {sentiment_report}
Latest world affairs news: {news_report}
Company fundamentals report: {fundamentals_report}
Conversation history of the debate: {history}
Last bear argument: {current_response}
Reflections from similar situations and lessons learned: {past_memory_str}
Use this information to deliver a compelling bull argument, refute the bear's concerns, and engage in a dynamic debate that demonstrates the strengths of the bull position. You must also address reflections and learn from lessons and mistakes you made in the past. Please write all responses in Korean.
"""
response = llm.invoke(prompt)
argument = f"Bull Analyst: {response.content}"
new_investment_debate_state = {
"history": history + "\n" + argument,
"bull_history": bull_history + "\n" + argument,
"bear_history": investment_debate_state.get("bear_history", ""),
"current_response": argument,
"count": investment_debate_state["count"] + 1,
}
return {"investment_debate_state": new_investment_debate_state}
return bull_node
from langchain_core.messages import AIMessage
import time
import json
def create_bull_researcher(llm, memory):
def bull_node(state) -> dict:
investment_debate_state = state["investment_debate_state"]
history = investment_debate_state.get("history", "")
bull_history = investment_debate_state.get("bull_history", "")
current_response = investment_debate_state.get("current_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
prompt = f"""**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
You are a Bull Analyst advocating for investing in the stock. Your task is to build a strong, evidence-based case emphasizing growth potential, competitive advantages, and positive market indicators. Leverage the provided research and data to address concerns and counter bearish arguments effectively.
Key points to focus on:
- Growth Potential: Highlight the company's market opportunities, revenue projections, and scalability.
- Competitive Advantages: Emphasize factors like unique products, strong branding, or dominant market positioning.
- Positive Indicators: Use financial health, industry trends, and recent positive news as evidence.
- Bear Counterpoints: Critically analyze the bear argument with specific data and sound reasoning, addressing concerns thoroughly and showing why the bull perspective holds stronger merit.
- Engagement: Present your argument in a conversational style, engaging directly with the bear analyst's points and debating effectively rather than just listing data.
Resources available:
Market research report: {market_research_report}
Social media sentiment report: {sentiment_report}
Latest world affairs news: {news_report}
Company fundamentals report: {fundamentals_report}
Conversation history of the debate: {history}
Last bear argument: {current_response}
Reflections from similar situations and lessons learned: {past_memory_str}
Use this information to deliver a compelling bull argument, refute the bear's concerns, and engage in a dynamic debate that demonstrates the strengths of the bull position. You must also address reflections and learn from lessons and mistakes you made in the past.
"""
response = llm.invoke(prompt)
argument = f"Bull Analyst: {response.content}"
new_investment_debate_state = {
"history": history + "\n" + argument,
"bull_history": bull_history + "\n" + argument,
"bear_history": investment_debate_state.get("bear_history", ""),
"current_response": argument,
"count": investment_debate_state["count"] + 1,
}
return {"investment_debate_state": new_investment_debate_state}
return bull_node

View File

@ -1,55 +1,57 @@
import time
import json
def create_risky_debator(llm):
def risky_node(state) -> dict:
risk_debate_state = state["risk_debate_state"]
history = risk_debate_state.get("history", "")
risky_history = risk_debate_state.get("risky_history", "")
current_safe_response = risk_debate_state.get("current_safe_response", "")
current_neutral_response = risk_debate_state.get("current_neutral_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
trader_decision = state["trader_investment_plan"]
prompt = f"""As the Risky Risk Analyst, your role is to actively champion high-reward, high-risk opportunities, emphasizing bold strategies and competitive advantages. When evaluating the trader's decision or plan, focus intently on the potential upside, growth potential, and innovative benefits—even when these come with elevated risk. Use the provided market data and sentiment analysis to strengthen your arguments and challenge the opposing views. Specifically, respond directly to each point made by the conservative and neutral analysts, countering with data-driven rebuttals and persuasive reasoning. Highlight where their caution might miss critical opportunities or where their assumptions may be overly conservative. Here is the trader's decision:
{trader_decision}
Your task is to create a compelling case for the trader's decision by questioning and critiquing the conservative and neutral stances to demonstrate why your high-reward perspective offers the best path forward. Incorporate insights from the following sources into your arguments:
Market Research Report: {market_research_report}
Social Media Sentiment Report: {sentiment_report}
Latest World Affairs Report: {news_report}
Company Fundamentals Report: {fundamentals_report}
Here is the current conversation history: {history} Here are the last arguments from the conservative analyst: {current_safe_response} Here are the last arguments from the neutral analyst: {current_neutral_response}. If there are no responses from the other viewpoints, do not halluncinate and just present your point.
Engage actively by addressing any specific concerns raised, refuting the weaknesses in their logic, and asserting the benefits of risk-taking to outpace market norms. Maintain a focus on debating and persuading, not just presenting data. Challenge each counterpoint to underscore why a high-risk approach is optimal. Output conversationally as if you are speaking without any special formatting. Please write all responses in Korean."""
response = llm.invoke(prompt)
argument = f"Risky Analyst: {response.content}"
new_risk_debate_state = {
"history": history + "\n" + argument,
"risky_history": risky_history + "\n" + argument,
"safe_history": risk_debate_state.get("safe_history", ""),
"neutral_history": risk_debate_state.get("neutral_history", ""),
"latest_speaker": "Risky",
"current_risky_response": argument,
"current_safe_response": risk_debate_state.get("current_safe_response", ""),
"current_neutral_response": risk_debate_state.get(
"current_neutral_response", ""
),
"count": risk_debate_state["count"] + 1,
}
return {"risk_debate_state": new_risk_debate_state}
return risky_node
import time
import json
def create_risky_debator(llm):
def risky_node(state) -> dict:
risk_debate_state = state["risk_debate_state"]
history = risk_debate_state.get("history", "")
risky_history = risk_debate_state.get("risky_history", "")
current_safe_response = risk_debate_state.get("current_safe_response", "")
current_neutral_response = risk_debate_state.get("current_neutral_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
trader_decision = state["trader_investment_plan"]
prompt = f"""**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
As the Risky Risk Analyst, your role is to actively champion high-reward, high-risk opportunities, emphasizing bold strategies and competitive advantages. When evaluating the trader's decision or plan, focus intently on the potential upside, growth potential, and innovative benefits—even when these come with elevated risk. Use the provided market data and sentiment analysis to strengthen your arguments and challenge the opposing views. Specifically, respond directly to each point made by the conservative and neutral analysts, countering with data-driven rebuttals and persuasive reasoning. Highlight where their caution might miss critical opportunities or where their assumptions may be overly conservative. Here is the trader's decision:
{trader_decision}
Your task is to create a compelling case for the trader's decision by questioning and critiquing the conservative and neutral stances to demonstrate why your high-reward perspective offers the best path forward. Incorporate insights from the following sources into your arguments:
Market Research Report: {market_research_report}
Social Media Sentiment Report: {sentiment_report}
Latest World Affairs Report: {news_report}
Company Fundamentals Report: {fundamentals_report}
Here is the current conversation history: {history} Here are the last arguments from the conservative analyst: {current_safe_response} Here are the last arguments from the neutral analyst: {current_neutral_response}. If there are no responses from the other viewpoints, do not halluncinate and just present your point.
Engage actively by addressing any specific concerns raised, refuting the weaknesses in their logic, and asserting the benefits of risk-taking to outpace market norms. Maintain a focus on debating and persuading, not just presenting data. Challenge each counterpoint to underscore why a high-risk approach is optimal. Output conversationally as if you are speaking without any special formatting."""
response = llm.invoke(prompt)
argument = f"Risky Analyst: {response.content}"
new_risk_debate_state = {
"history": history + "\n" + argument,
"risky_history": risky_history + "\n" + argument,
"safe_history": risk_debate_state.get("safe_history", ""),
"neutral_history": risk_debate_state.get("neutral_history", ""),
"latest_speaker": "Risky",
"current_risky_response": argument,
"current_safe_response": risk_debate_state.get("current_safe_response", ""),
"current_neutral_response": risk_debate_state.get(
"current_neutral_response", ""
),
"count": risk_debate_state["count"] + 1,
}
return {"risk_debate_state": new_risk_debate_state}
return risky_node

View File

@ -1,58 +1,60 @@
from langchain_core.messages import AIMessage
import time
import json
def create_safe_debator(llm):
def safe_node(state) -> dict:
risk_debate_state = state["risk_debate_state"]
history = risk_debate_state.get("history", "")
safe_history = risk_debate_state.get("safe_history", "")
current_risky_response = risk_debate_state.get("current_risky_response", "")
current_neutral_response = risk_debate_state.get("current_neutral_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
trader_decision = state["trader_investment_plan"]
prompt = f"""As the Safe/Conservative Risk Analyst, your primary objective is to protect assets, minimize volatility, and ensure steady, reliable growth. You prioritize stability, security, and risk mitigation, carefully assessing potential losses, economic downturns, and market volatility. When evaluating the trader's decision or plan, critically examine high-risk elements, pointing out where the decision may expose the firm to undue risk and where more cautious alternatives could secure long-term gains. Here is the trader's decision:
{trader_decision}
Your task is to actively counter the arguments of the Risky and Neutral Analysts, highlighting where their views may overlook potential threats or fail to prioritize sustainability. Respond directly to their points, drawing from the following data sources to build a convincing case for a low-risk approach adjustment to the trader's decision:
Market Research Report: {market_research_report}
Social Media Sentiment Report: {sentiment_report}
Latest World Affairs Report: {news_report}
Company Fundamentals Report: {fundamentals_report}
Here is the current conversation history: {history} Here is the last response from the risky analyst: {current_risky_response} Here is the last response from the neutral analyst: {current_neutral_response}. If there are no responses from the other viewpoints, do not halluncinate and just present your point.
Engage by questioning their optimism and emphasizing the potential downsides they may have overlooked. Address each of their counterpoints to showcase why a conservative stance is ultimately the safest path for the firm's assets. Focus on debating and critiquing their arguments to demonstrate the strength of a low-risk strategy over their approaches. Output conversationally as if you are speaking without any special formatting. Please write all responses in Korean."""
response = llm.invoke(prompt)
argument = f"Safe Analyst: {response.content}"
new_risk_debate_state = {
"history": history + "\n" + argument,
"risky_history": risk_debate_state.get("risky_history", ""),
"safe_history": safe_history + "\n" + argument,
"neutral_history": risk_debate_state.get("neutral_history", ""),
"latest_speaker": "Safe",
"current_risky_response": risk_debate_state.get(
"current_risky_response", ""
),
"current_safe_response": argument,
"current_neutral_response": risk_debate_state.get(
"current_neutral_response", ""
),
"count": risk_debate_state["count"] + 1,
}
return {"risk_debate_state": new_risk_debate_state}
return safe_node
from langchain_core.messages import AIMessage
import time
import json
def create_safe_debator(llm):
def safe_node(state) -> dict:
risk_debate_state = state["risk_debate_state"]
history = risk_debate_state.get("history", "")
safe_history = risk_debate_state.get("safe_history", "")
current_risky_response = risk_debate_state.get("current_risky_response", "")
current_neutral_response = risk_debate_state.get("current_neutral_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
trader_decision = state["trader_investment_plan"]
prompt = f"""**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
As the Safe/Conservative Risk Analyst, your primary objective is to protect assets, minimize volatility, and ensure steady, reliable growth. You prioritize stability, security, and risk mitigation, carefully assessing potential losses, economic downturns, and market volatility. When evaluating the trader's decision or plan, critically examine high-risk elements, pointing out where the decision may expose the firm to undue risk and where more cautious alternatives could secure long-term gains. Here is the trader's decision:
{trader_decision}
Your task is to actively counter the arguments of the Risky and Neutral Analysts, highlighting where their views may overlook potential threats or fail to prioritize sustainability. Respond directly to their points, drawing from the following data sources to build a convincing case for a low-risk approach adjustment to the trader's decision:
Market Research Report: {market_research_report}
Social Media Sentiment Report: {sentiment_report}
Latest World Affairs Report: {news_report}
Company Fundamentals Report: {fundamentals_report}
Here is the current conversation history: {history} Here is the last response from the risky analyst: {current_risky_response} Here is the last response from the neutral analyst: {current_neutral_response}. If there are no responses from the other viewpoints, do not halluncinate and just present your point.
Engage by questioning their optimism and emphasizing the potential downsides they may have overlooked. Address each of their counterpoints to showcase why a conservative stance is ultimately the safest path for the firm's assets. Focus on debating and critiquing their arguments to demonstrate the strength of a low-risk strategy over their approaches. Output conversationally as if you are speaking without any special formatting."""
response = llm.invoke(prompt)
argument = f"Safe Analyst: {response.content}"
new_risk_debate_state = {
"history": history + "\n" + argument,
"risky_history": risk_debate_state.get("risky_history", ""),
"safe_history": safe_history + "\n" + argument,
"neutral_history": risk_debate_state.get("neutral_history", ""),
"latest_speaker": "Safe",
"current_risky_response": risk_debate_state.get(
"current_risky_response", ""
),
"current_safe_response": argument,
"current_neutral_response": risk_debate_state.get(
"current_neutral_response", ""
),
"count": risk_debate_state["count"] + 1,
}
return {"risk_debate_state": new_risk_debate_state}
return safe_node

View File

@ -1,55 +1,57 @@
import time
import json
def create_neutral_debator(llm):
def neutral_node(state) -> dict:
risk_debate_state = state["risk_debate_state"]
history = risk_debate_state.get("history", "")
neutral_history = risk_debate_state.get("neutral_history", "")
current_risky_response = risk_debate_state.get("current_risky_response", "")
current_safe_response = risk_debate_state.get("current_safe_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
trader_decision = state["trader_investment_plan"]
prompt = f"""As the Neutral Risk Analyst, your role is to provide a balanced perspective, weighing both the potential benefits and risks of the trader's decision or plan. You prioritize a well-rounded approach, evaluating the upsides and downsides while factoring in broader market trends, potential economic shifts, and diversification strategies.Here is the trader's decision:
{trader_decision}
Your task is to challenge both the Risky and Safe Analysts, pointing out where each perspective may be overly optimistic or overly cautious. Use insights from the following data sources to support a moderate, sustainable strategy to adjust the trader's decision:
Market Research Report: {market_research_report}
Social Media Sentiment Report: {sentiment_report}
Latest World Affairs Report: {news_report}
Company Fundamentals Report: {fundamentals_report}
Here is the current conversation history: {history} Here is the last response from the risky analyst: {current_risky_response} Here is the last response from the safe analyst: {current_safe_response}. If there are no responses from the other viewpoints, do not halluncinate and just present your point.
Engage actively by analyzing both sides critically, addressing weaknesses in the risky and conservative arguments to advocate for a more balanced approach. Challenge each of their points to illustrate why a moderate risk strategy might offer the best of both worlds, providing growth potential while safeguarding against extreme volatility. Focus on debating rather than simply presenting data, aiming to show that a balanced view can lead to the most reliable outcomes. Output conversationally as if you are speaking without any special formatting. Please write all responses in Korean."""
response = llm.invoke(prompt)
argument = f"Neutral Analyst: {response.content}"
new_risk_debate_state = {
"history": history + "\n" + argument,
"risky_history": risk_debate_state.get("risky_history", ""),
"safe_history": risk_debate_state.get("safe_history", ""),
"neutral_history": neutral_history + "\n" + argument,
"latest_speaker": "Neutral",
"current_risky_response": risk_debate_state.get(
"current_risky_response", ""
),
"current_safe_response": risk_debate_state.get("current_safe_response", ""),
"current_neutral_response": argument,
"count": risk_debate_state["count"] + 1,
}
return {"risk_debate_state": new_risk_debate_state}
return neutral_node
import time
import json
def create_neutral_debator(llm):
def neutral_node(state) -> dict:
risk_debate_state = state["risk_debate_state"]
history = risk_debate_state.get("history", "")
neutral_history = risk_debate_state.get("neutral_history", "")
current_risky_response = risk_debate_state.get("current_risky_response", "")
current_safe_response = risk_debate_state.get("current_safe_response", "")
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
trader_decision = state["trader_investment_plan"]
prompt = f"""**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
As the Neutral Risk Analyst, your role is to provide a balanced perspective, weighing both the potential benefits and risks of the trader's decision or plan. You prioritize a well-rounded approach, evaluating the upsides and downsides while factoring in broader market trends, potential economic shifts, and diversification strategies.Here is the trader's decision:
{trader_decision}
Your task is to challenge both the Risky and Safe Analysts, pointing out where each perspective may be overly optimistic or overly cautious. Use insights from the following data sources to support a moderate, sustainable strategy to adjust the trader's decision:
Market Research Report: {market_research_report}
Social Media Sentiment Report: {sentiment_report}
Latest World Affairs Report: {news_report}
Company Fundamentals Report: {fundamentals_report}
Here is the current conversation history: {history} Here is the last response from the risky analyst: {current_risky_response} Here is the last response from the safe analyst: {current_safe_response}. If there are no responses from the other viewpoints, do not halluncinate and just present your point.
Engage actively by analyzing both sides critically, addressing weaknesses in the risky and conservative arguments to advocate for a more balanced approach. Challenge each of their points to illustrate why a moderate risk strategy might offer the best of both worlds, providing growth potential while safeguarding against extreme volatility. Focus on debating rather than simply presenting data, aiming to show that a balanced view can lead to the most reliable outcomes. Output conversationally as if you are speaking without any special formatting."""
response = llm.invoke(prompt)
argument = f"Neutral Analyst: {response.content}"
new_risk_debate_state = {
"history": history + "\n" + argument,
"risky_history": risk_debate_state.get("risky_history", ""),
"safe_history": risk_debate_state.get("safe_history", ""),
"neutral_history": neutral_history + "\n" + argument,
"latest_speaker": "Neutral",
"current_risky_response": risk_debate_state.get(
"current_risky_response", ""
),
"current_safe_response": risk_debate_state.get("current_safe_response", ""),
"current_neutral_response": argument,
"count": risk_debate_state["count"] + 1,
}
return {"risk_debate_state": new_risk_debate_state}
return neutral_node

View File

@ -1,43 +1,45 @@
import functools
import time
import json
def create_trader(llm, memory):
def trader_node(state, name):
company_name = state["company_of_interest"]
investment_plan = state["investment_plan"]
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
context = {
"role": "user",
"content": f"Based on a comprehensive analysis by a team of analysts, here is an investment plan tailored for {company_name}. This plan incorporates insights from current technical market trends, macroeconomic indicators, and social media sentiment. Use this plan as a foundation for evaluating your next trading decision.\n\nProposed Investment Plan: {investment_plan}\n\nLeverage these insights to make an informed and strategic decision.",
}
messages = [
{
"role": "system",
"content": f"""You are a trading agent analyzing market data to make investment decisions. Based on your analysis, provide a specific recommendation to buy, sell, or hold. End with a firm decision and always conclude your response with 'FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL**' to confirm your recommendation. Do not forget to utilize lessons from past decisions to learn from your mistakes. Here is some reflections from similar situatiosn you traded in and the lessons learned: {past_memory_str}. Please write all responses in Korean.""",
},
context,
]
result = llm.invoke(messages)
return {
"messages": [result],
"trader_investment_plan": result.content,
"sender": name,
}
return functools.partial(trader_node, name="Trader")
import functools
import time
import json
def create_trader(llm, memory):
def trader_node(state, name):
company_name = state["company_of_interest"]
investment_plan = state["investment_plan"]
market_research_report = state["market_report"]
sentiment_report = state["sentiment_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
past_memories = memory.get_memories(curr_situation, n_matches=2)
past_memory_str = ""
for i, rec in enumerate(past_memories, 1):
past_memory_str += rec["recommendation"] + "\n\n"
context = {
"role": "user",
"content": f"Based on a comprehensive analysis by a team of analysts, here is an investment plan tailored for {company_name}. This plan incorporates insights from current technical market trends, macroeconomic indicators, and social media sentiment. Use this plan as a foundation for evaluating your next trading decision.\n\nProposed Investment Plan: {investment_plan}\n\nLeverage these insights to make an informed and strategic decision.",
}
messages = [
{
"role": "system",
"content": f"""**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
You are a trading agent analyzing market data to make investment decisions. Based on your analysis, provide a specific recommendation to buy, sell, or hold. End with a firm decision and always conclude your response with 'FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL**' to confirm your recommendation. Do not forget to utilize lessons from past decisions to learn from your mistakes. Here is some reflections from similar situatiosn you traded in and the lessons learned: {past_memory_str}""",
},
context,
]
result = llm.invoke(messages)
return {
"messages": [result],
"trader_investment_plan": result.content,
"sender": name,
}
return functools.partial(trader_node, name="Trader")

View File

@ -1,411 +1,419 @@
from langchain_core.messages import BaseMessage, HumanMessage, ToolMessage, AIMessage
from typing import List
from typing import Annotated
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import RemoveMessage
from langchain_core.tools import tool
from datetime import date, timedelta, datetime
import functools
import pandas as pd
import os
from dateutil.relativedelta import relativedelta
from langchain_openai import ChatOpenAI
import tradingagents.dataflows.interface as interface
from tradingagents.default_config import DEFAULT_CONFIG
def create_msg_delete():
def delete_messages(state):
"""To prevent message history from overflowing, regularly clear message history after a stage of the pipeline is done"""
messages = state["messages"]
return {"messages": [RemoveMessage(id=m.id) for m in messages]}
return delete_messages
class Toolkit:
_config = DEFAULT_CONFIG.copy()
@classmethod
def update_config(cls, config):
"""Update the class-level configuration."""
cls._config.update(config)
@property
def config(self):
"""Access the configuration."""
return self._config
def __init__(self, config=None):
if config:
self.update_config(config)
@staticmethod
@tool
def get_reddit_news(
curr_date: Annotated[str, "Date you want to get news for in yyyy-mm-dd format"],
) -> str:
"""
Retrieve global news from Reddit within a specified time frame.
Args:
curr_date (str): Date you want to get news for in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the latest global news from Reddit in the specified time frame.
"""
global_news_result = interface.get_reddit_global_news(curr_date, 7, 5)
return global_news_result
@staticmethod
@tool
def get_finnhub_news(
ticker: Annotated[
str,
"Search query of a company, e.g. 'AAPL, TSM, etc.",
],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
):
"""
Retrieve the latest news about a given stock from Finnhub within a date range
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing news about the company within the date range from start_date to end_date
"""
end_date_str = end_date
end_date = datetime.strptime(end_date, "%Y-%m-%d")
start_date = datetime.strptime(start_date, "%Y-%m-%d")
look_back_days = (end_date - start_date).days
finnhub_news_result = interface.get_finnhub_news(
ticker, end_date_str, look_back_days
)
return finnhub_news_result
@staticmethod
@tool
def get_reddit_stock_info(
ticker: Annotated[
str,
"Ticker of a company. e.g. AAPL, TSM",
],
curr_date: Annotated[str, "Current date you want to get news for"],
) -> str:
"""
Retrieve the latest news about a given stock from Reddit, given the current date.
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
curr_date (str): current date in yyyy-mm-dd format to get news for
Returns:
str: A formatted dataframe containing the latest news about the company on the given date
"""
stock_news_results = interface.get_reddit_company_news(ticker, curr_date, 7, 5)
return stock_news_results
@staticmethod
@tool
def get_YFin_data(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "Start date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve the stock price data for a given ticker symbol from Yahoo Finance.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the stock price data for the specified ticker symbol in the specified date range.
"""
result_data = interface.get_YFin_data(symbol, start_date, end_date)
return result_data
@staticmethod
@tool
def get_YFin_data_online(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "Start date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve the stock price data for a given ticker symbol from Yahoo Finance.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the stock price data for the specified ticker symbol in the specified date range.
"""
result_data = interface.get_YFin_data_online(symbol, start_date, end_date)
return result_data
@staticmethod
@tool
def get_stockstats_indicators_report(
symbol: Annotated[str, "ticker symbol of the company"],
indicator: Annotated[
str, "technical indicator to get the analysis and report of"
],
curr_date: Annotated[
str, "The current trading date you are trading on, YYYY-mm-dd"
],
look_back_days: Annotated[int, "how many days to look back"] = 30,
) -> str:
"""
Retrieve stock stats indicators for a given ticker symbol and indicator.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
indicator (str): Technical indicator to get the analysis and report of
curr_date (str): The current trading date you are trading on, YYYY-mm-dd
look_back_days (int): How many days to look back, default is 30
Returns:
str: A formatted dataframe containing the stock stats indicators for the specified ticker symbol and indicator.
"""
result_stockstats = interface.get_stock_stats_indicators_window(
symbol, indicator, curr_date, look_back_days, False
)
return result_stockstats
@staticmethod
@tool
def get_stockstats_indicators_report_online(
symbol: Annotated[str, "ticker symbol of the company"],
indicator: Annotated[
str, "technical indicator to get the analysis and report of"
],
curr_date: Annotated[
str, "The current trading date you are trading on, YYYY-mm-dd"
],
look_back_days: Annotated[int, "how many days to look back"] = 30,
) -> str:
"""
Retrieve stock stats indicators for a given ticker symbol and indicator.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
indicator (str): Technical indicator to get the analysis and report of
curr_date (str): The current trading date you are trading on, YYYY-mm-dd
look_back_days (int): How many days to look back, default is 30
Returns:
str: A formatted dataframe containing the stock stats indicators for the specified ticker symbol and indicator.
"""
result_stockstats = interface.get_stock_stats_indicators_window(
symbol, indicator, curr_date, look_back_days, True
)
return result_stockstats
@staticmethod
@tool
def get_finnhub_company_insider_sentiment(
ticker: Annotated[str, "ticker symbol for the company"],
curr_date: Annotated[
str,
"current date of you are trading at, yyyy-mm-dd",
],
):
"""
Retrieve insider sentiment information about a company (retrieved from public SEC information) for the past 30 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the sentiment in the past 30 days starting at curr_date
"""
data_sentiment = interface.get_finnhub_company_insider_sentiment(
ticker, curr_date, 30
)
return data_sentiment
@staticmethod
@tool
def get_finnhub_company_insider_transactions(
ticker: Annotated[str, "ticker symbol"],
curr_date: Annotated[
str,
"current date you are trading at, yyyy-mm-dd",
],
):
"""
Retrieve insider transaction information about a company (retrieved from public SEC information) for the past 30 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's insider transactions/trading information in the past 30 days
"""
data_trans = interface.get_finnhub_company_insider_transactions(
ticker, curr_date, 30
)
return data_trans
@staticmethod
@tool
def get_simfin_balance_sheet(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual/quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve the most recent balance sheet of a company
Args:
ticker (str): ticker symbol of the company
freq (str): reporting frequency of the company's financial history: annual / quarterly
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's most recent balance sheet
"""
data_balance_sheet = interface.get_simfin_balance_sheet(ticker, freq, curr_date)
return data_balance_sheet
@staticmethod
@tool
def get_simfin_cashflow(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual/quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve the most recent cash flow statement of a company
Args:
ticker (str): ticker symbol of the company
freq (str): reporting frequency of the company's financial history: annual / quarterly
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's most recent cash flow statement
"""
data_cashflow = interface.get_simfin_cashflow(ticker, freq, curr_date)
return data_cashflow
@staticmethod
@tool
def get_simfin_income_stmt(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual/quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve the most recent income statement of a company
Args:
ticker (str): ticker symbol of the company
freq (str): reporting frequency of the company's financial history: annual / quarterly
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's most recent income statement
"""
data_income_stmt = interface.get_simfin_income_statements(
ticker, freq, curr_date
)
return data_income_stmt
@staticmethod
@tool
def get_google_news(
query: Annotated[str, "Query to search with"],
curr_date: Annotated[str, "Curr date in yyyy-mm-dd format"],
):
"""
Retrieve the latest news from Google News based on a query and date range.
Args:
query (str): Query to search with
curr_date (str): Current date in yyyy-mm-dd format
look_back_days (int): How many days to look back
Returns:
str: A formatted string containing the latest news from Google News based on the query and date range.
"""
google_news_results = interface.get_google_news(query, curr_date, 7)
return google_news_results
@staticmethod
@tool
def get_stock_news_openai(
ticker: Annotated[str, "the company's ticker"],
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
):
"""
Retrieve the latest news about a given stock by using OpenAI's news API.
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
curr_date (str): Current date in yyyy-mm-dd format
Returns:
str: A formatted string containing the latest news about the company on the given date.
"""
openai_news_results = interface.get_stock_news_openai(ticker, curr_date)
return openai_news_results
@staticmethod
@tool
def get_global_news_openai(
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
):
"""
Retrieve the latest macroeconomics news on a given date using OpenAI's macroeconomics news API.
Args:
curr_date (str): Current date in yyyy-mm-dd format
Returns:
str: A formatted string containing the latest macroeconomic news on the given date.
"""
openai_news_results = interface.get_global_news_openai(curr_date)
return openai_news_results
@staticmethod
@tool
def get_fundamentals_openai(
ticker: Annotated[str, "the company's ticker"],
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
):
"""
Retrieve the latest fundamental information about a given stock on a given date by using OpenAI's news API.
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
curr_date (str): Current date in yyyy-mm-dd format
Returns:
str: A formatted string containing the latest fundamental information about the company on the given date.
"""
openai_fundamentals_results = interface.get_fundamentals_openai(
ticker, curr_date
)
return openai_fundamentals_results
from langchain_core.messages import BaseMessage, HumanMessage, ToolMessage, AIMessage
from typing import List
from typing import Annotated
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import RemoveMessage
from langchain_core.tools import tool
from datetime import date, timedelta, datetime
import functools
import pandas as pd
import os
from dateutil.relativedelta import relativedelta
from langchain_openai import ChatOpenAI
import tradingagents.dataflows.interface as interface
from tradingagents.default_config import DEFAULT_CONFIG
from langchain_core.messages import HumanMessage
def create_msg_delete():
def delete_messages(state):
"""Clear messages and add placeholder for Anthropic compatibility"""
messages = state["messages"]
# Remove all messages
removal_operations = [RemoveMessage(id=m.id) for m in messages]
# Add a minimal placeholder message
placeholder = HumanMessage(content="Continue")
return {"messages": removal_operations + [placeholder]}
return delete_messages
class Toolkit:
_config = DEFAULT_CONFIG.copy()
@classmethod
def update_config(cls, config):
"""Update the class-level configuration."""
cls._config.update(config)
@property
def config(self):
"""Access the configuration."""
return self._config
def __init__(self, config=None):
if config:
self.update_config(config)
@staticmethod
@tool
def get_reddit_news(
curr_date: Annotated[str, "Date you want to get news for in yyyy-mm-dd format"],
) -> str:
"""
Retrieve global news from Reddit within a specified time frame.
Args:
curr_date (str): Date you want to get news for in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the latest global news from Reddit in the specified time frame.
"""
global_news_result = interface.get_reddit_global_news(curr_date, 7, 5)
return global_news_result
@staticmethod
@tool
def get_finnhub_news(
ticker: Annotated[
str,
"Search query of a company, e.g. 'AAPL, TSM, etc.",
],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
):
"""
Retrieve the latest news about a given stock from Finnhub within a date range
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing news about the company within the date range from start_date to end_date
"""
end_date_str = end_date
end_date = datetime.strptime(end_date, "%Y-%m-%d")
start_date = datetime.strptime(start_date, "%Y-%m-%d")
look_back_days = (end_date - start_date).days
finnhub_news_result = interface.get_finnhub_news(
ticker, end_date_str, look_back_days
)
return finnhub_news_result
@staticmethod
@tool
def get_reddit_stock_info(
ticker: Annotated[
str,
"Ticker of a company. e.g. AAPL, TSM",
],
curr_date: Annotated[str, "Current date you want to get news for"],
) -> str:
"""
Retrieve the latest news about a given stock from Reddit, given the current date.
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
curr_date (str): current date in yyyy-mm-dd format to get news for
Returns:
str: A formatted dataframe containing the latest news about the company on the given date
"""
stock_news_results = interface.get_reddit_company_news(ticker, curr_date, 7, 5)
return stock_news_results
@staticmethod
@tool
def get_YFin_data(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve the stock price data for a given ticker symbol from Yahoo Finance.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the stock price data for the specified ticker symbol in the specified date range.
"""
result_data = interface.get_YFin_data(symbol, start_date, end_date)
return result_data
@staticmethod
@tool
def get_YFin_data_online(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve the stock price data for a given ticker symbol from Yahoo Finance.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the stock price data for the specified ticker symbol in the specified date range.
"""
result_data = interface.get_YFin_data_online(symbol, start_date, end_date)
return result_data
@staticmethod
@tool
def get_stockstats_indicators_report(
symbol: Annotated[str, "ticker symbol of the company"],
indicator: Annotated[
str, "technical indicator to get the analysis and report of"
],
curr_date: Annotated[
str, "The current trading date you are trading on, YYYY-mm-dd"
],
look_back_days: Annotated[int, "how many days to look back"] = 30,
) -> str:
"""
Retrieve stock stats indicators for a given ticker symbol and indicator.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
indicator (str): Technical indicator to get the analysis and report of
curr_date (str): The current trading date you are trading on, YYYY-mm-dd
look_back_days (int): How many days to look back, default is 30
Returns:
str: A formatted dataframe containing the stock stats indicators for the specified ticker symbol and indicator.
"""
result_stockstats = interface.get_stock_stats_indicators_window(
symbol, indicator, curr_date, look_back_days, False
)
return result_stockstats
@staticmethod
@tool
def get_stockstats_indicators_report_online(
symbol: Annotated[str, "ticker symbol of the company"],
indicator: Annotated[
str, "technical indicator to get the analysis and report of"
],
curr_date: Annotated[
str, "The current trading date you are trading on, YYYY-mm-dd"
],
look_back_days: Annotated[int, "how many days to look back"] = 30,
) -> str:
"""
Retrieve stock stats indicators for a given ticker symbol and indicator.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
indicator (str): Technical indicator to get the analysis and report of
curr_date (str): The current trading date you are trading on, YYYY-mm-dd
look_back_days (int): How many days to look back, default is 30
Returns:
str: A formatted dataframe containing the stock stats indicators for the specified ticker symbol and indicator.
"""
result_stockstats = interface.get_stock_stats_indicators_window(
symbol, indicator, curr_date, look_back_days, True
)
return result_stockstats
@staticmethod
@tool
def get_finnhub_company_insider_sentiment(
ticker: Annotated[str, "ticker symbol for the company"],
curr_date: Annotated[
str,
"current date of you are trading at, yyyy-mm-dd",
],
):
"""
Retrieve insider sentiment information about a company (retrieved from public SEC information) for the past 30 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the sentiment in the past 30 days starting at curr_date
"""
data_sentiment = interface.get_finnhub_company_insider_sentiment(
ticker, curr_date, 30
)
return data_sentiment
@staticmethod
@tool
def get_finnhub_company_insider_transactions(
ticker: Annotated[str, "ticker symbol"],
curr_date: Annotated[
str,
"current date you are trading at, yyyy-mm-dd",
],
):
"""
Retrieve insider transaction information about a company (retrieved from public SEC information) for the past 30 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's insider transactions/trading information in the past 30 days
"""
data_trans = interface.get_finnhub_company_insider_transactions(
ticker, curr_date, 30
)
return data_trans
@staticmethod
@tool
def get_simfin_balance_sheet(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual/quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve the most recent balance sheet of a company
Args:
ticker (str): ticker symbol of the company
freq (str): reporting frequency of the company's financial history: annual / quarterly
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's most recent balance sheet
"""
data_balance_sheet = interface.get_simfin_balance_sheet(ticker, freq, curr_date)
return data_balance_sheet
@staticmethod
@tool
def get_simfin_cashflow(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual/quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve the most recent cash flow statement of a company
Args:
ticker (str): ticker symbol of the company
freq (str): reporting frequency of the company's financial history: annual / quarterly
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's most recent cash flow statement
"""
data_cashflow = interface.get_simfin_cashflow(ticker, freq, curr_date)
return data_cashflow
@staticmethod
@tool
def get_simfin_income_stmt(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual/quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve the most recent income statement of a company
Args:
ticker (str): ticker symbol of the company
freq (str): reporting frequency of the company's financial history: annual / quarterly
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's most recent income statement
"""
data_income_stmt = interface.get_simfin_income_statements(
ticker, freq, curr_date
)
return data_income_stmt
@staticmethod
@tool
def get_google_news(
query: Annotated[str, "Query to search with"],
curr_date: Annotated[str, "Curr date in yyyy-mm-dd format"],
):
"""
Retrieve the latest news from Google News based on a query and date range.
Args:
query (str): Query to search with
curr_date (str): Current date in yyyy-mm-dd format
look_back_days (int): How many days to look back
Returns:
str: A formatted string containing the latest news from Google News based on the query and date range.
"""
google_news_results = interface.get_google_news(query, curr_date, 7)
return google_news_results
@staticmethod
@tool
def get_stock_news(
ticker: Annotated[str, "the company's ticker"],
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
):
"""
Retrieve the latest news about a given stock by using LLM's web search capabilities.
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
curr_date (str): Current date in yyyy-mm-dd format
Returns:
str: A formatted string containing the latest news about the company on the given date.
"""
results = interface.get_stock_news(ticker, curr_date)
return results
@staticmethod
@tool
def get_global_news(
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
):
"""
Retrieve the latest macroeconomics news on a given date using LLM's web search capabilities.
Args:
curr_date (str): Current date in yyyy-mm-dd format
Returns:
str: A formatted string containing the latest macroeconomic news on the given date.
"""
results = interface.get_global_news(curr_date)
return results
@staticmethod
@tool
def get_fundamentals(
ticker: Annotated[str, "the company's ticker"],
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
):
"""
Retrieve the latest fundamental information about a given stock on a given date by using LLM's web search capabilities.
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
curr_date (str): Current date in yyyy-mm-dd format
Returns:
str: A formatted string containing the latest fundamental information about the company on the given date.
"""
results = interface.get_fundamentals(
ticker, curr_date
)
return results

View File

@ -0,0 +1,20 @@
from .embedding_providers import (
EmbeddingProvider,
OpenAIEmbeddingProvider,
GeminiEmbeddingProvider,
OllamaEmbeddingProvider
)
class EmbeddingProviderFactory:
@staticmethod
def create_provider(config : dict[str, any])->EmbeddingProvider:
backend_url = config["backend_url"]
if "generativelanguage.googleapis.com" in backend_url:
return GeminiEmbeddingProvider(backend_url)
elif "localhost:11434" in backend_url:
return OllamaEmbeddingProvider(backend_url)
else:
return OpenAIEmbeddingProvider(backend_url)

View File

@ -0,0 +1,66 @@
from abc import ABC, abstractmethod
from openai import OpenAI
from google import genai
class EmbeddingProvider(ABC):
@abstractmethod
def get_embedding(self, text: str)->list[float]:
pass
@property
@abstractmethod
def model_name(self)->str:
pass
class OpenAIEmbeddingProvider(EmbeddingProvider):
def __init__(self, backend_url: str, embedding_model: str = "text-embedding-3-small"):
self.client = OpenAI(base_url=backend_url)
self._embedding_model = embedding_model
def get_embedding(self, text: str)->list[float]:
response = self.client.embeddings.create(
model=self._embedding_model,
input=text
)
return response.data[0].embedding
@property
def model_name(self)->str:
return self._embedding_model
class GeminiEmbeddingProvider(EmbeddingProvider):
def __init__(self, backend_url: str, embedding_model: str = "gemini-embedding-exp-03-07"):
self.client = genai.Client()
self._embedding_model = embedding_model
def get_embedding(self, text: str)->list[float]:
response = self.client.models.embed_content(
model=self._embedding_model,
contents=text
)
return response.embeddings[0].values
@property
def model_name(self)->str:
return self._embedding_model
class OllamaEmbeddingProvider(EmbeddingProvider):
def __init__(self, backend_url: str, embedding_model: str = "nomic-embed-text"):
self.client = OpenAI(base_url=backend_url)
self._embedding_model = embedding_model
def get_embedding(self, text: str)->list[float]:
response = self.client.embeddings.create(
model=self._embedding_model,
input=text
)
return response.data[0].embedding
@property
def model_name(self)->str:
return self._embedding_model

View File

@ -1,110 +1,112 @@
import chromadb
from chromadb.config import Settings
from openai import OpenAI
import numpy as np
from langchain_openai import OpenAIEmbeddings
import os
class FinancialSituationMemory:
def __init__(self, name):
# self.client = OpenAI()
self.embeddings = OpenAIEmbeddings(model="text-embedding-ada-002", api_key=os.getenv("OPENAI_API_KEY"))
self.chroma_client = chromadb.Client(Settings(allow_reset=True))
self.situation_collection = self.chroma_client.get_or_create_collection(name=name)
def get_embedding(self, text):
"""Get OpenAI embedding for a text"""
embedding = self.embeddings.embed_query(text)
return embedding
def add_situations(self, situations_and_advice):
"""Add financial situations and their corresponding advice. Parameter is a list of tuples (situation, rec)"""
situations = []
advice = []
ids = []
embeddings = []
offset = self.situation_collection.count()
for i, (situation, recommendation) in enumerate(situations_and_advice):
situations.append(situation)
advice.append(recommendation)
ids.append(str(offset + i))
embeddings.append(self.get_embedding(situation))
self.situation_collection.add(
documents=situations,
metadatas=[{"recommendation": rec} for rec in advice],
embeddings=embeddings,
ids=ids,
)
def get_memories(self, current_situation, n_matches=1):
"""Find matching recommendations using OpenAI embeddings"""
query_embedding = self.get_embedding(current_situation)
results = self.situation_collection.query(
query_embeddings=[query_embedding],
n_results=n_matches,
include=["metadatas", "documents", "distances"],
)
matched_results = []
for i in range(len(results["documents"][0])):
matched_results.append(
{
"matched_situation": results["documents"][0][i],
"recommendation": results["metadatas"][0][i]["recommendation"],
"similarity_score": 1 - results["distances"][0][i],
}
)
return matched_results
if __name__ == "__main__":
# Example usage
matcher = FinancialSituationMemory()
# Example data
example_data = [
(
"High inflation rate with rising interest rates and declining consumer spending",
"Consider defensive sectors like consumer staples and utilities. Review fixed-income portfolio duration.",
),
(
"Tech sector showing high volatility with increasing institutional selling pressure",
"Reduce exposure to high-growth tech stocks. Look for value opportunities in established tech companies with strong cash flows.",
),
(
"Strong dollar affecting emerging markets with increasing forex volatility",
"Hedge currency exposure in international positions. Consider reducing allocation to emerging market debt.",
),
(
"Market showing signs of sector rotation with rising yields",
"Rebalance portfolio to maintain target allocations. Consider increasing exposure to sectors benefiting from higher rates.",
),
]
# Add the example situations and recommendations
matcher.add_situations(example_data)
# Example query
current_situation = """
Market showing increased volatility in tech sector, with institutional investors
reducing positions and rising interest rates affecting growth stock valuations
"""
try:
recommendations = matcher.get_memories(current_situation, n_matches=2)
for i, rec in enumerate(recommendations, 1):
print(f"\nMatch {i}:")
print(f"Similarity Score: {rec['similarity_score']:.2f}")
print(f"Matched Situation: {rec['matched_situation']}")
print(f"Recommendation: {rec['recommendation']}")
except Exception as e:
print(f"Error during recommendation: {str(e)}")
import chromadb
from chromadb.config import Settings
from openai import OpenAI
import os
from .embedding_provider_factory import EmbeddingProviderFactory
from google import genai
class FinancialSituationMemory:
def __init__(self, name, config):
self.config = config
self.backend_url = config["backend_url"]
self.embedding_provider = EmbeddingProviderFactory.create_provider(config)
self.chroma_client = chromadb.Client(Settings(allow_reset=True))
self.situation_collection = self.chroma_client.create_collection(name=name)
def get_embedding(self, text):
"""Get embedding for a text using the appropriate API"""
return self.embedding_provider.get_embedding(text)
def add_situations(self, situations_and_advice):
"""Add financial situations and their corresponding advice. Parameter is a list of tuples (situation, rec)"""
situations = []
advice = []
ids = []
embeddings = []
offset = self.situation_collection.count()
for i, (situation, recommendation) in enumerate(situations_and_advice):
situations.append(situation)
advice.append(recommendation)
ids.append(str(offset + i))
embeddings.append(self.get_embedding(situation))
self.situation_collection.add(
documents=situations,
metadatas=[{"recommendation": rec} for rec in advice],
embeddings=embeddings,
ids=ids,
)
def get_memories(self, current_situation, n_matches=1):
"""Find matching recommendations using embeddings"""
query_embedding = self.get_embedding(current_situation)
results = self.situation_collection.query(
query_embeddings=[query_embedding],
n_results=n_matches,
include=["metadatas", "documents", "distances"],
)
matched_results = []
for i in range(len(results["documents"][0])):
matched_results.append(
{
"matched_situation": results["documents"][0][i],
"recommendation": results["metadatas"][0][i]["recommendation"],
"similarity_score": 1 - results["distances"][0][i],
}
)
return matched_results
if __name__ == "__main__":
# Example usage
matcher = FinancialSituationMemory()
# Example data
example_data = [
(
"High inflation rate with rising interest rates and declining consumer spending",
"Consider defensive sectors like consumer staples and utilities. Review fixed-income portfolio duration.",
),
(
"Tech sector showing high volatility with increasing institutional selling pressure",
"Reduce exposure to high-growth tech stocks. Look for value opportunities in established tech companies with strong cash flows.",
),
(
"Strong dollar affecting emerging markets with increasing forex volatility",
"Hedge currency exposure in international positions. Consider reducing allocation to emerging market debt.",
),
(
"Market showing signs of sector rotation with rising yields",
"Rebalance portfolio to maintain target allocations. Consider increasing exposure to sectors benefiting from higher rates.",
),
]
# Add the example situations and recommendations
matcher.add_situations(example_data)
# Example query
current_situation = """
Market showing increased volatility in tech sector, with institutional investors
reducing positions and rising interest rates affecting growth stock valuations
"""
try:
recommendations = matcher.get_memories(current_situation, n_matches=2)
for i, rec in enumerate(recommendations, 1):
print(f"\nMatch {i}:")
print(f"Similarity Score: {rec['similarity_score']:.2f}")
print(f"Matched Situation: {rec['matched_situation']}")
print(f"Recommendation: {rec['recommendation']}")
except Exception as e:
print(f"Error during recommendation: {str(e)}")

View File

@ -14,6 +14,7 @@ from tqdm import tqdm
import yfinance as yf
from openai import OpenAI
from .config import get_config, set_config, DATA_DIR
from .search_provider_factory import SearchProviderFactory
def get_finnhub_news(
@ -628,7 +629,7 @@ def get_YFin_data_window(
def get_YFin_data_online(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
):
datetime.strptime(start_date, "%Y-%m-%d")
@ -670,7 +671,7 @@ def get_YFin_data_online(
def get_YFin_data(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
# read in data
data = pd.read_csv(
@ -702,103 +703,25 @@ def get_YFin_data(
return filtered_data
def get_stock_news_openai(ticker, curr_date):
client = OpenAI()
response = client.responses.create(
model="gpt-4.1-mini",
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": f"Can you search Social Media for {ticker} from 7 days before {curr_date} to {curr_date}? Make sure you only get the data posted during that period.",
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text
def get_stock_news(ticker, curr_date):
config = get_config()
search_provider = SearchProviderFactory.create_provider(config)
query = f"Can you search Social Media for {ticker} from 7 days before {curr_date} to {curr_date}? Make sure you only get the data posted during that period."
return search_provider.search(query)
def get_global_news_openai(curr_date):
client = OpenAI()
def get_global_news(curr_date):
config = get_config()
search_provider = SearchProviderFactory.create_provider(config)
query = f"Search for global macroeconomic news and financial market updates from 7 days before {curr_date} to {curr_date}. Focus on central bank decisions, economic indicators, geopolitical events, and market-moving news that would be important for trading decisions."
return search_provider.search(query)
response = client.responses.create(
model="gpt-4.1-mini",
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": f"Can you search global or macroeconomics news from 7 days before {curr_date} to {curr_date} that would be informative for trading purposes? Make sure you only get the data posted during that period.",
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text
def get_fundamentals_openai(ticker, curr_date):
client = OpenAI()
response = client.responses.create(
model="gpt-4.1-mini",
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": f"Can you search Fundamental for discussions on {ticker} during of the month before {curr_date} to the month of {curr_date}. Make sure you only get the data posted during that period. List as a table, with PE/PS/Cash flow/ etc",
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text
def get_fundamentals(ticker, curr_date):
config = get_config()
search_provider = SearchProviderFactory.create_provider(config)
query = f"Search for fundamental analysis data and financial metrics for {ticker} stock from the month before {curr_date} to the month of {curr_date}. Look for earnings reports, financial ratios like PE, PS, cash flow, revenue growth, analyst ratings, and any fundamental analysis discussions. Please present key metrics in a structured format."
return search_provider.search(query)

View File

@ -0,0 +1,76 @@
from google import genai
from google.genai.types import Tool, GenerateContentConfig, GoogleSearch
from openai import OpenAI
from abc import ABC, abstractmethod
class SearchProvider(ABC):
@abstractmethod
def search(self, query: str, ticker: str, curr_date: str) -> str:
pass
class GoogleSearchProvider(SearchProvider):
def __init__(self, model: str):
self.client = genai.Client()
self.model = model
def search(self, query: str) -> str:
google_search_tool = Tool(
google_search=GoogleSearch()
)
response = self.client.models.generate_content(
model=self.model,
contents=query,
config=GenerateContentConfig(
tools=[google_search_tool],
response_modalities=["TEXT"]
)
)
result_text = ""
for part in response.candidates[0].content.parts:
if hasattr(part, 'text'):
result_text += part.text
return result_text
class OpenAISearchProvider(SearchProvider):
def __init__(self, model: str, backend_url: str):
self.client = OpenAI(base_url=backend_url)
self.model = model
def search(self, query: str) -> str:
response = self.client.responses.create(
model=self.model,
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": query
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text

View File

@ -0,0 +1,47 @@
from .search_provider import (
SearchProvider,
GoogleSearchProvider,
OpenAISearchProvider
)
import hashlib
import json
class SearchProviderFactory:
_cache = {} # 클래스 레벨 캐시
@staticmethod
def create_provider(config: dict[str, any]) -> SearchProvider:
"""
Create a SearchProvider with caching to avoid creating new instances.
Uses config hash as cache key for efficient reuse.
"""
# Create cache key from relevant config values
cache_key_data = {
"backend_url": config["backend_url"],
"model": config["quick_think_llm"]
}
cache_key = hashlib.md5(json.dumps(cache_key_data, sort_keys=True).encode()).hexdigest()
# Return cached instance if exists
if cache_key in SearchProviderFactory._cache:
return SearchProviderFactory._cache[cache_key]
# Create new instance
backend_url = config["backend_url"]
model = config["quick_think_llm"]
if "generativelanguage.googleapis.com" in backend_url:
provider = GoogleSearchProvider(model)
else:
provider = OpenAISearchProvider(model, backend_url)
# Cache and return
SearchProviderFactory._cache[cache_key] = provider
return provider
@staticmethod
def clear_cache():
"""Clear the provider cache (useful for testing or config changes)."""
SearchProviderFactory._cache.clear()

View File

@ -1,19 +1,22 @@
import os
DEFAULT_CONFIG = {
"project_dir": os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
"data_dir": "/Users/yluo/Documents/Code/ScAI/FR1-data",
"data_cache_dir": os.path.join(
os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
"dataflows/data_cache",
),
# LLM settings
"deep_think_llm": "o4-mini",
"quick_think_llm": "gpt-4o-mini",
# Debate and discussion settings
"max_debate_rounds": 1,
"max_risk_discuss_rounds": 1,
"max_recur_limit": 100,
# Tool settings
"online_tools": True,
}
import os
DEFAULT_CONFIG = {
"project_dir": os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
"results_dir": os.getenv("TRADINGAGENTS_RESULTS_DIR", "./results"),
"data_dir": "/Users/yluo/Documents/Code/ScAI/FR1-data",
"data_cache_dir": os.path.join(
os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
"dataflows/data_cache",
),
# LLM settings
"llm_provider": "openai",
"deep_think_llm": "o4-mini",
"quick_think_llm": "gpt-4o-mini",
"backend_url": "https://api.openai.com/v1",
# Debate and discussion settings
"max_debate_rounds": 1,
"max_risk_discuss_rounds": 1,
"max_recur_limit": 100,
# Tool settings
"online_tools": True,
}

View File

@ -2,8 +2,6 @@
from typing import Dict, Any
from langchain_openai import ChatOpenAI
import json
import re
class Reflector:
@ -17,6 +15,8 @@ class Reflector:
def _get_reflection_prompt(self) -> str:
"""Get the system prompt for reflection."""
return """
**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)
You are an expert financial analyst tasked with reviewing trading decisions/analysis and providing a comprehensive, step-by-step analysis.
Your goal is to deliver detailed insights into investment decisions and highlight opportunities for improvement, adhering strictly to the following guidelines:
@ -121,59 +121,3 @@ Adhere strictly to these instructions, and ensure your output is detailed, accur
"RISK JUDGE", judge_decision, situation, returns_losses
)
risk_manager_memory.add_situations([(situation, result)])
@staticmethod
def generate_final_report(final_state: dict) -> str:
"""
Generate a final, comprehensive report from the final state, ensuring
all parts are combined into a single, valid JSON object.
"""
final_report_json = {
"company_info": {
"ticker": final_state.get('company_of_interest', 'N/A'),
"analysis_date": final_state.get('trade_date', 'N/A')
},
"reports": {},
"final_decision": {}
}
def extract_json(text: str) -> dict:
"""Extracts a JSON object from a string, even if it's embedded in other text."""
if not isinstance(text, str):
return {} # Return empty dict if not a string
# Find the start and end of the JSON object
match = re.search(r'\{.*\}', text, re.DOTALL)
if match:
json_str = match.group(0)
try:
return json.loads(json_str)
except json.JSONDecodeError:
return {"error": "Failed to decode JSON", "original_text": json_str}
return {"error": "No JSON object found", "original_text": text}
# Process each report
report_keys = ['market_report', 'sentiment_report', 'news_report', 'fundamentals_report']
for key in report_keys:
if final_state.get(key):
report_name = key.replace('_report', '')
final_report_json['reports'][report_name] = extract_json(final_state[key])
# Add investment debate summary
if final_state.get('investment_debate_state'):
final_report_json['reports']['investment_debate'] = {
"summary": final_state['investment_debate_state'].get('judge_decision', 'N/A')
}
# Add final plan and decision
if final_state.get('investment_plan'):
final_report_json['final_decision']['investment_plan'] = final_state['investment_plan']
if final_state.get('final_trade_decision'):
# Extract the final proposal (BUY/HOLD/SELL)
proposal_match = re.search(r'FINAL TRANSACTION PROPOSAL:\s*\*{2}(.*?)\*{2}', final_state['final_trade_decision'])
proposal = proposal_match.group(1) if proposal_match else 'N/A'
final_report_json['final_decision']['final_proposal'] = proposal
final_report_json['final_decision']['full_text'] = final_state['final_trade_decision']
return json.dumps(final_report_json, ensure_ascii=False, indent=4)

View File

@ -1,31 +1,31 @@
# TradingAgents/graph/signal_processing.py
from langchain_openai import ChatOpenAI
class SignalProcessor:
"""Processes trading signals to extract actionable decisions."""
def __init__(self, quick_thinking_llm: ChatOpenAI):
"""Initialize with an LLM for processing."""
self.quick_thinking_llm = quick_thinking_llm
def process_signal(self, full_signal: str) -> str:
"""
Process a full trading signal to extract the core decision.
Args:
full_signal: Complete trading signal text
Returns:
Extracted decision (BUY, SELL, or HOLD)
"""
messages = [
(
"system",
"You are an efficient assistant designed to analyze paragraphs or financial reports provided by a group of analysts. Your task is to extract the investment decision: SELL, BUY, or HOLD. Provide only the extracted decision (SELL, BUY, or HOLD) as your output, without adding any additional text or information.",
),
("human", full_signal),
]
return self.quick_thinking_llm.invoke(messages).content
# TradingAgents/graph/signal_processing.py
from langchain_openai import ChatOpenAI
class SignalProcessor:
"""Processes trading signals to extract actionable decisions."""
def __init__(self, quick_thinking_llm: ChatOpenAI):
"""Initialize with an LLM for processing."""
self.quick_thinking_llm = quick_thinking_llm
def process_signal(self, full_signal: str) -> str:
"""
Process a full trading signal to extract the core decision.
Args:
full_signal: Complete trading signal text
Returns:
Extracted decision (BUY, SELL, or HOLD)
"""
messages = [
(
"system",
"**IMPORTANT THING** Respond in Korean(한국어로 대답해주세요)\n\nYou are an efficient assistant designed to analyze paragraphs or financial reports provided by a group of analysts. Your task is to extract the investment decision: SELL, BUY, or HOLD. Provide only the extracted decision (SELL, BUY, or HOLD) as your output, without adding any additional text or information.",
),
("human", full_signal),
]
return self.quick_thinking_llm.invoke(messages).content

View File

@ -5,9 +5,11 @@ from pathlib import Path
import json
from datetime import date
from typing import Dict, Any, Tuple, List, Optional
import asyncio
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.prebuilt import ToolNode
from tradingagents.agents import *
@ -32,20 +34,19 @@ class TradingAgentsGraph:
def __init__(
self,
config: Dict[str, Any] = None,
progress_callback=None,
selected_analysts=["market", "social", "news", "fundamentals"],
debug=False,
config: Dict[str, Any] = None,
):
"""Initialize the trading agents graph and components.
Args:
config: Configuration dictionary. If None, uses default config
progress_callback: Async function to send progress updates
selected_analysts: List of analyst types to include
debug: Whether to run in debug mode
config: Configuration dictionary. If None, uses default config
"""
self.debug = debug
self.config = config or DEFAULT_CONFIG
self.progress_callback = progress_callback
# Update the interface's config
set_config(self.config)
@ -57,18 +58,26 @@ class TradingAgentsGraph:
)
# Initialize LLMs
self.deep_thinking_llm = ChatOpenAI(model=self.config["deep_think_llm"])
self.quick_thinking_llm = ChatOpenAI(
model=self.config["quick_think_llm"], temperature=0.1
)
if self.config["llm_provider"].lower() == "openai" or self.config["llm_provider"] == "ollama" or self.config["llm_provider"] == "openrouter":
self.deep_thinking_llm = ChatOpenAI(model=self.config["deep_think_llm"], base_url=self.config["backend_url"])
self.quick_thinking_llm = ChatOpenAI(model=self.config["quick_think_llm"], base_url=self.config["backend_url"])
elif self.config["llm_provider"].lower() == "anthropic":
self.deep_thinking_llm = ChatAnthropic(model=self.config["deep_think_llm"], base_url=self.config["backend_url"])
self.quick_thinking_llm = ChatAnthropic(model=self.config["quick_think_llm"], base_url=self.config["backend_url"])
elif self.config["llm_provider"].lower() == "google":
self.deep_thinking_llm = ChatGoogleGenerativeAI(model=self.config["deep_think_llm"])
self.quick_thinking_llm = ChatGoogleGenerativeAI(model=self.config["quick_think_llm"])
else:
raise ValueError(f"Unsupported LLM provider: {self.config['llm_provider']}")
self.toolkit = Toolkit(config=self.config)
# Initialize memories
self.bull_memory = FinancialSituationMemory("bull_memory")
self.bear_memory = FinancialSituationMemory("bear_memory")
self.trader_memory = FinancialSituationMemory("trader_memory")
self.invest_judge_memory = FinancialSituationMemory("invest_judge_memory")
self.risk_manager_memory = FinancialSituationMemory("risk_manager_memory")
self.bull_memory = FinancialSituationMemory("bull_memory", self.config)
self.bear_memory = FinancialSituationMemory("bear_memory", self.config)
self.trader_memory = FinancialSituationMemory("trader_memory", self.config)
self.invest_judge_memory = FinancialSituationMemory("invest_judge_memory", self.config)
self.risk_manager_memory = FinancialSituationMemory("risk_manager_memory", self.config)
# Create tool nodes
self.tool_nodes = self._create_tool_nodes()
@ -97,9 +106,8 @@ class TradingAgentsGraph:
self.ticker = None
self.log_states_dict = {} # date to full state dict
# Set up the graph with default analysts initially
default_analysts = ["market", "social", "news", "fundamentals"]
self.graph = self.graph_setup.setup_graph(default_analysts)
# Set up the graph
self.graph = self.graph_setup.setup_graph(selected_analysts)
def _create_tool_nodes(self) -> Dict[str, ToolNode]:
"""Create tool nodes for different data sources."""
@ -117,7 +125,7 @@ class TradingAgentsGraph:
"social": ToolNode(
[
# online tools
self.toolkit.get_stock_news_openai,
self.toolkit.get_stock_news,
# offline tools
self.toolkit.get_reddit_stock_info,
]
@ -125,7 +133,7 @@ class TradingAgentsGraph:
"news": ToolNode(
[
# online tools
self.toolkit.get_global_news_openai,
self.toolkit.get_global_news,
self.toolkit.get_google_news,
# offline tools
self.toolkit.get_finnhub_news,
@ -135,7 +143,7 @@ class TradingAgentsGraph:
"fundamentals": ToolNode(
[
# online tools
self.toolkit.get_fundamentals_openai,
self.toolkit.get_fundamentals,
# offline tools
self.toolkit.get_finnhub_company_insider_sentiment,
self.toolkit.get_finnhub_company_insider_transactions,
@ -146,55 +154,8 @@ class TradingAgentsGraph:
),
}
def invoke(self, input_data: Dict) -> Dict:
"""Run the trading agents graph for a web-based request."""
self.ticker = input_data.get("ticker", "UNKNOWN")
trade_date = input_data.get("date", date.today().strftime("%Y-%m-%d"))
selected_analysts = input_data.get("selected_analysts", [])
self.graph = self.graph_setup.setup_graph(selected_analysts)
init_agent_state = self.propagator.create_initial_state(
self.ticker, trade_date
)
args = self.propagator.get_graph_args()
final_report = ""
final_state_result = None
# 진행률 계산을 위한 변수
total_steps = len(self.graph.nodes)
step_count = 0
# Stream the graph execution to get real-time updates
for chunk in self.graph.stream(init_agent_state, **args):
# 1 청크당 1단계로 간주
step_count += 1
for node_name, node_output in chunk.items():
if self.progress_callback:
agent_name = node_name.replace("_node", "").replace("_", " ").title()
message = f"Step {step_count}/{total_steps}: {agent_name} is working..."
# 계산된 진행률과 함께 콜백 호출
asyncio.run(self.progress_callback(
"agent_update",
message,
agent_name,
step=step_count,
total=total_steps
))
final_state_result = chunk
if final_state_result:
final_report = self.reflector.generate_final_report(final_state_result)
self._log_state(trade_date, final_state_result)
return {"final_report": final_report}
def propagate(self, company_name, trade_date):
"""Run the trading agents graph for a company on a specific date (CLI)."""
"""Run the trading agents graph for a company on a specific date."""
self.ticker = company_name
@ -209,10 +170,20 @@ class TradingAgentsGraph:
trace = []
for chunk in self.graph.stream(init_agent_state, **args):
if len(chunk["messages"]) == 0:
pass
else:
chunk["messages"][-1].pretty_print()
trace.append(chunk)
continue
message = chunk["messages"][-1]
if message.content and message.content.strip():
if "FINAL TRANSACTION PROPOSAL:" in message.content:
if not hasattr(self, '_final_printed'):
message.pretty_print()
self._final_printed = True
else:
message.pretty_print()
trace.append(chunk)
final_state = trace[-1]
else:

5405
uv.lock Normal file

File diff suppressed because it is too large Load Diff