Add local Codex provider support

This commit is contained in:
nornen0202 2026-04-05 07:06:11 +09:00
parent 10c136f49c
commit 50865711e3
19 changed files with 3061 additions and 7 deletions

@@ -0,0 +1,163 @@
# Codex Task Prompts
## Prompt 1: Main Implementation Prompt
You are working inside the local TradingAgents repository.
Goal:
Implement a new LLM provider named `codex` so TradingAgents can use the local Codex CLI/app-server authenticated with ChatGPT/Codex login instead of an OpenAI API key.
High-level constraints:
1. Do NOT build an OpenAI-compatible HTTP proxy.
2. Do NOT call raw OAuth endpoints yourself.
3. Do NOT depend on Codex dynamicTools for TradingAgents tool execution.
4. Keep TradingAgents existing LangGraph / ToolNode flow intact.
5. The integration must work for both:
- analyst nodes that use `prompt | llm.bind_tools(tools)`
- non-tool nodes that call `llm.invoke(...)` directly
6. Prefer minimal, coherent changes over broad refactors.
7. Add tests and documentation.
8. No unrelated cleanup.
Architecture to implement:
- Add a new provider `codex` in `tradingagents/llm_clients/factory.py`.
- Add a `CodexClient` implementing the existing BaseLLMClient contract.
- Add a custom LangChain chat model that talks to `codex app-server` over stdio JSONL.
- Reuse a long-lived app-server process per model instance, but create a fresh Codex thread per model invocation to avoid context bleed across agents.
- After each invocation, `thread/unsubscribe`.
- Use `initialize` / `initialized` on startup.
- Add a preflight helper that checks:
- `codex` binary exists
- app-server starts
- `account/read` succeeds
- requested models are available from `model/list`
- Do not require API keys for the `codex` provider.
Authentication assumptions:
- The supported user path is `codex login` or `codex login --device-auth`.
- If file-backed auth is used, Codex-managed credentials may be stored in `~/.codex/auth.json`.
- Do not implement direct OAuth token refresh.
- If auth is missing, fail with a clear actionable message telling the user to run `codex login`.
Important implementation choice:
Do NOT use app-server dynamic tools.
Instead, emulate tool calling at the model boundary with strict structured output:
- For plain non-tool calls, request JSON schema: `{ "answer": string }`
- For tool-capable calls, request a root `oneOf` schema:
- final:
`{ "mode": "final", "content": string, "tool_calls": [] }`
- tool batch:
`{ "mode": "tool_calls", "content": string, "tool_calls": [ ... ] }`
- For `tool_calls[].items`, use `oneOf` with one branch per tool so each tool name has its own exact arguments JSON schema.
- This is required so TradingAgents ToolNode can execute the selected tool calls after receiving an `AIMessage.tool_calls`.
Files to add:
- `tradingagents/llm_clients/codex_client.py`
- `tradingagents/llm_clients/codex_chat_model.py`
- `tradingagents/llm_clients/codex_app_server.py`
- `tradingagents/llm_clients/codex_schema.py`
- `tradingagents/llm_clients/codex_message_codec.py`
- `tradingagents/llm_clients/codex_preflight.py`
Files to modify:
- `tradingagents/llm_clients/factory.py`
- `tradingagents/default_config.py`
- `tradingagents/llm_clients/__init__.py`
- CLI / UI config surfaces if present
- README and/or docs
Model behavior requirements:
- Normalize input from:
- `str`
- LangChain `BaseMessage` sequences
- OpenAI-style dict message sequences
- The custom model must support `bind_tools()`.
- `bind_tools()` should preserve LangChain semantics by binding tool schemas into `_generate(...)`.
- Return `AIMessage` objects.
- If tool calls are requested, populate `AIMessage.tool_calls` with stable ids like `call_<uuid>`.
Safety / hardening requirements:
- Default to a neutral dedicated workspace directory for Codex, not the repo root.
- Add config knobs for:
- `codex_binary`
- `codex_reasoning_effort`
- `codex_summary`
- `codex_personality`
- `codex_workspace_dir`
- `codex_request_timeout`
- `codex_max_retries`
- `codex_cleanup_threads`
- Document a recommended `.codex/config.toml` with:
- `approval_policy = "never"`
- `sandbox_mode = "read-only"`
- `web_search = "disabled"`
- `personality = "none"`
- `cli_auth_credentials_store = "file"`
Testing requirements:
1. Unit tests for message normalization.
2. Unit tests for output schema construction.
3. Unit tests for plain final response parsing.
4. Unit tests for tool-call response parsing.
5. Unit tests for malformed JSON retry / error reporting.
6. Integration smoke test for provider `codex`.
7. Preflight test for missing auth / missing binary.
Acceptance criteria:
- `llm_provider="codex"` works without API keys after `codex login`.
- At least one analyst node using `bind_tools()` works.
- At least one non-tool node using `llm.invoke(...)` works.
- A minimal smoke run can produce a final report / final decision.
- Documentation explains installation, auth, usage, and limitations.
Implementation style:
- Read the existing code first and align with project style.
- Make the smallest set of clean, composable changes.
- Include comments only where they add real value.
- Avoid speculative abstractions.
- Keep the code production-oriented and debuggable.
Working method:
1. Inspect the current LLM client factory and how agents call `bind_tools()` vs `invoke()`.
2. Implement the connection layer.
3. Implement the chat model.
4. Wire the provider.
5. Add preflight + docs.
6. Add tests.
7. Run the relevant tests / smoke checks.
8. Summarize exactly what changed and any limitations that remain.
Do the work now.
---
## Prompt 2: Verification and Fix Prompt
Review the `codex` provider implementation you just added to TradingAgents.
Your job:
1. Find correctness bugs, interface mismatches, race conditions, and integration gaps.
2. Pay special attention to:
- LangChain `bind_tools()` semantics
- `AIMessage.tool_calls` structure
- support for `llm.invoke(str)`, `llm.invoke(list[BaseMessage])`, and `llm.invoke(list[dict])`
- app-server request/response matching
- thread cleanup with `thread/unsubscribe`
- malformed JSON retries
- missing auth / missing binary / missing model diagnostics
3. Run or update tests as needed.
4. Fix only what is necessary; do not refactor unrelated code.
5. Update docs if behavior changed.
Definition of done:
- the provider is internally consistent,
- tests pass,
- smoke run works,
- error messages are actionable,
- no obvious context-bleed or tool-calling contract issues remain.
Return:
- a concise changelog,
- exact files modified,
- exact commands/tests run,
- any remaining known limitations.

@@ -0,0 +1,750 @@
# TradingAgents × Codex Bridge Design Document
## 1. Goal
Handle the LLM calls TradingAgents requires through a **local Codex app-server with ChatGPT/Codex authentication** instead of an OpenAI API key.
The core goals are:
1. Keep TradingAgents' **existing LangGraph / ToolNode structure** intact.
2. Add a **new provider (`codex`)** rather than force-emulating an OpenAI-compatible proxy.
3. Let ChatGPT Pro users authenticate via **Codex login (`codex login` / `codex login --device-auth`) or Codex's managed auth cache (`~/.codex/auth.json`)**.
4. Both analyst nodes that need `bind_tools()` and debate / manager / trader nodes that only need plain `invoke()` must work.
5. Do not depend on Codex's own shell/web/tool ecosystem; reuse the tool-execution loop TradingAgents already has.
---
## 2. Why This Approach Is Best
### Chosen approach
**Recommendation:** run `codex app-server` locally and build a **custom LangChain ChatModel** that talks to it from Python over stdio (JSONL).
### Rejected approaches
#### A. OpenAI-compatible `/v1/responses` proxy
Not recommended. TradingAgents currently uses `langchain_openai.ChatOpenAI` for the `openai` provider and enables `use_responses_api=True` for native OpenAI.
A proxy would have to imitate `/v1/responses` and its tool-calling semantics fairly precisely. Implementation difficulty and maintenance cost are both high.
#### B. Using Codex dynamic tools directly
Not recommended. The app-server's `dynamicTools` and `item/tool/call` are **experimental**.
TradingAgents already handles tool execution well with `ToolNode`, so there is no reason to hand this over to Codex.
#### C. Embedding the Codex SDK directly
Partially possible but not recommended. The SDK is TypeScript-centric. For a Python project like TradingAgents, an app-server stdio bridge is simpler.
### Core design idea
Codex handles **model inference only**; actual tool execution stays with TradingAgents/LangGraph.
Tool schemas are therefore described to Codex, and responses are accepted only as **strict JSON schema** output:
- if a tool is needed: `{"mode":"tool_calls", ...}`
- if no further tools are needed: `{"mode":"final", ...}`
This satisfies the analyst nodes' `bind_tools()` requirement while avoiding Codex's experimental dynamic tool API.
---
## 3. Implementation Architecture
## 3.1 Adding the New Provider
### Files to modify
- `tradingagents/llm_clients/factory.py`
- `tradingagents/default_config.py`
- `tradingagents/llm_clients/__init__.py`
- CLI/UI-related files (not strictly optional; effectively recommended)
### Files to add
- `tradingagents/llm_clients/codex_client.py`
- `tradingagents/llm_clients/codex_chat_model.py`
- `tradingagents/llm_clients/codex_app_server.py`
- `tradingagents/llm_clients/codex_schema.py`
- `tradingagents/llm_clients/codex_message_codec.py`
- `tradingagents/llm_clients/codex_preflight.py`
- `tests/llm_clients/test_codex_chat_model.py`
- `tests/llm_clients/test_codex_app_server.py`
- `tests/integration/test_codex_provider_smoke.py`
---
## 3.2 Runtime Layout
### On the TradingAgents side
`TradingAgentsGraph.__init__()` creates the deep and quick LLMs once and reuses them.
Accordingly, `CodexChatModel` should maintain **one app-server session per model instance**.
- quick_thinking_llm → Codex app-server session A
- deep_thinking_llm → Codex app-server session B
### Key principles
- **Reuse the session**
- **Create a fresh thread per invoke**
- Reason: multiple analyst / debate agents share the same LLM instance, so reusing threads as well would cause context contamination.
In short:
- app-server process: reused
- Codex thread: created fresh per call, then `thread/unsubscribe`
---
## 3.3 Authentication Strategy
### Default / recommended
The user first runs locally:
```bash
codex login
```
If the browser callback is blocked or the machine is headless:
```bash
codex login --device-auth
```
### Headless / container / remote machines
- Set `cli_auth_credentials_store = "file"` so `~/.codex/auth.json` is used
- Copy an `auth.json` generated on a trusted machine
- Do not implement refresh yourself; let Codex handle it
- Never commit `auth.json`
### Advanced option: OAuth URL helper
If desired, a Python helper can call the following on the app-server to obtain and print the browser login URL directly:
- `account/read`
- `account/login/start` with `type="chatgpt"`
However, the **v1 implementation is sufficient without this helper**. In practice, `codex login` is simpler and more reliable.
---
## 3.4 Security / Hardening
To use Codex purely as a "model backend" rather than a "coding agent", the following is recommended.
### Example `.codex/config.toml`
```toml
model = "gpt-5.4"
model_reasoning_effort = "medium"
approval_policy = "never"
sandbox_mode = "read-only"
web_search = "disabled"
personality = "none"
log_dir = ".codex-log"
cli_auth_credentials_store = "file"
```
### Optional hardening
```toml
[features]
apps = false
shell_tool = false
multi_agent = false
```
### Additional recommendation
Point `cwd` at an **empty dedicated workspace**, not the project root.
Examples:
- `~/.cache/tradingagents/codex_workspace`
- or `./.tradingagents_codex_workspace` inside the repo
This lowers the chance of Codex drifting into scanning the repository or reading files.
---
## 4. Message / Tool-Call Design
## 4.1 Input Normalization
`CodexChatModel` must accept all of the following inputs:
1. `str`
2. `list[BaseMessage]`
3. `list[dict(role=..., content=...)]`
Rationale:
- analyst chains are likely to pass `BaseMessage` sequences because of the prompt pipeline
- trader / manager nodes pass OpenAI-style dict lists directly to `llm.invoke(messages)`
### Example internal normalized format
```text
[SYSTEM]
...
[USER]
...
[ASSISTANT]
...
[ASSISTANT_TOOL_CALL]
name=get_news
args={"query":"AAPL",...}
[TOOL_RESULT]
name=get_news
call_id=call_xxx
content=...
```
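The tagged transcript format above can be produced with a small serializer. The sketch below handles only normalized role/content dicts; the function name and the handling of `BaseMessage` objects and tool-call metadata are illustrative, not the actual codec:

```python
def serialize_transcript(messages: list) -> str:
    """Render normalized role/content dicts into the tagged transcript format.

    Sketch only: the real codec must also accept LangChain BaseMessage
    objects and assistant tool-call entries.
    """
    parts = []
    for msg in messages:
        role = msg["role"].upper()
        if role == "TOOL":
            # Tool results carry extra metadata so the model can match them
            # back to the originating call.
            parts.append("[TOOL_RESULT]")
            parts.append(f"name={msg.get('name', '')}")
            parts.append(f"call_id={msg.get('tool_call_id', '')}")
            parts.append(f"content={msg['content']}")
        else:
            parts.append(f"[{role}]")
            parts.append(str(msg["content"]))
    return "\n".join(parts)

transcript = serialize_transcript([
    {"role": "system", "content": "You are a market analyst."},
    {"role": "user", "content": "Summarize AAPL news."},
])
```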
---
## 4.2 Handling bind_tools
TradingAgents analyst nodes use the following pattern:
```python
chain = prompt | llm.bind_tools(tools)
result = chain.invoke(state["messages"])
```
So `CodexChatModel.bind_tools()` must be implemented.
### Implementation approach
- Convert LangChain tool objects to OpenAI-style tool schemas
- Bind internally via `self.bind(tools=formatted_tools, tool_choice=...)`
- Read and use that schema in `_generate(..., tools=..., tool_choice=...)`
### Tool schema conversion
Use LangChain's standard helpers (the `convert_to_openai_tool` family) where possible.
For each tool, obtain:
- `name`
- `description`
- `parameters` JSON schema
---
## 4.3 Output Schema Design
### For plain invoke
```json
{
"type": "object",
"properties": {
"answer": { "type": "string" }
},
"required": ["answer"],
"additionalProperties": false
}
```
### For tool-capable invoke
Force the root to be either **final** or **tool_calls**:
```json
{
"oneOf": [
{
"type": "object",
"properties": {
"mode": { "const": "final" },
"content": { "type": "string" },
"tool_calls": {
"type": "array",
"maxItems": 0
}
},
"required": ["mode", "content", "tool_calls"],
"additionalProperties": false
},
{
"type": "object",
"properties": {
"mode": { "const": "tool_calls" },
"content": { "type": "string" },
"tool_calls": {
"type": "array",
"minItems": 1,
"items": {
"oneOf": [
{
"type": "object",
"properties": {
"name": { "const": "get_news" },
"arguments": { "...": "get_news parameters schema" }
},
"required": ["name", "arguments"],
"additionalProperties": false
}
]
}
}
},
"required": ["mode", "content", "tool_calls"],
"additionalProperties": false
}
]
}
```
### Key point
Put **per-tool arguments schemas** inside `tool_calls.items.oneOf`.
This prevents Codex from generating arbitrary tool names and arguments.
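The schema above can be assembled mechanically from OpenAI-style tool specs. A minimal sketch (the function name is illustrative; the branch layout mirrors the JSON shown above):

```python
def build_tool_call_schema(tools: list) -> dict:
    """Build the root oneOf schema (final vs tool_calls) from a list of
    OpenAI-style tool specs with `name` and `parameters` keys."""
    final_branch = {
        "type": "object",
        "properties": {
            "mode": {"const": "final"},
            "content": {"type": "string"},
            "tool_calls": {"type": "array", "maxItems": 0},
        },
        "required": ["mode", "content", "tool_calls"],
        "additionalProperties": False,
    }
    # One branch per tool: the tool name is pinned with `const` and the
    # arguments must match that tool's exact parameters schema.
    per_tool = [
        {
            "type": "object",
            "properties": {
                "name": {"const": t["name"]},
                "arguments": t["parameters"],
            },
            "required": ["name", "arguments"],
            "additionalProperties": False,
        }
        for t in tools
    ]
    tool_branch = {
        "type": "object",
        "properties": {
            "mode": {"const": "tool_calls"},
            "content": {"type": "string"},
            "tool_calls": {
                "type": "array",
                "minItems": 1,
                "items": {"oneOf": per_tool},
            },
        },
        "required": ["mode", "content", "tool_calls"],
        "additionalProperties": False,
    }
    return {"oneOf": [final_branch, tool_branch]}

schema = build_tool_call_schema([
    {"name": "get_news",
     "parameters": {"type": "object",
                    "properties": {"query": {"type": "string"}}}},
])
```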
---
## 4.4 Tool-Call Policy
Always give Codex these rules:
1. Request **only the immediately needed next tool call(s)**
2. No speculative calls
3. Do not pre-call downstream tools before seeing the tool result
4. If no tool is needed, answer with final
5. Emit only JSON that matches the output schema
### Why this matters
For example, for the market analyst, `get_indicators` naturally follows `get_stock_data`.
But because there are indirect dependencies such as CSV generation/caching, it is safer to accept **only the minimal next call** than to let the model speculatively chain several steps at once.
---
## 5. Codex App-Server Communication Layer
## 5.1 `CodexAppServerConnection`
Responsibilities:
- start/stop the `codex app-server` subprocess
- `initialize` / `initialized`
- request/response correlation (`id`)
- stdout JSONL reader thread
- collecting notifications
- timeout / error propagation
- graceful shutdown
### Core methods
- `start()`
- `close()`
- `request(method, params, timeout)`
- `wait_for_turn_completion(thread_id, turn_id, timeout)`
- `read_account()`
- `read_models()`
- `read_rate_limits()`
### Transport
- Use **stdio (JSONL)**
- A websocket transport offers little benefit, so it is excluded from v1
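The request/response correlation over JSONL can be sketched transport-agnostically. The class below is illustrative (the real connection wraps the app-server subprocess's stdin/stdout with a reader thread, timeouts, and a notification queue):

```python
import json
from collections import deque
from itertools import count

class JsonlCorrelator:
    """Minimal JSONL request/response correlation by request id.

    Sketch only: synchronous, no timeouts, no reader thread.
    """

    def __init__(self, write_line, read_line):
        self._write_line = write_line  # callable accepting one serialized line
        self._read_line = read_line    # callable returning the next raw line
        self._ids = count(1)
        self._responses = {}           # id -> response payload

    def request(self, method: str, params: dict) -> dict:
        req_id = next(self._ids)
        self._write_line(json.dumps(
            {"id": req_id, "method": method, "params": params}))
        while req_id not in self._responses:
            msg = json.loads(self._read_line())
            if "id" in msg:
                self._responses[msg["id"]] = msg
            # messages without an id are notifications; a real
            # implementation queues them for the turn loop
        return self._responses.pop(req_id)

# Usage with an in-memory fake transport:
outbox = []
inbox = deque(['{"id": 1, "result": {"ok": true}}'])
conn = JsonlCorrelator(outbox.append, inbox.popleft)
resp = conn.request("account/read", {})
```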
---
## 5.2 Initial Handshake
Immediately after startup:
1. subprocess spawn: `codex app-server`
2. `initialize`
3. `initialized`
4. `account/read`
5. `model/list` if needed
### `initialize` example
```json
{
"method": "initialize",
"id": 1,
"params": {
"clientInfo": {
"name": "tradingagents_codex_bridge",
"title": "TradingAgents Codex Bridge",
"version": "0.1.0"
}
}
}
```
---
## 5.3 Preflight Checks
In `codex_preflight.py` or a helper function:
1. Check that the `codex` binary exists
2. Check that the app-server starts
3. Run `account/read(refreshToken=false)`
4. Check that `account.type == "chatgpt"` or `"apiKey"`
5. Check `planType == "pro"` if possible
6. Check availability of `deep_think_llm` and `quick_think_llm` via `model/list`
7. Print `account/rateLimits/read` output if available
### Example failure messages
- `Codex not installed. Install with npm i -g @openai/codex`
- `No ChatGPT/API auth found. Run codex login`
- `Requested model gpt-5.4-mini is not available under current Codex account`
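The first preflight step (binary existence) reduces to a `PATH` lookup. A minimal sketch of just that step, with an actionable message in the spirit of the examples above (the function name is illustrative):

```python
import shutil
from typing import Optional

def check_codex_binary(binary: str = "codex") -> Optional[str]:
    """Return an actionable error message if the codex binary is missing,
    else None.

    Sketch of the first preflight step only; the real preflight also
    starts the app-server and checks account/read and model/list.
    """
    if shutil.which(binary) is None:
        return (f"Codex not installed. Install with npm i -g @openai/codex "
                f"(looked for {binary!r} on PATH)")
    return None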
---
## 6. LangChain Custom Model Design
## 6.1 `CodexChatModel`
Inherits from:
- `langchain_core.language_models.chat_models.BaseChatModel`
Required implementations:
- `_generate(...)`
- `_llm_type`
- `bind_tools(...)`
Recommended additions:
- `_identifying_params`
- input normalization hardening in `invoke(...)`
- error wrapping
### Example internal fields
- `model`
- `reasoning_effort`
- `summary`
- `personality`
- `request_timeout`
- `max_retries`
- `server: CodexAppServerConnection`
- `workspace_dir`
- `cleanup_threads`
- `service_name`
---
## 6.2 `_generate()` Behavior
### Without tools
1. Normalize input messages
2. Build the plain schema (`answer`)
3. thread/start
4. turn/start with `outputSchema`
5. Parse the final agent message JSON
6. Return `AIMessage(content=answer)`
### With tools
1. Normalize input messages
2. Build tool schemas
3. Build the root oneOf output schema
4. thread/start
5. turn/start with `outputSchema`
6. Parse the final agent message JSON
7. If `mode == "tool_calls"`:
   - assign each call `id = "call_" + uuid`
   - return `AIMessage(content=content or "", tool_calls=[...])`
8. If `mode == "final"`:
   - return `AIMessage(content=content, tool_calls=[])`
### Teardown
- `thread/unsubscribe`
- reader queue cleanup
- optional thread archiving if needed
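The parsing step (7/8 above) can be sketched as a pure function that turns the schema-constrained JSON into content plus tool-call dicts with stable `call_<uuid>` ids. The function name is illustrative; the real code wraps the result in an `AIMessage`:

```python
import json
import uuid

def parse_structured_output(raw: str):
    """Convert the model's schema-constrained JSON into
    (content, tool_calls). Sketch only; retry handling and AIMessage
    construction happen in the caller."""
    data = json.loads(raw)
    if data.get("mode") == "tool_calls":
        tool_calls = [
            # Stable ids so ToolNode can match ToolMessages back to calls.
            {"id": f"call_{uuid.uuid4().hex}",
             "name": call["name"],
             "args": call["arguments"]}
            for call in data["tool_calls"]
        ]
        return data.get("content", ""), tool_calls
    return data["content"], []

content, calls = parse_structured_output(
    '{"mode": "tool_calls", "content": "", '
    '"tool_calls": [{"name": "get_news", "arguments": {"query": "AAPL"}}]}'
)
```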
---
## 6.3 App-Server Call Parameters
### thread/start
```json
{
"method": "thread/start",
"params": {
"model": "gpt-5.4",
"cwd": "/abs/path/to/.tradingagents_codex_workspace",
"approvalPolicy": "never",
"serviceName": "tradingagents_codex_bridge"
}
}
```
### turn/start
```json
{
"method": "turn/start",
"params": {
"threadId": "...",
"input": [
{ "type": "text", "text": "<serialized prompt>" }
],
"model": "gpt-5.4",
"effort": "medium",
"summary": "concise",
"personality": "none",
"sandboxPolicy": {
"type": "readOnly",
"access": { "type": "fullAccess" }
},
"outputSchema": { ... }
}
}
```
---
## 6.4 Prompt Wrapper Templates
### plain invoke wrapper
```text
You are the language model backend for a LangGraph-based financial multi-agent system.
Rules:
1. Answer only from the provided conversation transcript.
2. Do not inspect files.
3. Do not run commands.
4. Do not use web search.
5. Return ONLY JSON that matches the provided schema.
Conversation transcript:
<...serialized messages...>
```
### tool-capable wrapper
```text
You are the language model backend for a LangGraph-based financial multi-agent system.
You may either:
- request the next necessary tool call(s), or
- provide the final assistant response.
Hard rules:
1. Use only the allowed tools listed below.
2. Arguments must conform exactly to the JSON schema for that tool.
3. Request only the next required tool call batch.
4. Do not speculate past missing tool results.
5. Do not inspect files.
6. Do not run commands.
7. Do not use web search.
8. Return ONLY JSON that matches the provided schema.
Allowed tools:
<tool schemas pretty-printed>
Conversation transcript:
<...serialized messages...>
```
### Stabilization tips
- Include the tool schemas as pretty-printed JSON
- One or two few-shot examples may be included
- But avoid making the prompt so long that it wastes tokens
---
## 7. TradingAgents Code Change Checklist
## 7.1 `default_config.py`
Recommended keys to add:
```python
"llm_provider": "openai",
"codex_binary": "codex",
"codex_reasoning_effort": "medium",
"codex_summary": "concise",
"codex_personality": "none",
"codex_workspace_dir": os.getenv("TRADINGAGENTS_CODEX_WORKSPACE", "./.tradingagents_codex_workspace"),
"codex_request_timeout": 120,
"codex_max_retries": 2,
"codex_cleanup_threads": True,
```
For compatibility:
- If `openai_reasoning_effort` is set and `codex_reasoning_effort` is empty, falling back to it is acceptable.
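That fallback amounts to a short resolution chain. A sketch, assuming the config key names listed above and a `"medium"` default:

```python
def resolve_reasoning_effort(config: dict) -> str:
    """Prefer the codex-specific knob, fall back to the OpenAI knob,
    then to a default."""
    return (
        config.get("codex_reasoning_effort")
        or config.get("openai_reasoning_effort")
        or "medium"
    )
```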
---
## 7.2 `factory.py`
Roughly:
```python
if provider_lower == "codex":
return CodexClient(model, base_url, **kwargs)
```
---
## 7.3 `codex_client.py`
Responsibilities:
- implement `BaseLLMClient`
- pass kwargs through to the `CodexChatModel` constructor
- check preflight/model availability in `validate_model()`
---
## 7.4 CLI / UI
Items that must be added:
- `codex` in the provider list
- hide or ignore the backend_url input when the provider is codex
- advanced options:
- `codex_reasoning_effort`
- `codex_summary`
- `codex_personality`
- `codex_workspace_dir`
---
## 7.5 README / Docs
Must document:
1. The difference between ChatGPT Pro/Codex auth and API keys
2. `codex login`
3. How to use the headless auth cache
4. An example `.codex/config.toml`
5. How to select the provider
6. Known limitations
---
## 8. Testing Strategy
## 8.1 Unit Tests
### `test_codex_message_codec.py`
- `str` input normalization
- `BaseMessage` sequence normalization
- dict message sequence normalization
- `ToolMessage` serialization
### `test_codex_schema.py`
- plain schema construction
- tool oneOf schema construction
- validation of tool args const / required / additionalProperties
### `test_codex_chat_model.py`
With mocked app-server responses:
- plain final answer
- tool_calls answer
- malformed JSON retry
- timeout
- unsupported model error
### `test_codex_app_server.py`
- initialize handshake
- request/response correlation
- notification draining
- turn completed / failed handling
---
## 8.2 Integration Tests
### Smoke
- provider=`codex`
- select only one analyst, `news`
- ticker=`AAPL`
- research depth=1
- verify that the final report file is created
### Tool loop
- run only the market analyst
- the first response is a `get_stock_data` tool call
- after the tool result, the next response is `get_indicators` or final
### Multi-agent
- `market + news`
- the full graph completes
- `final_trade_decision` is non-empty
### Auth preflight
- environment without login → friendly failure
- logged-in environment → account/read succeeds
---
## 8.3 Operational Verification
Recommended order before a real run:
```bash
codex login
python -m tradingagents.llm_clients.codex_preflight
python main.py
```
Or select `codex` as the provider in the CLI/UI.
---
## 9. Failure Handling
## 9.1 Malformed JSON
Response:
- retry once
- retry prompt:
  - "Your previous output was invalid JSON. Return valid JSON matching the schema only."
- if it still fails, raise an exception
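The retry policy above fits in a small loop. A sketch, where `run_turn(extra_prompt)` is a hypothetical callable that performs one app-server turn and returns the raw model text:

```python
import json

def invoke_with_retry(run_turn, max_retries: int = 1):
    """Retry once on malformed JSON with a corrective prompt, then raise."""
    extra = ""
    for attempt in range(max_retries + 1):
        raw = run_turn(extra)
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            if attempt == max_retries:
                raise
            # Corrective instruction appended to the next turn's prompt.
            extra = ("Your previous output was invalid JSON. "
                     "Return valid JSON matching the schema only.")

# Usage with a fake model that fails once, then succeeds:
responses = iter(["not json", '{"mode": "final"}'])
result = invoke_with_retry(lambda extra: next(responses))
```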
## 9.2 App-server startup failure
Response:
- re-check the binary path
- check `codex --version`
- if it is a PATH issue, use an absolute path in `codex_binary`
## 9.3 Login/permission problems
Response:
- `codex login`
- `codex login --device-auth` if headless
- set `cli_auth_credentials_store="file"`
- check whether `~/.codex/auth.json` exists
## 9.4 Rate limits
Response:
- expose `account/rateLimits/read`
- retry with backoff
- serialize long batch runs
- consider Codex credits if needed
## 9.5 Excessive thread log growth
Response:
- perform `thread/unsubscribe` by default
- use a separate `.codex-log` directory
- add a cleanup script for old logs
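The cleanup script can be a few lines of age-based deletion. A sketch, assuming a flat or nested `.codex-log` directory (the function name and 7-day default are illustrative):

```python
import time
from pathlib import Path

def cleanup_old_logs(log_dir: str, max_age_days: float = 7.0) -> int:
    """Delete log files older than max_age_days; return how many were
    removed. Sketch only; the real script may also prune empty dirs."""
    cutoff = time.time() - max_age_days * 86400
    removed = 0
    root = Path(log_dir)
    if not root.exists():
        return 0
    for path in root.rglob("*"):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed += 1
    return removed
```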
---
## 10. Recommended Implementation Order
### Phase 1
- add the provider
- add the app-server connection
- wire up plain invoke first
- add preflight
### Phase 2
- implement `bind_tools()` + the tool schema oneOf
- smoke-test the analyst nodes
### Phase 3
- add CLI/UI options
- write README/docs
- strengthen integration tests
### Phase 4
- malformed JSON retry
- rate limit/backoff
- log cleanup / diagnostics
---
## 11. Final Recommendation Summary
### Best implementation approach
**Add a new `codex` provider to TradingAgents and implement a custom LangChain ChatModel that internally communicates with `codex app-server` over stdio (JSONL).**
For tool calling, do not use Codex dynamicTools; instead force model responses into `final` or `tool_calls` form via **outputSchema + JSON oneOf**.
### Advantages of this approach
- No OpenAI API key required
- ChatGPT Pro / Codex login can be reused
- TradingAgents' existing ToolNode / graph structure is preserved
- Integrates naturally into a Python project
- Minimal dependence on the experimental dynamicTools API
- Clear maintenance surface going forward
### Operating principles that must be followed
- Never implement OAuth refresh directly
- Treat `auth.json` as a secret
- Prefer `codex login` or device-auth
- One auth cache per trusted runner / serialized workflow
- Use Codex only as a model backend; disable shell/web features as much as possible
---
## 12. Minimum Acceptance Criteria
Implementation is considered successful when all of the following hold:
1. TradingAgents runs with `llm_provider="codex"`.
2. It works without an API key once `codex login` has been done.
3. Analyst nodes generate tool calls through `bind_tools()` and ToolNode executes them.
4. Manager/trader/risk nodes respond normally via plain `invoke()`.
5. A smoke run with at least one analyst plus the full graph succeeds for `AAPL` or `SPY`.
6. Error messages for malformed JSON, missing auth, missing binary, and missing model are clear.
7. A README section and preflight are included.

README.ko.md (new file, 213 lines)
@@ -0,0 +1,213 @@
# TradingAgents: Multi-Agent LLM Financial Trading Framework
English documentation: [README.md](README.md)
## Overview
TradingAgents is a multi-agent framework that mirrors the collaborative workflow of a real trading organization. Fundamental analysts, news analysts, market analysts, researchers, traders, and a risk-management team each take a role, analyze the market, debate, and reach a final trading decision.
This project is for research purposes. Results depend on the models used, data quality, analysis period, prompts, and external conditions, and it is not investment advice.
## Team Composition
### Analyst team
- Fundamentals analyst: evaluates company financials and performance metrics.
- Sentiment analyst: analyzes social media and public sentiment.
- News analyst: interprets the impact of news and macroeconomic events.
- Market analyst: analyzes technical indicators and price action.
### Researcher team
- Bullish and bearish researchers debate based on the analyst reports.
### Trader
- Synthesizes analyst and researcher output to decide trade timing and sizing.
### Risk management and portfolio manager
- Assesses risk and approves or rejects the final trade proposal.
## Installation
### Clone the repository
```powershell
git clone https://github.com/TauricResearch/TradingAgents.git
Set-Location TradingAgents
```
### Windows PowerShell quickstart
This is the installation procedure actually validated in this repository.
```powershell
Set-Location C:\Projects\TradingAgents
py -3.13 -m venv .venv-codex
.\.venv-codex\Scripts\Activate.ps1
python -m pip install --upgrade pip
python -m pip install -e . --no-cache-dir
tradingagents --help
```
Notes:
- The validated workflow uses `.venv-codex` as the default virtual environment in this setup.
- If the `tradingagents` command is missing, the package is usually not yet installed in the virtual environment.
### Docker
```powershell
Copy-Item .env.example .env
notepad .env
docker compose run --rm tradingagents
```
Ollama profile:
```powershell
docker compose --profile ollama run --rm tradingagents-ollama
```
## APIs and Authentication
TradingAgents supports multiple LLM providers.
### Environment variables for general providers
```powershell
$env:OPENAI_API_KEY = "..."
$env:GOOGLE_API_KEY = "..."
$env:ANTHROPIC_API_KEY = "..."
$env:XAI_API_KEY = "..."
$env:OPENROUTER_API_KEY = "..."
$env:ALPHA_VANTAGE_API_KEY = "..."
```
### The Codex provider
The `codex` provider does not need an OpenAI API key. It only requires that you are logged in via the Codex CLI.
```powershell
where.exe codex
codex --version
codex login
```
Or:
```powershell
codex login --device-auth
```
TradingAgents talks to `codex app-server` directly over stdio and uses Codex-managed credentials. With file-backed auth, `~/.codex/auth.json` is typically used.
Recommended `~/.codex/config.toml`:
```toml
approval_policy = "never"
sandbox_mode = "read-only"
web_search = "disabled"
personality = "none"
cli_auth_credentials_store = "file"
```
Important points:
- TradingAgents keeps its own LangGraph `ToolNode`.
- Codex dynamic tools are not used.
- Each call uses a fresh ephemeral Codex thread to prevent context contamination between agents.
- The default Codex working directory is `~/.codex/tradingagents-workspace`.
If `codex` is not recognized in the VS Code terminal:
- Check the path with `where.exe codex`.
- Reload the VS Code window.
- If needed, run `codex.exe` directly via the full path returned by `where.exe codex`.
TradingAgents also auto-detects `codex.exe` in common Windows locations such as the VS Code OpenAI extension install path. To override auto-detection:
```powershell
$env:CODEX_BINARY = "C:\full\path\to\codex.exe"
```
## Running the CLI
Run the interactive CLI after installation:
```powershell
Set-Location C:\Projects\TradingAgents
.\.venv-codex\Scripts\Activate.ps1
tradingagents
```
Alternative:
```powershell
Set-Location C:\Projects\TradingAgents
.\.venv-codex\Scripts\Activate.ps1
python -m cli.main
```
Check the help output:
```powershell
Set-Location C:\Projects\TradingAgents
.\.venv-codex\Scripts\Activate.ps1
tradingagents --help
```
## Using as a Python Package
### Basic example
```python
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
ta = TradingAgentsGraph(debug=True, config=DEFAULT_CONFIG.copy())
_, decision = ta.propagate("NVDA", "2026-01-15")
print(decision)
```
### Configuration example
```python
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
config = DEFAULT_CONFIG.copy()
config["llm_provider"] = "codex"
config["quick_think_llm"] = "gpt-5.4-mini"
config["deep_think_llm"] = "gpt-5.4-mini"
config["max_debate_rounds"] = 1
ta = TradingAgentsGraph(debug=True, config=config)
_, decision = ta.propagate("NVDA", "2026-01-15")
print(decision)
```
Additional settings you can tune with `llm_provider = "codex"`:
- `codex_binary`
- `codex_reasoning_effort`
- `codex_summary`
- `codex_personality`
- `codex_workspace_dir`
- `codex_request_timeout`
- `codex_max_retries`
- `codex_cleanup_threads`
## Items Verified in This Validation
The following were verified in a real Windows PowerShell environment:
- installing the package into `.venv-codex`
- running `tradingagents --help`
- a plain `llm.invoke(...)` call against a logged-in Codex account
- the OpenAI-style `list[dict]` input path
- the `bind_tools()` tool-call path
- a minimal `TradingAgentsGraph(...).propagate(...)` smoke run producing a final decision
The minimal graph smoke run returned `FINAL_DECISION= HOLD`.
## Contributing
All forms of contribution are welcome: bug fixes, documentation improvements, feature suggestions, and more.
## Citation
For citation information, see the citation section of [README.md](README.md).

@@ -27,6 +27,8 @@
# TradingAgents: Multi-Agents LLM Financial Trading Framework
Korean documentation: [README.ko.md](README.ko.md)
## News
- [2026-03] **TradingAgents v0.2.3** released with multi-language support, GPT-5.4 family models, unified model catalog, backtesting date fidelity, and proxy support.
- [2026-03] **TradingAgents v0.2.2** released with GPT-5.4/Gemini 3.1/Claude 4.6 model coverage, five-tier rating scale, OpenAI Responses API, Anthropic effort control, and cross-platform stability.
@@ -118,6 +120,16 @@ Install the package and its dependencies:
pip install .
```
Windows PowerShell quickstart (validated in this repository):
```powershell
Set-Location C:\Projects\TradingAgents
py -3.13 -m venv .venv-codex
.\.venv-codex\Scripts\Activate.ps1
python -m pip install --upgrade pip
python -m pip install -e . --no-cache-dir
tradingagents --help
```
### Docker
Alternatively, run with Docker:
@@ -146,6 +158,42 @@ export ALPHA_VANTAGE_API_KEY=... # Alpha Vantage
For local models, configure Ollama with `llm_provider: "ollama"` in your config.
For the local `codex` provider, no API key is required. Authenticate once with Codex instead:
```bash
codex login
# or
codex login --device-auth
```
TradingAgents talks directly to `codex app-server` over stdio and relies on Codex-managed credentials (for example `~/.codex/auth.json` when file-backed auth is enabled). If auth is missing, the provider fails with a message telling you to run `codex login`.
Recommended `~/.codex/config.toml` for TradingAgents:
```toml
approval_policy = "never"
sandbox_mode = "read-only"
web_search = "disabled"
personality = "none"
cli_auth_credentials_store = "file"
```
Important notes for `codex`:
- TradingAgents keeps its own LangGraph `ToolNode` execution. It does not use Codex dynamic tools.
- Each model invocation uses a fresh ephemeral Codex thread to avoid context bleed across agents.
- The default Codex workspace is a dedicated neutral directory under `~/.codex/tradingagents-workspace`, not your repo root.
Windows PowerShell notes for `codex`:
```powershell
where.exe codex
codex --version
codex login
```
If `codex` is not recognized in the VS Code terminal, reload the VS Code window after updating your terminal PATH or use the full `codex.exe` path returned by `where.exe codex`.
TradingAgents also tries to auto-discover `codex.exe` from common Windows locations such as the VS Code OpenAI extension install path. You can override detection explicitly with:
```powershell
$env:CODEX_BINARY = "C:\full\path\to\codex.exe"
```
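The override plus auto-discovery behavior described above amounts to a short lookup chain. A sketch in Python (the `CODEX_BINARY` env var follows the docs; the extra-paths parameter is an illustrative assumption, not the project's actual API):

```python
import os
import shutil
from typing import Optional

def resolve_codex_binary(extra_paths: tuple = ()) -> Optional[str]:
    """Resolution order: explicit CODEX_BINARY env var, then PATH,
    then any extra well-known locations passed by the caller."""
    override = os.environ.get("CODEX_BINARY")
    if override:
        return override
    found = shutil.which("codex")
    if found:
        return found
    for candidate in extra_paths:
        if os.path.isfile(candidate):
            return candidate
    return None
```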
Alternatively, copy `.env.example` to `.env` and fill in your keys:
```bash
cp .env.example .env
@@ -160,6 +208,33 @@ python -m cli.main # alternative: run directly from source
```
You will see a screen where you can select your desired tickers, analysis date, LLM provider, research depth, and more.
Windows PowerShell run commands:
```powershell
Set-Location C:\Projects\TradingAgents
.\.venv-codex\Scripts\Activate.ps1
tradingagents
```
Alternative:
```powershell
Set-Location C:\Projects\TradingAgents
.\.venv-codex\Scripts\Activate.ps1
python -m cli.main
```
Validated Codex smoke checks:
```powershell
Set-Location C:\Projects\TradingAgents
.\.venv-codex\Scripts\Activate.ps1
tradingagents --help
```
The local Codex provider was also validated with:
- a plain `llm.invoke(...)` call
- an OpenAI-style `list[dict]` invoke path
- a `bind_tools()` tool-call path
- a minimal `TradingAgentsGraph(...).propagate(...)` smoke run that returned a final decision
<p align="center">
<img src="assets/cli/cli_init.png" width="100%" style="display: inline-block; margin: 0 2%;">
</p>
@@ -178,7 +253,7 @@ An interface will appear showing results as they load, letting you track the age
### Implementation Details
- We built TradingAgents with LangGraph to ensure flexibility and modularity. The framework supports multiple LLM providers: OpenAI, Google, Anthropic, xAI, OpenRouter, and Ollama.
+ We built TradingAgents with LangGraph to ensure flexibility and modularity. The framework supports multiple LLM providers: OpenAI, Codex, Google, Anthropic, xAI, OpenRouter, and Ollama.
### Python Usage
@@ -202,7 +277,7 @@ from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
config = DEFAULT_CONFIG.copy()
- config["llm_provider"] = "openai" # openai, google, anthropic, xai, openrouter, ollama
+ config["llm_provider"] = "openai" # openai, codex, google, anthropic, xai, openrouter, ollama
config["deep_think_llm"] = "gpt-5.4" # Model for complex reasoning
config["quick_think_llm"] = "gpt-5.4-mini" # Model for quick tasks
config["max_debate_rounds"] = 2
@@ -214,6 +289,16 @@ print(decision)
See `tradingagents/default_config.py` for all configuration options.
When using `llm_provider = "codex"`, these extra config knobs are available:
- `codex_binary`
- `codex_reasoning_effort`
- `codex_summary`
- `codex_personality`
- `codex_workspace_dir`
- `codex_request_timeout`
- `codex_max_retries`
- `codex_cleanup_threads`
## Contributing
We welcome contributions from the community! Whether it's fixing a bug, improving documentation, or suggesting a new feature, your input helps make this project better. If you are interested in this line of research, please consider joining our open-source financial AI research community [Tauric Research](https://tauric.ai/).

@@ -568,6 +568,7 @@ def get_user_selections():
thinking_level = None
reasoning_effort = None
anthropic_effort = None
codex_reasoning_effort = None
provider_lower = selected_llm_provider.lower()
if provider_lower == "google":
@@ -594,6 +595,14 @@
)
)
anthropic_effort = ask_anthropic_effort()
elif provider_lower == "codex":
console.print(
create_question_box(
"Step 8: Reasoning Effort",
"Configure Codex reasoning effort level"
)
)
codex_reasoning_effort = ask_codex_reasoning_effort()
return {
"ticker": selected_ticker,
@@ -607,6 +616,7 @@ def get_user_selections():
"google_thinking_level": thinking_level,
"openai_reasoning_effort": reasoning_effort,
"anthropic_effort": anthropic_effort,
"codex_reasoning_effort": codex_reasoning_effort,
"output_language": output_language,
}
@@ -941,6 +951,7 @@ def run_analysis():
config["google_thinking_level"] = selections.get("google_thinking_level")
config["openai_reasoning_effort"] = selections.get("openai_reasoning_effort")
config["anthropic_effort"] = selections.get("anthropic_effort")
config["codex_reasoning_effort"] = selections.get("codex_reasoning_effort")
config["output_language"] = selections.get("output_language", "English")
# Create stats callback handler for tracking LLM/tool calls

@@ -237,6 +237,7 @@ def select_llm_provider() -> tuple[str, str | None]:
"""Select the LLM provider and its API endpoint."""
BASE_URLS = [
("OpenAI", "https://api.openai.com/v1"),
("Codex", None),
("Google", None), # google-genai SDK manages its own endpoint
("Anthropic", "https://api.anthropic.com/"),
("xAI", "https://api.x.ai/v1"),
@@ -288,6 +289,11 @@ def ask_openai_reasoning_effort() -> str:
).ask()
def ask_codex_reasoning_effort() -> str:
"""Ask for Codex reasoning effort level."""
return ask_openai_reasoning_effort()
def ask_anthropic_effort() -> str | None:
"""Ask for Anthropic effort level.

@@ -0,0 +1,499 @@
import re
import unittest
from collections import deque
from pathlib import Path
from unittest.mock import patch
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from tradingagents.llm_clients.codex_app_server import (
CodexAppServerAuthError,
CodexAppServerBinaryError,
CodexInvocationResult,
CodexStructuredOutputError,
)
from tradingagents.llm_clients.codex_message_codec import normalize_input_messages
from tradingagents.llm_clients.codex_binary import resolve_codex_binary
from tradingagents.llm_clients.codex_preflight import run_codex_preflight
from tradingagents.llm_clients.codex_schema import (
build_plain_response_schema,
build_tool_response_schema,
normalize_tools_for_codex,
)
from tradingagents.llm_clients.factory import create_llm_client
def lookup_price(ticker: str) -> str:
"""Return the latest price snapshot for a ticker."""
def lookup_volume(ticker: str) -> str:
"""Return the latest volume snapshot for a ticker."""
class FakeCodexSession:
def __init__(
self,
*,
codex_binary=None,
request_timeout=0,
workspace_dir="",
cleanup_threads=True,
responses=None,
account_payload=None,
models_payload=None,
):
self.codex_binary = codex_binary
self.request_timeout = request_timeout
self.workspace_dir = workspace_dir
self.cleanup_threads = cleanup_threads
self.responses = deque(responses or [])
self.account_payload = account_payload or {
"account": {"type": "chatgpt"},
"requiresOpenaiAuth": False,
}
self.models_payload = models_payload or {
"data": [{"id": "gpt-5.4", "model": "gpt-5.4"}]
}
self.started = 0
self.closed = 0
self.invocations = []
def start(self):
self.started += 1
def close(self):
self.closed += 1
def account_read(self):
return self.account_payload
def model_list(self, include_hidden=True):
return self.models_payload
def invoke(
self,
*,
prompt,
model,
output_schema,
reasoning_effort,
summary,
personality,
):
self.invocations.append(
{
"prompt": prompt,
"model": model,
"output_schema": output_schema,
"reasoning_effort": reasoning_effort,
"summary": summary,
"personality": personality,
}
)
if not self.responses:
raise AssertionError("No fake Codex responses left.")
return CodexInvocationResult(final_text=self.responses.popleft(), notifications=[])
class CodexProviderTests(unittest.TestCase):
def test_resolve_codex_binary_uses_windows_vscode_fallback(self):
fake_home = Path("C:/Users/tester")
candidate = fake_home / ".vscode/extensions/openai.chatgpt-1.0.0/bin/windows-x86_64/codex.exe"
with (
patch("tradingagents.llm_clients.codex_binary.os.name", "nt"),
patch("tradingagents.llm_clients.codex_binary.Path.home", return_value=fake_home),
patch("tradingagents.llm_clients.codex_binary.shutil.which", return_value=None),
patch(
"tradingagents.llm_clients.codex_binary.Path.glob",
return_value=[candidate],
),
patch("pathlib.Path.is_file", return_value=True),
patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.stat") as mocked_stat,
):
mocked_stat.return_value.st_mtime = 1
resolved = resolve_codex_binary(None)
self.assertEqual(resolved, str(candidate))
def test_resolve_codex_binary_uses_env_override(self):
with (
patch("tradingagents.llm_clients.codex_binary.shutil.which", return_value=None),
patch.dict("os.environ", {"CODEX_BINARY": "C:/custom/codex.exe"}, clear=False),
patch("pathlib.Path.is_file", return_value=True),
):
resolved = resolve_codex_binary(None)
self.assertEqual(Path(resolved), Path("C:/custom/codex.exe"))
def test_message_normalization_supports_str_messages_and_openai_dicts(self):
normalized = normalize_input_messages(
[
{"role": "system", "content": "system"},
{"role": "user", "content": "user"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_123",
"type": "function",
"function": {
"name": "lookup_price",
"arguments": '{"ticker":"NVDA"}',
},
}
],
},
{"role": "tool", "tool_call_id": "call_123", "content": "42"},
]
)
self.assertIsInstance(normalized[0], SystemMessage)
self.assertIsInstance(normalized[1], HumanMessage)
self.assertIsInstance(normalized[2], AIMessage)
self.assertEqual(normalized[2].tool_calls[0]["name"], "lookup_price")
self.assertEqual(normalized[2].tool_calls[0]["args"], {"ticker": "NVDA"})
self.assertIsInstance(normalized[3], ToolMessage)
def test_output_schema_construction_builds_exact_tool_branches(self):
tool_schemas = normalize_tools_for_codex([lookup_price])
schema = build_tool_response_schema(tool_schemas)
required_schema = build_tool_response_schema(tool_schemas, allow_final=False)
plain_schema = build_plain_response_schema()
self.assertEqual(plain_schema["required"], ["answer"])
self.assertEqual(schema["properties"]["mode"]["enum"], ["final", "tool_calls"])
tool_branch = schema["properties"]["tool_calls"]["items"]
self.assertEqual(tool_branch["properties"]["name"]["const"], "lookup_price")
self.assertIn("arguments", tool_branch["required"])
self.assertEqual(required_schema["properties"]["mode"]["const"], "tool_calls")
generic_schema = build_tool_response_schema(
normalize_tools_for_codex([lookup_price, lookup_volume])
)
generic_items = generic_schema["properties"]["tool_calls"]["items"]
self.assertEqual(generic_items["properties"]["name"]["type"], "string")
self.assertIn("enum", generic_items["properties"]["name"])
self.assertEqual(generic_items["properties"]["arguments_json"]["type"], "string")
def test_plain_final_response_parsing(self):
session = FakeCodexSession(
responses=['{"answer":"Final decision"}'],
)
llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
session_factory=lambda **kwargs: session,
preflight_runner=lambda **kwargs: None,
).get_llm()
result = llm.invoke("Give me the final answer.")
self.assertEqual(result.content, "Final decision")
self.assertEqual(session.started, 1)
def test_invoke_accepts_openai_style_message_dicts(self):
session = FakeCodexSession(
responses=['{"answer":"From dict transcript"}'],
)
llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
session_factory=lambda **kwargs: session,
preflight_runner=lambda **kwargs: None,
).get_llm()
result = llm.invoke(
[
{"role": "system", "content": "system"},
{"role": "user", "content": "user"},
]
)
self.assertEqual(result.content, "From dict transcript")
self.assertIn("[System]\nsystem", session.invocations[0]["prompt"])
self.assertIn("[Human]\nuser", session.invocations[0]["prompt"])
def test_invoke_accepts_langchain_message_sequences(self):
session = FakeCodexSession(
responses=['{"answer":"From BaseMessage transcript"}'],
)
llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
session_factory=lambda **kwargs: session,
preflight_runner=lambda **kwargs: None,
).get_llm()
result = llm.invoke(
[
SystemMessage(content="system"),
HumanMessage(content="user"),
]
)
self.assertEqual(result.content, "From BaseMessage transcript")
self.assertIn("[System]\nsystem", session.invocations[0]["prompt"])
self.assertIn("[Human]\nuser", session.invocations[0]["prompt"])
def test_tool_call_response_parsing_populates_ai_message_tool_calls(self):
session = FakeCodexSession(
responses=[
'{"mode":"tool_calls","content":"Need data first","tool_calls":[{"name":"lookup_price","arguments":{"ticker":"NVDA"}}]}'
],
)
llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
session_factory=lambda **kwargs: session,
preflight_runner=lambda **kwargs: None,
).get_llm()
prompt = ChatPromptTemplate.from_messages(
[("system", "Use tools if needed."), ("human", "Analyze NVDA")]
)
result = (prompt | llm.bind_tools([lookup_price])).invoke({})
self.assertEqual(result.content, "Need data first")
self.assertEqual(result.tool_calls[0]["name"], "lookup_price")
self.assertEqual(result.tool_calls[0]["args"], {"ticker": "NVDA"})
self.assertRegex(result.tool_calls[0]["id"], r"^call_[0-9a-f]{32}$")
def test_multi_tool_response_parses_arguments_json(self):
session = FakeCodexSession(
responses=[
'{"mode":"tool_calls","content":"","tool_calls":[{"name":"lookup_price","arguments_json":"{\\"ticker\\":\\"NVDA\\"}"}]}'
],
)
llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
session_factory=lambda **kwargs: session,
preflight_runner=lambda **kwargs: None,
).get_llm()
result = llm.bind_tools([lookup_price, lookup_volume]).invoke("Analyze NVDA")
self.assertEqual(result.tool_calls[0]["name"], "lookup_price")
self.assertEqual(result.tool_calls[0]["args"], {"ticker": "NVDA"})
def test_bind_tools_honors_required_and_named_tool_choice(self):
required_session = FakeCodexSession(
responses=[
'{"mode":"tool_calls","content":"Calling tool","tool_calls":[{"name":"lookup_price","arguments":{"ticker":"NVDA"}}]}'
],
)
required_llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
session_factory=lambda **kwargs: required_session,
preflight_runner=lambda **kwargs: None,
).get_llm()
required_result = required_llm.bind_tools([lookup_price], tool_choice="required").invoke(
"Analyze NVDA"
)
self.assertTrue(required_result.tool_calls)
self.assertEqual(
required_session.invocations[0]["output_schema"]["properties"]["mode"]["const"],
"tool_calls",
)
self.assertIn(
"must respond with one or more tool calls",
required_session.invocations[0]["prompt"].lower(),
)
named_session = FakeCodexSession(
responses=[
'{"mode":"tool_calls","content":"Calling named tool","tool_calls":[{"name":"lookup_price","arguments":{"ticker":"MSFT"}}]}'
],
)
named_llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
session_factory=lambda **kwargs: named_session,
preflight_runner=lambda **kwargs: None,
).get_llm()
named_result = named_llm.bind_tools(
[lookup_price],
tool_choice={"type": "function", "function": {"name": "lookup_price"}},
).invoke("Analyze MSFT")
self.assertEqual(named_result.tool_calls[0]["name"], "lookup_price")
tool_item = named_session.invocations[0]["output_schema"]["properties"]["tool_calls"]["items"]
self.assertEqual(tool_item["properties"]["name"]["const"], "lookup_price")
self.assertIn(
"must call the tool named `lookup_price`",
named_session.invocations[0]["prompt"].lower(),
)
def test_malformed_json_retries_and_surfaces_error_when_exhausted(self):
session = FakeCodexSession(
responses=["not json", '{"answer":"Recovered"}'],
)
llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
codex_max_retries=1,
session_factory=lambda **kwargs: session,
preflight_runner=lambda **kwargs: None,
).get_llm()
result = llm.invoke("Recover after malformed JSON.")
self.assertEqual(result.content, "Recovered")
self.assertEqual(len(session.invocations), 2)
self.assertIn(
"previous response did not satisfy tradingagents validation",
session.invocations[1]["prompt"].lower(),
)
failing_session = FakeCodexSession(
responses=["still bad", "still bad again"],
)
failing_llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
codex_max_retries=1,
session_factory=lambda **kwargs: failing_session,
preflight_runner=lambda **kwargs: None,
).get_llm()
with self.assertRaises(CodexStructuredOutputError):
failing_llm.invoke("This should fail.")
def test_runtime_errors_do_not_retry_as_json_failures(self):
class FailingSession(FakeCodexSession):
def invoke(self, **kwargs):
raise RuntimeError("transport exploded")
session = FailingSession()
llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
codex_max_retries=2,
session_factory=lambda **kwargs: session,
preflight_runner=lambda **kwargs: None,
).get_llm()
with self.assertRaisesRegex(RuntimeError, "transport exploded"):
llm.invoke("fail fast")
def test_provider_codex_smoke_covers_bind_tools_and_direct_invoke_paths(self):
session = FakeCodexSession(
responses=[
'{"mode":"tool_calls","content":"Fetching market data","tool_calls":[{"name":"lookup_price","arguments":{"ticker":"NVDA"}}]}',
'{"answer":"Rating: Buy\\nExecutive Summary: Add gradually."}',
],
)
llm = create_llm_client(
"codex",
"gpt-5.4",
codex_binary="C:/fake/codex",
codex_workspace_dir="C:/tmp/codex-workspace",
session_factory=lambda **kwargs: session,
preflight_runner=lambda **kwargs: None,
).get_llm()
analyst_prompt = ChatPromptTemplate.from_messages(
[("system", "Use tools when you need extra data."), ("human", "Analyze NVDA.")]
)
market_result = (analyst_prompt | llm.bind_tools([lookup_price])).invoke({})
self.assertTrue(market_result.tool_calls)
self.assertEqual(market_result.tool_calls[0]["name"], "lookup_price")
decision = llm.invoke("Produce the final trade decision.")
self.assertIn("Rating: Buy", decision.content)
self.assertEqual(len(session.invocations), 2)
def test_preflight_detects_missing_auth_and_missing_binary(self):
valid_factory = lambda **kwargs: FakeCodexSession(
account_payload={
"account": {"type": "chatgpt", "email": "user@example.com"},
"requiresOpenaiAuth": True,
}
)
result = run_codex_preflight(
codex_binary="C:\\fake\\codex.exe",
model="gpt-5.4",
request_timeout=10.0,
workspace_dir="C:/tmp/codex-workspace",
cleanup_threads=True,
session_factory=valid_factory,
)
self.assertEqual(result.account["type"], "chatgpt")
authless_factory = lambda **kwargs: FakeCodexSession(
account_payload={"account": None, "requiresOpenaiAuth": True}
)
with self.assertRaises(CodexAppServerAuthError):
run_codex_preflight(
codex_binary="C:\\fake\\codex.exe",
model="gpt-5.4",
request_timeout=10.0,
workspace_dir="C:/tmp/codex-workspace",
cleanup_threads=True,
session_factory=authless_factory,
)
with patch(
"tradingagents.llm_clients.codex_preflight.resolve_codex_binary",
return_value=None,
):
with self.assertRaises(CodexAppServerBinaryError):
run_codex_preflight(
codex_binary="definitely-missing-codex-binary",
model="gpt-5.4",
request_timeout=10.0,
workspace_dir="C:/tmp/codex-workspace",
cleanup_threads=True,
)
def test_preflight_uses_resolved_binary_path(self):
captured = {}
def factory(**kwargs):
captured["codex_binary"] = kwargs["codex_binary"]
return FakeCodexSession(**kwargs)
with patch(
"tradingagents.llm_clients.codex_preflight.resolve_codex_binary",
return_value="C:/resolved/codex.exe",
):
run_codex_preflight(
codex_binary=None,
model="gpt-5.4",
request_timeout=10.0,
workspace_dir="C:/tmp/codex-workspace",
cleanup_threads=True,
session_factory=factory,
)
self.assertEqual(captured["codex_binary"], "C:/resolved/codex.exe")
if __name__ == "__main__":
unittest.main()

@@ -1,4 +1,5 @@
import os
from pathlib import Path
DEFAULT_CONFIG = {
"project_dir": os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
@@ -16,6 +17,14 @@ DEFAULT_CONFIG = {
"google_thinking_level": None, # "high", "minimal", etc.
"openai_reasoning_effort": None, # "medium", "high", "low"
"anthropic_effort": None, # "high", "medium", "low"
"codex_binary": os.getenv("CODEX_BINARY"),
"codex_reasoning_effort": "medium",
"codex_summary": "none",
"codex_personality": "none",
"codex_workspace_dir": str(Path.home() / ".codex" / "tradingagents-workspace"),
"codex_request_timeout": 120.0,
"codex_max_retries": 2,
"codex_cleanup_threads": True,
# Output language for analyst reports and final decision
# Internal agent debate stays in English for reasoning quality
"output_language": "English",

@@ -152,6 +152,15 @@ class TradingAgentsGraph:
effort = self.config.get("anthropic_effort")
if effort:
kwargs["effort"] = effort
elif provider == "codex":
kwargs["codex_binary"] = self.config.get("codex_binary")
kwargs["codex_reasoning_effort"] = self.config.get("codex_reasoning_effort")
kwargs["codex_summary"] = self.config.get("codex_summary")
kwargs["codex_personality"] = self.config.get("codex_personality")
kwargs["codex_workspace_dir"] = self.config.get("codex_workspace_dir")
kwargs["codex_request_timeout"] = self.config.get("codex_request_timeout")
kwargs["codex_max_retries"] = self.config.get("codex_max_retries")
kwargs["codex_cleanup_threads"] = self.config.get("codex_cleanup_threads")
return kwargs

@@ -1,4 +1,10 @@
from .base_client import BaseLLMClient
from .factory import create_llm_client
def create_llm_client(*args, **kwargs):
from .factory import create_llm_client as _create_llm_client
return _create_llm_client(*args, **kwargs)
__all__ = ["BaseLLMClient", "create_llm_client"]

@@ -0,0 +1,337 @@
from __future__ import annotations
import json
import queue
import subprocess
import threading
import uuid
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from .codex_binary import codex_binary_error_message, resolve_codex_binary
class CodexAppServerError(RuntimeError):
"""Raised when the Codex app-server request cycle fails."""
class CodexAppServerAuthError(CodexAppServerError):
"""Raised when Codex login is missing or unusable."""
class CodexAppServerBinaryError(CodexAppServerError):
"""Raised when the Codex binary cannot be started."""
class CodexStructuredOutputError(CodexAppServerError):
"""Raised when Codex does not honor the requested structured output."""
@dataclass(slots=True)
class CodexInvocationResult:
final_text: str
notifications: list[dict[str, Any]]
class CodexAppServerSession:
"""Minimal JSON-RPC client for `codex app-server` over stdio JSONL."""
def __init__(
self,
*,
codex_binary: str | None,
request_timeout: float,
workspace_dir: str,
cleanup_threads: bool,
client_name: str = "tradingagents_codex",
client_title: str = "TradingAgents Codex Provider",
client_version: str = "0.2.3",
) -> None:
self.codex_binary = codex_binary
self.request_timeout = request_timeout
self.workspace_dir = str(Path(workspace_dir).expanduser())
self.cleanup_threads = cleanup_threads
self.client_name = client_name
self.client_title = client_title
self.client_version = client_version
self._proc: subprocess.Popen[str] | None = None
self._stdout_queue: queue.Queue[dict[str, Any] | None] = queue.Queue()
self._pending: deque[dict[str, Any]] = deque()
self._stderr_lines: deque[str] = deque(maxlen=200)
self._lock = threading.RLock()
self._request_lock = threading.RLock()
self._reader_thread: threading.Thread | None = None
self._stderr_thread: threading.Thread | None = None
def start(self) -> None:
with self._lock:
if self._proc is not None:
return
Path(self.workspace_dir).mkdir(parents=True, exist_ok=True)
binary = resolve_codex_binary(self.codex_binary)
if not binary:
raise CodexAppServerBinaryError(codex_binary_error_message(self.codex_binary))
self.codex_binary = binary
try:
self._proc = subprocess.Popen(
[binary, "app-server", "--listen", "stdio://"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
cwd=self.workspace_dir,
bufsize=1,
)
except OSError as exc:
raise CodexAppServerBinaryError(
f"Failed to start Codex app-server with binary '{binary}': {exc}"
) from exc
self._start_reader_threads()
self._initialize()
def close(self) -> None:
with self._lock:
proc = self._proc
self._proc = None
if proc is None:
return
try:
if proc.stdin:
proc.stdin.close()
except OSError:
pass
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
proc.kill()
def account_read(self) -> dict[str, Any]:
return self.request("account/read", {"refreshToken": False})
def model_list(self, *, include_hidden: bool = True) -> dict[str, Any]:
return self.request("model/list", {"includeHidden": include_hidden})
def invoke(
self,
*,
prompt: str,
model: str,
output_schema: dict[str, Any],
reasoning_effort: str | None,
summary: str | None,
personality: str | None,
) -> CodexInvocationResult:
with self._request_lock:
self.start()
thread_id = None
try:
thread = self.request(
"thread/start",
{
"approvalPolicy": "never",
"cwd": self.workspace_dir,
"ephemeral": True,
"model": model,
"personality": personality,
"sandbox": "read-only",
"serviceName": "tradingagents_codex",
},
)
thread_id = thread["thread"]["id"]
started = self.request(
"turn/start",
{
"threadId": thread_id,
"input": [{"type": "text", "text": prompt}],
"model": model,
"effort": reasoning_effort,
"summary": summary,
"outputSchema": output_schema,
},
)
turn_id = started["turn"]["id"]
final_text, notifications = self._collect_turn(turn_id)
return CodexInvocationResult(final_text=final_text, notifications=notifications)
finally:
if thread_id and self.cleanup_threads:
try:
self.request("thread/unsubscribe", {"threadId": thread_id})
except CodexAppServerError:
pass
def request(self, method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
request_id = str(uuid.uuid4())
self._write({"id": request_id, "method": method, "params": params or {}})
deferred: list[dict[str, Any]] = []
while True:
message = self._next_message(self.request_timeout)
if message.get("id") == request_id:
self._restore_deferred(deferred)
if "error" in message:
error = message["error"] or {}
code = error.get("code")
text = error.get("message", "unknown Codex app-server error")
raise CodexAppServerError(
f"{method} failed ({code}): {text}. stderr_tail={self._stderr_tail()}"
)
result = message.get("result")
if not isinstance(result, dict):
raise CodexAppServerError(f"{method} returned a non-object result: {result!r}")
return result
if "method" in message and "id" in message:
self._handle_server_request(message)
continue
deferred.append(message)
def _initialize(self) -> None:
response = self.request(
"initialize",
{
"clientInfo": {
"name": self.client_name,
"title": self.client_title,
"version": self.client_version,
}
},
)
if not response.get("userAgent"):
raise CodexAppServerError("Codex initialize response did not include userAgent.")
self._write({"method": "initialized", "params": {}})
def _collect_turn(self, turn_id: str) -> tuple[str, list[dict[str, Any]]]:
notifications: list[dict[str, Any]] = []
final_messages: list[str] = []
fallback_messages: list[str] = []
while True:
message = self._next_message(self.request_timeout)
if "method" in message and "id" in message:
self._handle_server_request(message)
continue
if "method" not in message:
self._pending.append(message)
continue
method = message["method"]
params = message.get("params", {})
notifications.append(message)
if (
method == "item/completed"
and isinstance(params, dict)
and params.get("turnId") == turn_id
):
item = params.get("item", {})
if isinstance(item, dict) and item.get("type") == "agentMessage":
text = str(item.get("text", ""))
if item.get("phase") == "final_answer":
final_messages.append(text)
else:
fallback_messages.append(text)
continue
if method == "turn/completed" and isinstance(params, dict):
turn = params.get("turn", {})
if isinstance(turn, dict) and turn.get("id") == turn_id:
status = turn.get("status")
if status == "failed":
error = turn.get("error", {})
message_text = error.get("message") if isinstance(error, dict) else None
raise CodexAppServerError(
message_text or f"Codex turn {turn_id} failed without an error message."
)
break
if final_messages:
return final_messages[-1], notifications
if fallback_messages:
return fallback_messages[-1], notifications
raise CodexStructuredOutputError("Codex turn completed without an assistant message.")
def _handle_server_request(self, message: dict[str, Any]) -> None:
try:
self._write({"id": message["id"], "result": {}})
except Exception:
pass
def _write(self, payload: dict[str, Any]) -> None:
if self._proc is None or self._proc.stdin is None:
raise CodexAppServerError("Codex app-server is not running.")
try:
self._proc.stdin.write(json.dumps(payload) + "\n")
self._proc.stdin.flush()
except OSError as exc:
raise CodexAppServerError(
f"Failed to write to Codex app-server: {exc}. stderr_tail={self._stderr_tail()}"
) from exc
def _next_message(self, timeout: float) -> dict[str, Any]:
if self._pending:
return self._pending.popleft()
try:
message = self._stdout_queue.get(timeout=timeout)
except queue.Empty as exc:
raise CodexAppServerError(
f"Timed out waiting for Codex app-server after {timeout}s. stderr_tail={self._stderr_tail()}"
) from exc
if message is None:
raise CodexAppServerError(
f"Codex app-server closed unexpectedly. stderr_tail={self._stderr_tail()}"
)
return message
def _start_reader_threads(self) -> None:
assert self._proc is not None
assert self._proc.stdout is not None
assert self._proc.stderr is not None
def _read_stdout() -> None:
stdout = self._proc.stdout
assert stdout is not None
for line in stdout:
line = line.strip()
if not line:
continue
try:
payload = json.loads(line)
except json.JSONDecodeError:
self._stderr_lines.append(f"invalid_json_stdout={line}")
continue
if isinstance(payload, dict):
self._stdout_queue.put(payload)
self._stdout_queue.put(None)
def _read_stderr() -> None:
stderr = self._proc.stderr
assert stderr is not None
for line in stderr:
self._stderr_lines.append(line.rstrip())
self._reader_thread = threading.Thread(target=_read_stdout, daemon=True)
self._stderr_thread = threading.Thread(target=_read_stderr, daemon=True)
self._reader_thread.start()
self._stderr_thread.start()
def _stderr_tail(self) -> str:
return "\n".join(list(self._stderr_lines)[-40:])
def _restore_deferred(self, deferred: list[dict[str, Any]]) -> None:
for message in reversed(deferred):
self._pending.appendleft(message)
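The session above speaks newline-delimited JSON-RPC over the app-server's stdio: one JSON object per line, responses matched to requests by `id`, everything else treated as a notification. A minimal sketch of that framing, independent of the real `codex` binary:

```python
import json

def frame(payload: dict) -> str:
    # One JSON object per line, as CodexAppServerSession._write emits.
    return json.dumps(payload) + "\n"

def parse_stream(raw: str) -> list[dict]:
    # Mirror of the stdout reader thread: skip blank lines, decode each line alone.
    messages = []
    for line in raw.splitlines():
        line = line.strip()
        if line:
            messages.append(json.loads(line))
    return messages

# A request/response exchange matched by id, as in CodexAppServerSession.request;
# the interleaved id-less message models a server notification that gets deferred.
request = frame({"id": "req-1", "method": "model/list", "params": {"includeHidden": True}})
stream = (
    frame({"method": "item/agentMessage/delta", "params": {"delta": "..."}})
    + frame({"id": "req-1", "result": {"data": []}})
)
reply = next(m for m in parse_stream(stream) if m.get("id") == "req-1")
```

The method names above (`model/list`, `item/agentMessage/delta`) are the ones this commit uses; the framing itself is plain JSONL.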

@@ -0,0 +1,69 @@
from __future__ import annotations
import os
import shutil
from pathlib import Path
def resolve_codex_binary(codex_binary: str | None) -> str | None:
explicit = _normalize_explicit_binary(codex_binary)
if explicit:
return explicit
env_value = _normalize_explicit_binary(os.getenv("CODEX_BINARY"))
if env_value:
return env_value
path_binary = shutil.which("codex")
if path_binary:
return path_binary
for candidate in _windows_codex_candidates():
if candidate.is_file():
return str(candidate)
return None
def codex_binary_error_message(codex_binary: str | None) -> str:
requested = codex_binary or os.getenv("CODEX_BINARY") or "codex"
message = (
f"Could not find Codex binary '{requested}'. Install Codex, ensure it is on PATH, "
"set the `CODEX_BINARY` environment variable, or configure `codex_binary` with the full executable path."
)
discovered = [str(path) for path in _windows_codex_candidates() if path.is_file()]
if discovered:
message += f" Detected candidate: {discovered[0]}"
return message
def _normalize_explicit_binary(value: str | None) -> str | None:
if not value:
return None
expanded = str(Path(value).expanduser())
has_separator = any(sep and sep in expanded for sep in (os.path.sep, os.path.altsep))
if has_separator:
return expanded if Path(expanded).is_file() else None
found = shutil.which(expanded)
return found or None
def _windows_codex_candidates() -> list[Path]:
if os.name != "nt":
return []
home = Path.home()
candidates = sorted(
home.glob(r".vscode/extensions/openai.chatgpt-*/bin/windows-x86_64/codex.exe"),
key=lambda path: path.stat().st_mtime if path.exists() else 0,
reverse=True,
)
candidates.extend(
[
home / ".codex" / "bin" / "codex.exe",
home / "AppData" / "Local" / "Programs" / "Codex" / "codex.exe",
]
)
return candidates

@@ -0,0 +1,407 @@
from __future__ import annotations
import json
import threading
import uuid
from typing import Any, Callable, Sequence
from pydantic import ConfigDict, Field, PrivateAttr
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs import ChatGeneration, ChatResult
from .codex_app_server import CodexAppServerSession, CodexStructuredOutputError
from .codex_message_codec import (
format_messages_for_codex,
normalize_input_messages,
strip_json_fence,
)
from .codex_preflight import run_codex_preflight
from .codex_schema import (
build_plain_response_schema,
build_tool_response_schema,
normalize_tools_for_codex,
)
class CodexChatModel(BaseChatModel):
"""LangChain chat model that talks to `codex app-server` over stdio."""
model: str
codex_binary: str | None = None
codex_reasoning_effort: str | None = None
codex_summary: str | None = None
codex_personality: str | None = None
codex_workspace_dir: str
codex_request_timeout: float = 120.0
codex_max_retries: int = 2
codex_cleanup_threads: bool = True
session_factory: Callable[..., CodexAppServerSession] | None = Field(
default=None, exclude=True, repr=False
)
preflight_runner: Callable[..., Any] | None = Field(
default=None, exclude=True, repr=False
)
model_config = ConfigDict(arbitrary_types_allowed=True)
_session: CodexAppServerSession | None = PrivateAttr(default=None)
_session_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_preflight_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_preflight_done: bool = PrivateAttr(default=False)
@property
def _llm_type(self) -> str:
return "codex"
@property
def _identifying_params(self) -> dict[str, Any]:
return {
"model": self.model,
"codex_binary": self.codex_binary,
"codex_reasoning_effort": self.codex_reasoning_effort,
"codex_summary": self.codex_summary,
"codex_personality": self.codex_personality,
}
def preflight(self) -> None:
with self._preflight_lock:
if self._preflight_done:
return
runner = self.preflight_runner or run_codex_preflight
runner(
codex_binary=self.codex_binary,
model=self.model,
request_timeout=self.codex_request_timeout,
workspace_dir=self.codex_workspace_dir,
cleanup_threads=self.codex_cleanup_threads,
session_factory=self.session_factory or CodexAppServerSession,
)
self._preflight_done = True
def bind_tools(
self,
tools: Sequence[dict[str, Any] | type | Callable | Any],
*,
tool_choice: str | bool | dict[str, Any] | None = None,
**kwargs: Any,
):
normalized_tools = normalize_tools_for_codex(tools)
return self.bind(tools=normalized_tools, tool_choice=tool_choice, **kwargs)
def close(self) -> None:
with self._session_lock:
if self._session is not None:
self._session.close()
self._session = None
def _generate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager=None,
**kwargs: Any,
) -> ChatResult:
self.preflight()
normalized_messages = normalize_input_messages(messages)
tools = kwargs.get("tools") or []
tool_choice = kwargs.get("tool_choice")
tool_binding = self._resolve_tool_binding(tools, tool_choice)
tools = tool_binding["tools"]
effective_tool_choice = tool_binding["tool_choice"]
output_schema = tool_binding["output_schema"]
tool_arguments_as_json_string = tool_binding["tool_arguments_as_json_string"]
raw_response: str | None = None
last_error: Exception | None = None
for attempt in range(self.codex_max_retries + 1):
retry_message = None
if attempt:
previous_error = str(last_error) if last_error is not None else "unknown schema mismatch"
retry_message = (
"The previous response did not satisfy TradingAgents validation: "
f"{previous_error}. Return only valid JSON that exactly matches the requested "
"schema and tool argument requirements."
)
prompt = format_messages_for_codex(
normalized_messages,
tool_names=[tool["function"]["name"] for tool in tools],
tool_schemas=tools,
tool_choice=effective_tool_choice,
tool_arguments_as_json_string=tool_arguments_as_json_string,
retry_message=retry_message,
)
result = self._session_or_create().invoke(
prompt=prompt,
model=self.model,
output_schema=output_schema,
reasoning_effort=self.codex_reasoning_effort,
summary=self.codex_summary,
personality=self.codex_personality,
)
raw_response = result.final_text
if run_manager is not None:
for notification in result.notifications:
if notification.get("method") != "item/agentMessage/delta":
continue
params = notification.get("params", {})
if isinstance(params, dict):
delta = params.get("delta")
if isinstance(delta, str) and delta:
run_manager.on_llm_new_token(delta)
try:
ai_message = (
self._parse_tool_response(
raw_response,
tools,
tool_arguments_as_json_string=tool_arguments_as_json_string,
)
if tools
else self._parse_plain_response(raw_response)
)
return ChatResult(generations=[ChatGeneration(message=ai_message)])
except (json.JSONDecodeError, CodexStructuredOutputError, ValueError) as exc:
last_error = exc
continue
raise CodexStructuredOutputError(
"Codex returned malformed structured output after "
f"{self.codex_max_retries + 1} attempt(s): {last_error}. "
f"Last response: {raw_response!r}"
)
def _parse_plain_response(self, raw_response: str) -> AIMessage:
payload = json.loads(strip_json_fence(raw_response))
if not isinstance(payload, dict) or not isinstance(payload.get("answer"), str):
raise CodexStructuredOutputError(
f"Expected plain response JSON with string `answer`, got: {payload!r}"
)
return AIMessage(content=payload["answer"])
def _parse_tool_response(
self,
raw_response: str,
tools: Sequence[dict[str, Any]],
*,
tool_arguments_as_json_string: bool,
) -> AIMessage:
payload = json.loads(strip_json_fence(raw_response))
if not isinstance(payload, dict):
raise CodexStructuredOutputError(f"Expected JSON object, got: {payload!r}")
mode = payload.get("mode")
content = payload.get("content", "")
if not isinstance(content, str):
raise CodexStructuredOutputError("Structured response `content` must be a string.")
if mode == "final":
tool_calls = payload.get("tool_calls", [])
if tool_calls not in ([], None):
raise CodexStructuredOutputError(
f"`mode=final` must not include tool calls, got: {tool_calls!r}"
)
return AIMessage(content=content)
if mode != "tool_calls":
raise CodexStructuredOutputError(f"Unknown structured response mode: {mode!r}")
raw_tool_calls = payload.get("tool_calls")
if not isinstance(raw_tool_calls, list) or not raw_tool_calls:
raise CodexStructuredOutputError("`mode=tool_calls` requires a non-empty tool_calls array.")
tool_calls: list[dict[str, Any]] = []
tool_parameters = {
tool.get("function", {}).get("name"): tool.get("function", {}).get("parameters", {})
for tool in tools
}
for item in raw_tool_calls:
if not isinstance(item, dict):
raise CodexStructuredOutputError(f"Tool call entries must be objects, got: {item!r}")
name = item.get("name")
arguments = self._extract_tool_arguments(
item,
tool_arguments_as_json_string=tool_arguments_as_json_string,
)
if not isinstance(name, str) or not isinstance(arguments, dict):
raise CodexStructuredOutputError(
f"Tool call entries must include string name and object arguments, got: {item!r}"
)
if name not in tool_parameters:
raise CodexStructuredOutputError(
f"Tool call name '{name}' is not in the bound tool set."
)
self._validate_tool_arguments(name, arguments, tool_parameters[name])
tool_calls.append(
{
"name": name,
"args": arguments,
"id": f"call_{uuid.uuid4().hex}",
}
)
return AIMessage(content=content, tool_calls=tool_calls)
def _extract_tool_arguments(
self,
item: dict[str, Any],
*,
tool_arguments_as_json_string: bool,
) -> dict[str, Any]:
if tool_arguments_as_json_string:
raw_arguments = item.get("arguments_json")
if not isinstance(raw_arguments, str):
raise CodexStructuredOutputError(
f"Tool call entries must include string arguments_json, got: {item!r}"
)
try:
parsed = json.loads(raw_arguments)
except json.JSONDecodeError as exc:
raise CodexStructuredOutputError(
f"Tool call arguments_json must contain valid JSON, got: {raw_arguments!r}"
) from exc
if not isinstance(parsed, dict):
raise CodexStructuredOutputError(
f"Tool call arguments_json must decode to an object, got: {parsed!r}"
)
return parsed
arguments = item.get("arguments")
if not isinstance(arguments, dict):
raise CodexStructuredOutputError(
f"Tool call entries must include object arguments, got: {item!r}"
)
return arguments
def _validate_tool_arguments(
self,
tool_name: str,
arguments: dict[str, Any],
schema: dict[str, Any] | None,
) -> None:
if not isinstance(schema, dict):
return
properties = schema.get("properties")
if properties is not None and not isinstance(properties, dict):
raise CodexStructuredOutputError(
f"Tool schema for '{tool_name}' has invalid properties metadata."
)
required = schema.get("required") or []
if isinstance(required, list):
missing = [name for name in required if name not in arguments]
if missing:
raise CodexStructuredOutputError(
f"Tool call '{tool_name}' is missing required arguments: {', '.join(missing)}"
)
if properties and schema.get("additionalProperties") is False:
unexpected = [name for name in arguments if name not in properties]
if unexpected:
raise CodexStructuredOutputError(
f"Tool call '{tool_name}' included unexpected arguments: {', '.join(unexpected)}"
)
def _session_or_create(self) -> CodexAppServerSession:
with self._session_lock:
if self._session is None:
factory = self.session_factory or CodexAppServerSession
self._session = factory(
codex_binary=self.codex_binary,
request_timeout=self.codex_request_timeout,
workspace_dir=self.codex_workspace_dir,
cleanup_threads=self.codex_cleanup_threads,
)
self._session.start()
return self._session
def _resolve_tool_binding(
self,
tools: Sequence[dict[str, Any]],
tool_choice: Any,
) -> dict[str, Any]:
tool_list = list(tools)
if not tool_list:
return {
"tools": [],
"tool_choice": None,
"output_schema": build_plain_response_schema(),
"tool_arguments_as_json_string": False,
}
if tool_choice in (None, "auto"):
return {
"tools": tool_list,
"tool_choice": None if tool_choice is None else "auto",
"output_schema": build_tool_response_schema(tool_list, allow_final=True),
"tool_arguments_as_json_string": len(tool_list) > 1,
}
if tool_choice in (False, "none"):
return {
"tools": [],
"tool_choice": "none",
"output_schema": build_plain_response_schema(),
"tool_arguments_as_json_string": False,
}
if tool_choice in (True, "any", "required"):
normalized_choice = "required" if tool_choice in (True, "required") else "any"
return {
"tools": tool_list,
"tool_choice": normalized_choice,
"output_schema": build_tool_response_schema(tool_list, allow_final=False),
"tool_arguments_as_json_string": len(tool_list) > 1,
}
selected_tool_name = self._extract_named_tool_choice(tool_choice)
if selected_tool_name is None:
raise CodexStructuredOutputError(
f"Unsupported Codex tool_choice value: {tool_choice!r}"
)
selected_tools = [
tool
for tool in tool_list
if tool.get("function", {}).get("name") == selected_tool_name
]
if not selected_tools:
available = ", ".join(
tool.get("function", {}).get("name", "<unknown>")
for tool in tool_list
)
raise CodexStructuredOutputError(
f"Requested tool_choice '{selected_tool_name}' is not in the bound tool set. "
f"Available tools: {available}"
)
return {
"tools": selected_tools,
"tool_choice": selected_tool_name,
"output_schema": build_tool_response_schema(selected_tools, allow_final=False),
"tool_arguments_as_json_string": False,
}
def _extract_named_tool_choice(self, tool_choice: Any) -> str | None:
if isinstance(tool_choice, str):
return tool_choice
if not isinstance(tool_choice, dict):
return None
function = tool_choice.get("function")
if isinstance(function, dict):
name = function.get("name")
if isinstance(name, str) and name:
return name
name = tool_choice.get("name")
if isinstance(name, str) and name:
return name
return None
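Taken together, `_parse_tool_response` accepts exactly two payload shapes from Codex. A minimal standalone sketch of the `mode` dispatch (the `classify_mode` helper and the sample payloads are hypothetical, written only to illustrate the checks performed above):

```python
import json

# Sample payloads mirroring the two shapes _parse_tool_response accepts.
final_payload = json.loads('{"mode": "final", "content": "Done.", "tool_calls": []}')
tool_payload = json.loads(
    '{"mode": "tool_calls", "content": "",'
    ' "tool_calls": [{"name": "get_stock_data", "arguments": {"symbol": "AAPL"}}]}'
)

def classify_mode(payload: dict) -> str:
    """Restate the mode checks performed before per-tool argument validation."""
    mode = payload.get("mode")
    if mode == "final":
        if payload.get("tool_calls") not in ([], None):
            raise ValueError("mode=final must not include tool calls")
        return "final"
    if mode == "tool_calls":
        calls = payload.get("tool_calls")
        if not isinstance(calls, list) or not calls:
            raise ValueError("mode=tool_calls requires a non-empty tool_calls array")
        return "tool_calls"
    raise ValueError(f"unknown structured response mode: {mode!r}")

print(classify_mode(final_payload))  # final
print(classify_mode(tool_payload))   # tool_calls
```

Any payload that fails these checks surfaces as a `CodexStructuredOutputError` in the retry loop of `_generate`, which re-prompts Codex with the validation failure.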


@@ -0,0 +1,40 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Optional
from .base_client import BaseLLMClient
from .codex_chat_model import CodexChatModel
from .validators import validate_model
class CodexClient(BaseLLMClient):
"""Client wrapper for the local Codex app-server provider."""
def __init__(self, model: str, base_url: Optional[str] = None, **kwargs):
super().__init__(model, base_url, **kwargs)
def get_llm(self) -> Any:
self.warn_if_unknown_model()
llm = CodexChatModel(
model=self.model,
codex_binary=self.kwargs.get("codex_binary"),
codex_reasoning_effort=self.kwargs.get("codex_reasoning_effort"),
codex_summary=self.kwargs.get("codex_summary"),
codex_personality=self.kwargs.get("codex_personality"),
codex_workspace_dir=self.kwargs.get(
"codex_workspace_dir",
str(Path.home() / ".codex" / "tradingagents-workspace"),
),
codex_request_timeout=self.kwargs.get("codex_request_timeout", 120.0),
codex_max_retries=self.kwargs.get("codex_max_retries", 2),
codex_cleanup_threads=self.kwargs.get("codex_cleanup_threads", True),
session_factory=self.kwargs.get("session_factory"),
preflight_runner=self.kwargs.get("preflight_runner"),
callbacks=self.kwargs.get("callbacks"),
)
llm.preflight()
return llm
def validate_model(self) -> bool:
return validate_model("codex", self.model)
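The defaults above can be summarized as a pure kwarg-resolution step; the `resolve_codex_defaults` helper below is a hypothetical sketch mirroring `CodexClient.get_llm`, not repo code:

```python
from pathlib import Path
from typing import Any

def resolve_codex_defaults(kwargs: dict[str, Any]) -> dict[str, Any]:
    # Mirrors the defaulting performed in CodexClient.get_llm.
    return {
        "codex_workspace_dir": kwargs.get(
            "codex_workspace_dir",
            str(Path.home() / ".codex" / "tradingagents-workspace"),
        ),
        "codex_request_timeout": kwargs.get("codex_request_timeout", 120.0),
        "codex_max_retries": kwargs.get("codex_max_retries", 2),
        "codex_cleanup_threads": kwargs.get("codex_cleanup_threads", True),
    }

resolved = resolve_codex_defaults({"codex_request_timeout": 30.0})
print(resolved["codex_request_timeout"])  # 30.0
print(resolved["codex_max_retries"])      # 2
```

Because `get_llm` calls `llm.preflight()` before returning, a misconfigured binary, missing login, or unavailable model fails at client construction rather than mid-graph.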


@@ -0,0 +1,236 @@
import json
from typing import Any, Iterable, Mapping, Sequence
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage, ToolMessage
class CodexMessageCodecError(ValueError):
"""Raised when TradingAgents inputs cannot be normalized for Codex."""
def normalize_input_messages(
value: str | Sequence[BaseMessage | Mapping[str, Any]],
) -> list[BaseMessage]:
"""Normalize TradingAgents model inputs into LangChain messages."""
if isinstance(value, str):
return [HumanMessage(content=value)]
normalized: list[BaseMessage] = []
for item in value:
if isinstance(item, BaseMessage):
normalized.append(item)
continue
if not isinstance(item, Mapping):
raise CodexMessageCodecError(
f"Unsupported message input type: {type(item).__name__}"
)
normalized.append(_message_from_dict(item))
return normalized
def format_messages_for_codex(
messages: Sequence[BaseMessage],
*,
tool_names: Iterable[str] = (),
tool_schemas: Sequence[Mapping[str, Any]] = (),
tool_choice: str | None = None,
tool_arguments_as_json_string: bool = False,
retry_message: str | None = None,
) -> str:
"""Render a chat transcript into a single text prompt for Codex."""
tool_list = list(tool_names)
lines = [
"You are answering on behalf of TradingAgents.",
"The conversation transcript is provided below.",
"Treat tool outputs as authoritative execution results from the host application.",
]
if tool_list:
lines.append(
"If external data is still needed, respond with tool calls using only these tools: "
+ ", ".join(tool_list)
+ "."
)
else:
lines.append("No host tools are available for this turn.")
if tool_choice == "none":
lines.append("Do not request tool calls for this turn.")
elif tool_choice in {"any", "required"}:
lines.append("You must respond with one or more tool calls for this turn.")
elif tool_choice and tool_choice != "auto":
lines.append(f"You must call the tool named `{tool_choice}` for this turn.")
elif tool_choice == "auto":
lines.append("Use tool calls only if they are necessary to answer correctly.")
if tool_arguments_as_json_string:
lines.append(
"When returning tool calls, encode each tool argument object as a JSON string in `arguments_json`."
)
schema_lines = _format_tool_schema_lines(tool_schemas)
if schema_lines:
lines.append("Tool argument requirements:")
lines.extend(schema_lines)
lines.append("Respond only with JSON that matches the requested output schema.")
if retry_message:
lines.append(retry_message)
transcript: list[str] = []
for message in messages:
transcript.append(_format_message(message))
return "\n\n".join(lines + ["Conversation transcript:", *transcript])
def strip_json_fence(text: str) -> str:
stripped = text.strip()
if stripped.startswith("```"):
parts = stripped.split("```")
if len(parts) >= 3:
candidate = parts[1]
if candidate.lstrip().startswith("json"):
candidate = candidate.lstrip()[4:]
return candidate.strip()
return stripped
def _message_from_dict(message: Mapping[str, Any]) -> BaseMessage:
role = str(message.get("role", "")).lower()
content = _content_to_text(message.get("content", ""))
if role == "system":
return SystemMessage(content=content)
if role == "user":
return HumanMessage(content=content)
if role == "tool":
tool_call_id = str(message.get("tool_call_id") or message.get("toolCallId") or "")
if not tool_call_id:
raise CodexMessageCodecError("Tool messages require tool_call_id.")
return ToolMessage(content=content, tool_call_id=tool_call_id)
if role == "assistant":
raw_tool_calls = message.get("tool_calls") or message.get("toolCalls") or []
tool_calls = _normalize_tool_calls(raw_tool_calls)
return AIMessage(content=content, tool_calls=tool_calls)
raise CodexMessageCodecError(f"Unsupported message role: {role!r}")
def _normalize_tool_calls(raw_tool_calls: Any) -> list[dict[str, Any]]:
normalized: list[dict[str, Any]] = []
if not raw_tool_calls:
return normalized
if not isinstance(raw_tool_calls, Sequence):
raise CodexMessageCodecError("assistant.tool_calls must be a sequence")
for item in raw_tool_calls:
if not isinstance(item, Mapping):
raise CodexMessageCodecError("assistant.tool_calls items must be objects")
if "function" in item:
function = item.get("function")
if not isinstance(function, Mapping):
raise CodexMessageCodecError("assistant.tool_calls.function must be an object")
raw_args = function.get("arguments", {})
if isinstance(raw_args, str):
try:
args = json.loads(raw_args)
except json.JSONDecodeError as exc:
raise CodexMessageCodecError(
f"assistant tool arguments must be valid JSON: {raw_args!r}"
) from exc
else:
args = raw_args
if not isinstance(args, Mapping):
raise CodexMessageCodecError("assistant tool arguments must decode to an object")
normalized.append(
{
"name": str(function.get("name", "")),
"args": dict(args),
"id": str(item.get("id") or ""),
}
)
continue
args = item.get("args", {})
if not isinstance(args, Mapping):
raise CodexMessageCodecError("assistant tool args must be an object")
normalized.append(
{
"name": str(item.get("name", "")),
"args": dict(args),
"id": str(item.get("id") or ""),
}
)
return normalized
def _format_message(message: BaseMessage) -> str:
role = type(message).__name__.replace("Message", "") or "Message"
body = _content_to_text(message.content)
if isinstance(message, AIMessage) and message.tool_calls:
tool_call_json = json.dumps(
[
{
"id": tool_call.get("id"),
"name": tool_call.get("name"),
"args": tool_call.get("args", {}),
}
for tool_call in message.tool_calls
],
ensure_ascii=False,
indent=2,
sort_keys=True,
)
return f"[{role}]\n{body}\nTool calls:\n{tool_call_json}".strip()
if isinstance(message, ToolMessage):
return f"[Tool:{message.tool_call_id}]\n{body}".strip()
return f"[{role}]\n{body}".strip()
def _content_to_text(content: Any) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, Mapping):
text = item.get("text")
if isinstance(text, str):
parts.append(text)
else:
parts.append(json.dumps(dict(item), ensure_ascii=False))
else:
parts.append(str(item))
return "\n".join(part for part in parts if part)
if content is None:
return ""
return str(content)
def _format_tool_schema_lines(tool_schemas: Sequence[Mapping[str, Any]]) -> list[str]:
lines: list[str] = []
for tool_schema in tool_schemas:
function = tool_schema.get("function")
if not isinstance(function, Mapping):
continue
name = function.get("name")
parameters = function.get("parameters") or {}
if not isinstance(name, str) or not isinstance(parameters, Mapping):
continue
required = parameters.get("required") or []
properties = parameters.get("properties") or {}
summary = {
"required": required if isinstance(required, list) else [],
"properties": properties if isinstance(properties, Mapping) else {},
}
lines.append(
f"- {name}: {json.dumps(summary, ensure_ascii=False, sort_keys=True)}"
)
return lines
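`strip_json_fence` is a pure function and easy to exercise in isolation. Below is a standalone copy (duplicated here only for illustration), with the fence string built programmatically so the sample stays self-contained:

```python
FENCE = "`" * 3  # build the backtick fence programmatically to keep this sample valid

def strip_json_fence(text: str) -> str:
    # Standalone copy of the helper defined in this module.
    stripped = text.strip()
    if stripped.startswith(FENCE):
        parts = stripped.split(FENCE)
        if len(parts) >= 3:
            candidate = parts[1]
            if candidate.lstrip().startswith("json"):
                candidate = candidate.lstrip()[4:]
            return candidate.strip()
    return stripped

sample = f'{FENCE}json\n{{"answer": "done"}}\n{FENCE}'
print(strip_json_fence(sample))           # {"answer": "done"}
print(strip_json_fence('{"answer": 1}'))  # {"answer": 1}
```

Unfenced input passes through unchanged, so the parsers in the chat model can apply it unconditionally before `json.loads`.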


@@ -0,0 +1,72 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable
from .codex_app_server import (
CodexAppServerAuthError,
CodexAppServerBinaryError,
CodexAppServerSession,
)
from .codex_binary import codex_binary_error_message, resolve_codex_binary
@dataclass(slots=True)
class CodexPreflightResult:
account: dict
models: list[str]
def run_codex_preflight(
*,
codex_binary: str | None,
model: str,
request_timeout: float,
workspace_dir: str,
cleanup_threads: bool,
session_factory: Callable[..., CodexAppServerSession] = CodexAppServerSession,
) -> CodexPreflightResult:
binary = resolve_codex_binary(codex_binary)
if not binary:
raise CodexAppServerBinaryError(codex_binary_error_message(codex_binary))
session = session_factory(
codex_binary=binary,
request_timeout=request_timeout,
workspace_dir=workspace_dir,
cleanup_threads=cleanup_threads,
)
try:
session.start()
account_payload = session.account_read()
account = account_payload.get("account")
if not account:
raise CodexAppServerAuthError(
"Codex authentication is not available for TradingAgents. "
"Run `codex login` or `codex login --device-auth`, then retry."
)
models_payload = session.model_list(include_hidden=True)
models = _collect_model_names(models_payload)
if model not in models:
preview = ", ".join(models[:8]) if models else "no models reported"
raise CodexAppServerBinaryError(
f"Codex model '{model}' is not available from `model/list`. Available models: {preview}"
)
return CodexPreflightResult(account=account, models=models)
finally:
session.close()
def _collect_model_names(payload: dict) -> list[str]:
names: list[str] = []
for entry in payload.get("data", []) or []:
if not isinstance(entry, dict):
continue
for key in ("model", "id"):
value = entry.get(key)
if isinstance(value, str) and value not in names:
names.append(value)
return names
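`_collect_model_names` tolerates both `model` and `id` keys, deduplicates, and skips malformed entries. A standalone copy (duplicated for illustration) against a hypothetical `model/list` payload — real field names may differ:

```python
def collect_model_names(payload: dict) -> list[str]:
    # Standalone copy of _collect_model_names from this module.
    names: list[str] = []
    for entry in payload.get("data", []) or []:
        if not isinstance(entry, dict):
            continue
        for key in ("model", "id"):
            value = entry.get(key)
            if isinstance(value, str) and value not in names:
                names.append(value)
    return names

# Hypothetical `model/list` response shape for illustration only.
payload = {"data": [{"model": "gpt-5.4", "id": "gpt-5.4"}, {"id": "gpt-5.4-mini"}, "garbage"]}
print(collect_model_names(payload))  # ['gpt-5.4', 'gpt-5.4-mini']
```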


@@ -0,0 +1,118 @@
from __future__ import annotations
from typing import Any, Callable, Sequence
from langchain_core.tools import BaseTool
from langchain_core.utils.function_calling import convert_to_openai_tool
def normalize_tools_for_codex(
tools: Sequence[dict[str, Any] | type | Callable | BaseTool],
) -> list[dict[str, Any]]:
"""Normalize LangChain tool definitions into OpenAI-style schemas."""
normalized: list[dict[str, Any]] = []
for tool in tools:
normalized.append(convert_to_openai_tool(tool, strict=True))
return normalized
def build_plain_response_schema() -> dict[str, Any]:
return {
"type": "object",
"properties": {
"answer": {"type": "string"},
},
"required": ["answer"],
"additionalProperties": False,
}
def build_tool_response_schema(
tool_schemas: Sequence[dict[str, Any]],
*,
allow_final: bool = True,
) -> dict[str, Any]:
tool_items_schema = _tool_items_schema(tool_schemas)
if not allow_final:
return {
"type": "object",
"properties": {
"mode": {"const": "tool_calls", "type": "string"},
"content": {"type": "string"},
"tool_calls": {
"type": "array",
"minItems": 1,
"items": tool_items_schema,
},
},
"required": ["mode", "content", "tool_calls"],
"additionalProperties": False,
}
return {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": ["final", "tool_calls"],
},
"content": {"type": "string"},
"tool_calls": {
"type": "array",
"items": tool_items_schema,
},
},
"required": ["mode", "content", "tool_calls"],
"additionalProperties": False,
}
def _tool_items_schema(tool_schemas: Sequence[dict[str, Any]]) -> dict[str, Any]:
if len(tool_schemas) == 1:
return _tool_call_variant(tool_schemas[0])
tool_names = [
tool_schema.get("function", {}).get("name")
for tool_schema in tool_schemas
if tool_schema.get("function", {}).get("name")
]
# With multiple tools, argument objects travel as JSON strings in `arguments_json`,
# so no merged per-tool properties schema is needed here.
return {
"type": "object",
"properties": {
"name": {
"type": "string",
"enum": tool_names,
},
"arguments_json": {
"type": "string",
},
},
"required": ["name", "arguments_json"],
"additionalProperties": False,
}
def _tool_call_variant(tool_schema: dict[str, Any]) -> dict[str, Any]:
function = tool_schema.get("function", {})
parameters = function.get("parameters") or {"type": "object", "properties": {}}
return {
"type": "object",
"properties": {
"name": {
"const": function["name"],
"type": "string",
},
"arguments": parameters,
},
"required": ["name", "arguments"],
"additionalProperties": False,
}
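The plain-response schema is small enough to check by hand. The sketch below pairs a standalone copy of the builder with a hand-rolled equivalent check; `matches_plain_schema` is hypothetical — in the real pipeline the schema is handed to Codex as `output_schema` and enforcement happens there:

```python
import json

def build_plain_response_schema() -> dict:
    # Standalone copy of the builder from this module.
    return {
        "type": "object",
        "properties": {"answer": {"type": "string"}},
        "required": ["answer"],
        "additionalProperties": False,
    }

def matches_plain_schema(payload: object) -> bool:
    # Hand-rolled equivalent of the schema for this simple object shape.
    return (
        isinstance(payload, dict)
        and set(payload) == {"answer"}
        and isinstance(payload["answer"], str)
    )

print(matches_plain_schema(json.loads('{"answer": "BUY"}')))       # True
print(matches_plain_schema(json.loads('{"answer": 1}')))           # False
print(matches_plain_schema(json.loads('{"answer": "x", "y": 1}'))) # False
```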


@@ -1,9 +1,6 @@
from typing import Optional
from .base_client import BaseLLMClient
from .openai_client import OpenAIClient
from .anthropic_client import AnthropicClient
from .google_client import GoogleClient
def create_llm_client(
@@ -15,7 +12,7 @@ def create_llm_client(
"""Create an LLM client for the specified provider.
Args:
provider: LLM provider (openai, anthropic, google, xai, ollama, openrouter)
provider: LLM provider (openai, anthropic, google, xai, ollama, openrouter, codex)
model: Model name/identifier
base_url: Optional base URL for API endpoint
**kwargs: Additional provider-specific arguments
@@ -35,15 +32,28 @@ def create_llm_client(
provider_lower = provider.lower()
if provider_lower in ("openai", "ollama", "openrouter"):
from .openai_client import OpenAIClient
return OpenAIClient(model, base_url, provider=provider_lower, **kwargs)
if provider_lower == "xai":
from .openai_client import OpenAIClient
return OpenAIClient(model, base_url, provider="xai", **kwargs)
if provider_lower == "anthropic":
from .anthropic_client import AnthropicClient
return AnthropicClient(model, base_url, **kwargs)
if provider_lower == "google":
from .google_client import GoogleClient
return GoogleClient(model, base_url, **kwargs)
if provider_lower == "codex":
from .codex_client import CodexClient
return CodexClient(model, base_url, **kwargs)
raise ValueError(f"Unsupported LLM provider: {provider}")
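The provider dispatch above reduces to a pure lookup. A sketch of that mapping (`resolve_client_module` is a hypothetical helper — the actual factory lazily imports and instantiates the client class rather than returning a module name):

```python
def resolve_client_module(provider: str) -> str:
    # Mirrors the if-chain in create_llm_client.
    provider_lower = provider.lower()
    if provider_lower in ("openai", "ollama", "openrouter", "xai"):
        return "openai_client"
    if provider_lower == "anthropic":
        return "anthropic_client"
    if provider_lower == "google":
        return "google_client"
    if provider_lower == "codex":
        return "codex_client"
    raise ValueError(f"Unsupported LLM provider: {provider}")

print(resolve_client_module("Codex"))  # codex_client
```

Keeping the imports inside each branch means importing the factory never pulls in provider SDKs the caller does not use.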


@@ -23,6 +23,20 @@ MODEL_OPTIONS: ProviderModeOptions = {
("GPT-5.4 Pro - Most capable, expensive ($30/$180 per 1M tokens)", "gpt-5.4-pro"),
],
},
"codex": {
"quick": [
("GPT-5.4 Mini - Local Codex session, fast tool use", "gpt-5.4-mini"),
("GPT-5.4 Nano - Lowest-cost Codex model", "gpt-5.4-nano"),
("GPT-5.4 - Frontier Codex model", "gpt-5.4"),
("GPT-4.1 - Strong non-reasoning fallback", "gpt-4.1"),
],
"deep": [
("GPT-5.4 - Frontier Codex model", "gpt-5.4"),
("GPT-5.2 - Strong Codex reasoning", "gpt-5.2"),
("GPT-5.4 Mini - Faster Codex alternative", "gpt-5.4-mini"),
("GPT-5.4 Pro - Highest capability Codex model", "gpt-5.4-pro"),
],
},
"anthropic": {
"quick": [
("Claude Sonnet 4.6 - Best speed and intelligence balance", "claude-sonnet-4-6"),