feat: add OpenAI Responses API support for GPT-5 models

Add a ChatOpenAIResponses wrapper, dynamic web_search tool selection, and robust response parsing.
MUmarJ 2026-01-16 18:54:21 -05:00
parent d64c3d6758
commit 65572ca7a0
5 changed files with 403 additions and 9 deletions

View File

@@ -1,7 +1,46 @@
import os
from openai import OpenAI

from .config import get_config


def _get_web_search_tool_type() -> str:
    """Return the appropriate web search tool type based on the FETCH_LATEST setting.

    - FETCH_LATEST=true: Use 'web_search' (GA version, supports GPT-5)
    - FETCH_LATEST=false/unset: Use 'web_search_preview' (legacy, wider compatibility)
    """
    fetch_latest = os.getenv("FETCH_LATEST", "false").lower() in ("true", "1", "yes")
    return "web_search" if fetch_latest else "web_search_preview"


def _extract_text_from_response(response):
    """Safely extract text content from OpenAI Responses API output.

    The response.output array typically contains:
    - output[0]: ResponseFunctionWebSearch (the web search call)
    - output[1]: ResponseOutputMessage (the text response)

    This function handles edge cases where the structure may differ.
    """
    if not response.output:
        raise RuntimeError("OpenAI response has empty output")

    # Look for a message with text content
    for item in response.output:
        if hasattr(item, 'content') and item.content:
            for content_block in item.content:
                if hasattr(content_block, 'text') and content_block.text:
                    return content_block.text

    # If we get here, no text was found
    output_types = [type(item).__name__ for item in response.output]
    raise RuntimeError(
        f"No text content found in OpenAI response. "
        f"Output types: {output_types}"
    )
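
The helper can be sanity-checked with stand-in objects; SimpleNamespace below is only a test double for the SDK's response types, not part of the API. It also shows why the old index-based access (response.output[1].content[0].text) was brittle: the text is now found wherever the message appears in output.

    # Minimal sketch: exercising _extract_text_from_response with test doubles.
    from types import SimpleNamespace

    search_call = SimpleNamespace(content=None)  # e.g. the web search call - no text
    message = SimpleNamespace(content=[SimpleNamespace(text="NVDA rallied 3%...")])

    # Order-independent: works whether the message is output[0] or output[1].
    for output in ([search_call, message], [message, search_call]):
        response = SimpleNamespace(output=output)
        assert _extract_text_from_response(response) == "NVDA rallied 3%..."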


def get_stock_news_openai(query, start_date, end_date):
    config = get_config()
    client = OpenAI(base_url=config["backend_url"])
@@ -23,7 +62,7 @@ def get_stock_news_openai(query, start_date, end_date):
        reasoning={},
        tools=[
            {
-               "type": "web_search_preview",
+               "type": _get_web_search_tool_type(),
                "user_location": {"type": "approximate"},
                "search_context_size": "low",
            }
@@ -34,7 +73,7 @@ def get_stock_news_openai(query, start_date, end_date):
        store=True,
    )
-   return response.output[1].content[0].text
+   return _extract_text_from_response(response)
def get_global_news_openai(curr_date, look_back_days=7, limit=5):
@@ -58,7 +97,7 @@ def get_global_news_openai(curr_date, look_back_days=7, limit=5):
        reasoning={},
        tools=[
            {
-               "type": "web_search_preview",
+               "type": _get_web_search_tool_type(),
                "user_location": {"type": "approximate"},
                "search_context_size": "low",
            }
@@ -69,7 +108,7 @@ def get_global_news_openai(curr_date, look_back_days=7, limit=5):
        store=True,
    )
-   return response.output[1].content[0].text
+   return _extract_text_from_response(response)
def get_fundamentals_openai(ticker, curr_date):
@@ -93,7 +132,7 @@ def get_fundamentals_openai(ticker, curr_date):
        reasoning={},
        tools=[
            {
-               "type": "web_search_preview",
+               "type": _get_web_search_tool_type(),
                "user_location": {"type": "approximate"},
                "search_context_size": "low",
            }
@@ -104,4 +143,4 @@ def get_fundamentals_openai(ticker, curr_date):
        store=True,
    )
-   return response.output[1].content[0].text
+   return _extract_text_from_response(response)
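
End to end, FETCH_LATEST is the only knob; a minimal sketch of the toggle, assuming the helper above is importable:

    import os

    os.environ["FETCH_LATEST"] = "true"
    assert _get_web_search_tool_type() == "web_search"          # GA tool, supports GPT-5

    os.environ["FETCH_LATEST"] = "false"
    assert _get_web_search_tool_type() == "web_search_preview"  # legacy default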

View File

@@ -12,6 +12,8 @@ from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.prebuilt import ToolNode

+from tradingagents.llm import requires_responses_api, ChatOpenAIResponses
+
from tradingagents.agents import *
from tradingagents.default_config import DEFAULT_CONFIG
from tradingagents.agents.utils.memory import FinancialSituationMemory
@@ -72,9 +74,20 @@ class TradingAgentsGraph:
        )
        # Initialize LLMs
-       if self.config["llm_provider"].lower() == "openai" or self.config["llm_provider"] == "ollama" or self.config["llm_provider"] == "openrouter":
-           self.deep_thinking_llm = ChatOpenAI(model=self.config["deep_think_llm"], base_url=self.config["backend_url"])
-           self.quick_thinking_llm = ChatOpenAI(model=self.config["quick_think_llm"], base_url=self.config["backend_url"])
+       if self.config["llm_provider"].lower() in ["openai", "ollama", "openrouter", "lm studio"]:
+           # Select the LLM class based on the model - newer models require the Responses API
+           deep_model = self.config["deep_think_llm"]
+           quick_model = self.config["quick_think_llm"]
+           if requires_responses_api(deep_model):
+               self.deep_thinking_llm = ChatOpenAIResponses(model=deep_model, base_url=self.config["backend_url"])
+           else:
+               self.deep_thinking_llm = ChatOpenAI(model=deep_model, base_url=self.config["backend_url"])
+           if requires_responses_api(quick_model):
+               self.quick_thinking_llm = ChatOpenAIResponses(model=quick_model, base_url=self.config["backend_url"])
+           else:
+               self.quick_thinking_llm = ChatOpenAI(model=quick_model, base_url=self.config["backend_url"])
        elif self.config["llm_provider"].lower() == "anthropic":
            self.deep_thinking_llm = ChatAnthropic(model=self.config["deep_think_llm"], base_url=self.config["backend_url"])
            self.quick_thinking_llm = ChatAnthropic(model=self.config["quick_think_llm"], base_url=self.config["backend_url"])

View File

@@ -0,0 +1,9 @@
"""LLM wrapper utilities for TradingAgents.

This module provides custom LLM wrappers for different API endpoints.
"""

from tradingagents.llm.model_utils import requires_responses_api
from tradingagents.llm.openai_responses import ChatOpenAIResponses

__all__ = ["requires_responses_api", "ChatOpenAIResponses"]

View File

@@ -0,0 +1,24 @@
"""Utility functions for model detection and selection."""

# Model prefixes that require the OpenAI Responses API (/v1/responses)
# instead of the Chat Completions API (/v1/chat/completions)
RESPONSES_API_PREFIXES = [
    "gpt-5",  # All GPT-5 variants (gpt-5, gpt-5.1, gpt-5.1-codex-mini, etc.)
    "codex",  # Codex models that use the Responses API
]


def requires_responses_api(model_name: str) -> bool:
    """Check if a model requires the Responses API instead of Chat Completions.

    Some newer OpenAI models only support the /v1/responses endpoint and will
    return a 404 error if called via /v1/chat/completions.

    Args:
        model_name: The model identifier (e.g., "gpt-5.1-codex-mini", "gpt-4o")

    Returns:
        True if the model requires the Responses API, False otherwise.
    """
    model_lower = model_name.lower()
    return any(prefix in model_lower for prefix in RESPONSES_API_PREFIXES)
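
Note that despite the name, the check is a substring match rather than a strict startswith, so provider-qualified names route correctly as well; a small sketch (model names are illustrative):

    assert requires_responses_api("gpt-5.1-codex-mini") is True  # matches "gpt-5" and "codex"
    assert requires_responses_api("openai/gpt-5.1") is True      # substring match, not a prefix
    assert requires_responses_api("gpt-4o") is False             # stays on Chat Completions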

View File

@@ -0,0 +1,309 @@
"""LangChain-compatible wrapper for OpenAI's Responses API.

This module provides ChatOpenAIResponses, a drop-in replacement for ChatOpenAI
that uses the /v1/responses endpoint instead of /v1/chat/completions.
This is required for newer models like gpt-5.1-codex-mini that only support
the Responses API.
"""

import os
import uuid
from typing import Any, Dict, Iterator, List, Optional, Sequence, Union

from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)
from langchain_core.outputs import ChatGeneration, ChatResult
from langchain_core.tools import BaseTool
from openai import OpenAI
from pydantic import Field


class ChatOpenAIResponses(BaseChatModel):
    """LangChain-compatible chat model using OpenAI's Responses API.

    This class provides the same interface as ChatOpenAI but uses the
    /v1/responses endpoint, which is required for certain newer models.

    Example:
        >>> llm = ChatOpenAIResponses(model="gpt-5.1-codex-mini")
        >>> llm_with_tools = llm.bind_tools([my_tool])
        >>> result = llm_with_tools.invoke([HumanMessage(content="Hello")])
    """

    model: str = Field(default="gpt-5.1-codex-mini")
    base_url: Optional[str] = Field(default=None)
    api_key: Optional[str] = Field(default=None)
    temperature: float = Field(default=1.0)
    max_output_tokens: int = Field(default=4096)
    top_p: float = Field(default=1.0)

    # Internal state for tool binding
    _bound_tools: List[Dict[str, Any]] = []
    _client: Optional[OpenAI] = None

    class Config:
        arbitrary_types_allowed = True

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._bound_tools = []
        self._client = None
    @property
    def _llm_type(self) -> str:
        return "openai-responses"

    @property
    def client(self) -> OpenAI:
        """Lazily initialize the OpenAI client."""
        if self._client is None:
            api_key = self.api_key or os.getenv("OPENAI_API_KEY")
            if self.base_url:
                self._client = OpenAI(api_key=api_key, base_url=self.base_url)
            else:
                self._client = OpenAI(api_key=api_key)
        return self._client
    def bind_tools(
        self,
        tools: Sequence[Union[Dict[str, Any], BaseTool]],
        **kwargs: Any,
    ) -> "ChatOpenAIResponses":
        """Bind tools to this model instance.

        Args:
            tools: A sequence of tools to bind. Can be LangChain tools or dicts.

        Returns:
            A new ChatOpenAIResponses instance with the tools bound.
        """
        new_instance = ChatOpenAIResponses(
            model=self.model,
            base_url=self.base_url,
            api_key=self.api_key,
            temperature=self.temperature,
            max_output_tokens=self.max_output_tokens,
            top_p=self.top_p,
        )
        new_instance._bound_tools = self._convert_tools(tools)
        return new_instance
    def _convert_tools(
        self, tools: Sequence[Union[Dict[str, Any], BaseTool]]
    ) -> List[Dict[str, Any]]:
        """Convert LangChain tools to the OpenAI Responses API function format.

        The Responses API uses a flat structure for function tools:

            {
                "type": "function",
                "name": "function_name",
                "description": "...",
                "parameters": {...}
            }

        This differs from Chat Completions, which nests these under a "function" key.
        """
        converted = []
        for tool in tools:
            if isinstance(tool, BaseTool):
                # Get the JSON schema for the parameters
                if tool.args_schema:
                    params = tool.args_schema.model_json_schema()
                    # Remove extra fields that OpenAI doesn't expect
                    params.pop("title", None)
                    params.pop("description", None)
                else:
                    params = {"type": "object", "properties": {}}
                # Responses API uses a flat structure - name at top level, not nested
                tool_schema = {
                    "type": "function",
                    "name": tool.name,
                    "description": tool.description or "",
                    "parameters": params,
                }
                converted.append(tool_schema)
            elif isinstance(tool, dict):
                # Handle dict format - convert from Chat Completions format if needed
                if "function" in tool:
                    # Chat Completions format - flatten it
                    func = tool["function"]
                    tool_schema = {
                        "type": "function",
                        "name": func.get("name", ""),
                        "description": func.get("description", ""),
                        "parameters": func.get("parameters", {"type": "object", "properties": {}}),
                    }
                    converted.append(tool_schema)
                elif "name" in tool:
                    # Already in Responses API format
                    converted.append(tool)
                else:
                    # Unknown format, try to use as-is
                    converted.append(tool)
        return converted
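
The flattening is easiest to see on a concrete Chat Completions-style dict; get_price is a hypothetical tool used only for illustration:

    # A tool in the nested Chat Completions format...
    chat_completions_tool = {
        "type": "function",
        "function": {
            "name": "get_price",
            "description": "Fetch the latest price for a ticker.",
            "parameters": {
                "type": "object",
                "properties": {"ticker": {"type": "string"}},
                "required": ["ticker"],
            },
        },
    }

    # ...flattens to the Responses API shape: name/description/parameters at top level.
    flat = ChatOpenAIResponses()._convert_tools([chat_completions_tool])[0]
    assert flat["name"] == "get_price"
    assert "function" not in flat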
    def _convert_messages(
        self, messages: List[BaseMessage]
    ) -> List[Dict[str, Any]]:
        """Convert LangChain messages to the OpenAI Responses API format.

        The Responses API uses a different message format than Chat Completions:
        - System/user messages use the 'input_text' content type
        - Assistant messages use the 'output_text' content type (no function_call in content)
        - Tool calls from the assistant are represented as separate 'function_call' items
        - Tool results become 'function_call_output' items
        """
        import json as json_module

        converted = []
        for msg in messages:
            if isinstance(msg, SystemMessage):
                content = msg.content if isinstance(msg.content, str) else str(msg.content)
                converted.append({
                    "role": "system",
                    "content": [{"type": "input_text", "text": content}],
                })
            elif isinstance(msg, HumanMessage):
                content = msg.content if isinstance(msg.content, str) else str(msg.content)
                converted.append({
                    "role": "user",
                    "content": [{"type": "input_text", "text": content}],
                })
            elif isinstance(msg, AIMessage):
                # Handle AI messages (assistant responses).
                # First add text content if present.
                if msg.content:
                    content = msg.content if isinstance(msg.content, str) else str(msg.content)
                    converted.append({
                        "role": "assistant",
                        "content": [{"type": "output_text", "text": content}],
                    })
                # Tool calls need to be added as separate items in the Responses API
                if hasattr(msg, 'tool_calls') and msg.tool_calls:
                    for tc in msg.tool_calls:
                        # Convert args to a JSON string for the API
                        args = tc.get("args", {})
                        if isinstance(args, dict):
                            args_str = json_module.dumps(args)
                        else:
                            args_str = str(args)
                        # Add the tool call as a separate item (not inside assistant content)
                        converted.append({
                            "type": "function_call",
                            "call_id": tc.get("id", str(uuid.uuid4())),
                            "name": tc["name"],
                            "arguments": args_str,
                        })
                elif not msg.content:
                    # Empty assistant message with no tool calls - add a placeholder
                    converted.append({
                        "role": "assistant",
                        "content": [{"type": "output_text", "text": ""}],
                    })
            elif isinstance(msg, ToolMessage):
                # Tool results are formatted as function call outputs
                content = msg.content if isinstance(msg.content, str) else str(msg.content)
                converted.append({
                    "type": "function_call_output",
                    "call_id": msg.tool_call_id,
                    "output": content,
                })
        return converted
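
For a short tool-use turn, the conversion yields one item per message plus a separate function_call item; a minimal sketch (tool name and values are illustrative):

    from langchain_core.messages import AIMessage, HumanMessage, ToolMessage

    history = [
        HumanMessage(content="Price of NVDA?"),
        AIMessage(content="", tool_calls=[
            {"id": "call_1", "name": "get_price", "args": {"ticker": "NVDA"}},
        ]),
        ToolMessage(content="187.32", tool_call_id="call_1"),
    ]
    items = ChatOpenAIResponses()._convert_messages(history)
    # user message, then the tool call as its own item, then its output
    assert [i.get("type", i.get("role")) for i in items] == [
        "user", "function_call", "function_call_output",
    ]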
    def _parse_response(self, response: Any) -> AIMessage:
        """Parse an OpenAI Responses API response into a LangChain AIMessage."""
        import json

        text_content = ""
        tool_calls = []

        if not response.output:
            return AIMessage(content="")

        for item in response.output:
            # Handle text output
            if hasattr(item, 'content') and item.content:
                for block in item.content:
                    if hasattr(block, 'text') and block.text:
                        text_content += block.text
            # Handle function/tool calls
            if hasattr(item, 'type') and item.type == 'function_call':
                args = item.arguments
                if isinstance(args, str):
                    try:
                        args = json.loads(args)
                    except json.JSONDecodeError:
                        args = {"raw": args}
                tool_calls.append({
                    "id": getattr(item, 'id', None) or getattr(item, 'call_id', None) or str(uuid.uuid4()),
                    "name": item.name,
                    "args": args,
                })

        if tool_calls:
            return AIMessage(content=text_content, tool_calls=tool_calls)
        return AIMessage(content=text_content)
    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Generate a response using the OpenAI Responses API.

        Args:
            messages: List of LangChain messages to send.
            stop: Optional stop sequences (not used by the Responses API).
            run_manager: Optional callback manager.

        Returns:
            ChatResult containing the model's response.
        """
        # Convert messages to the Responses API format
        converted_messages = self._convert_messages(messages)

        # Build the request parameters
        request_params = {
            "model": self.model,
            "input": converted_messages,
            "temperature": self.temperature,
            "max_output_tokens": self.max_output_tokens,
            "top_p": self.top_p,
        }

        # Add tools if bound
        if self._bound_tools:
            request_params["tools"] = self._bound_tools

        # Make the API call
        response = self.client.responses.create(**request_params)

        # Parse the response
        ai_message = self._parse_response(response)
        return ChatResult(
            generations=[ChatGeneration(message=ai_message)],
        )
    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Return identifying parameters for this LLM."""
        return {
            "model": self.model,
            "base_url": self.base_url,
            "temperature": self.temperature,
            "max_output_tokens": self.max_output_tokens,
        }
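
Putting it together, the wrapper slots in wherever ChatOpenAI was used. A usage sketch (the get_price tool and prompt are illustrative; a live call needs OPENAI_API_KEY and an available model):

    from langchain_core.messages import HumanMessage
    from langchain_core.tools import tool

    @tool
    def get_price(ticker: str) -> str:
        """Fetch the latest price for a ticker."""
        return "187.32"  # stub for illustration

    llm = ChatOpenAIResponses(model="gpt-5.1-codex-mini")
    result = llm.bind_tools([get_price]).invoke(
        [HumanMessage(content="What is NVDA trading at?")]
    )
    # result is an AIMessage; result.tool_calls is populated when the
    # model decides to call get_price.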