Merge pull request #235 from luohy15/data_vendor

Add Alpha Vantage API Integration and Refactor Data Provider Architecture
This commit is contained in:
Yijia Xiao 2025-10-05 16:01:30 -07:00 committed by GitHub
commit 341d49f560
32 changed files with 2001 additions and 1389 deletions

2
.env.example Normal file
View File

@ -0,0 +1,2 @@
ALPHA_VANTAGE_API_KEY=alpha_vantage_api_key_placeholder
OPENAI_API_KEY=openai_api_key_placeholder

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
.venv
results
env/ env/
__pycache__/ __pycache__/
.DS_Store .DS_Store

View File

@ -114,16 +114,21 @@ pip install -r requirements.txt
### Required APIs ### Required APIs
You will also need the FinnHub API for financial data. All of our code is implemented with the free tier. You will need the OpenAI API for all the agents, and [Alpha Vantage API](https://www.alphavantage.co/support/#api-key) for fundamental and news data (default configuration).
```bash
export FINNHUB_API_KEY=$YOUR_FINNHUB_API_KEY
```
You will need the OpenAI API for all the agents.
```bash ```bash
export OPENAI_API_KEY=$YOUR_OPENAI_API_KEY export OPENAI_API_KEY=$YOUR_OPENAI_API_KEY
export ALPHA_VANTAGE_API_KEY=$YOUR_ALPHA_VANTAGE_API_KEY
``` ```
Alternatively, you can create a `.env` file in the project root with your API keys (see `.env.example` for reference):
```bash
cp .env.example .env
# Edit .env with your actual API keys
```
**Note:** The default configuration uses [Alpha Vantage](https://www.alphavantage.co/) for fundamental and news data. You can get a free API key from their website, or upgrade to [Alpha Vantage Premium](https://www.alphavantage.co/premium/) for higher rate limits and more stable access. If you prefer to use OpenAI for these data sources instead, you can modify the data vendor settings in `tradingagents/default_config.py`.
### CLI Usage ### CLI Usage
You can also try out the CLI directly by running: You can also try out the CLI directly by running:
@ -178,7 +183,14 @@ config = DEFAULT_CONFIG.copy()
config["deep_think_llm"] = "gpt-4.1-nano" # Use a different model config["deep_think_llm"] = "gpt-4.1-nano" # Use a different model
config["quick_think_llm"] = "gpt-4.1-nano" # Use a different model config["quick_think_llm"] = "gpt-4.1-nano" # Use a different model
config["max_debate_rounds"] = 1 # Increase debate rounds config["max_debate_rounds"] = 1 # Increase debate rounds
config["online_tools"] = True # Use online tools or cached data
# Configure data vendors (default uses yfinance and Alpha Vantage)
config["data_vendors"] = {
"core_stock_apis": "yfinance", # Options: yfinance, alpha_vantage, local
"technical_indicators": "yfinance", # Options: yfinance, alpha_vantage, local
"fundamental_data": "alpha_vantage", # Options: openai, alpha_vantage, local
"news_data": "alpha_vantage", # Options: openai, alpha_vantage, google, local
}
# Initialize with custom config # Initialize with custom config
ta = TradingAgentsGraph(debug=True, config=config) ta = TradingAgentsGraph(debug=True, config=config)
@ -188,7 +200,7 @@ _, decision = ta.propagate("NVDA", "2024-05-10")
print(decision) print(decision)
``` ```
> For `online_tools`, we recommend enabling them for experimentation, as they provide access to real-time data. The agents' offline tools rely on cached data from our **Tauric TradingDB**, a curated dataset we use for backtesting. We're currently in the process of refining this dataset, and we plan to release it soon alongside our upcoming projects. Stay tuned! > The default configuration uses yfinance for stock price and technical data, and Alpha Vantage for fundamental and news data. For production use or if you encounter rate limits, consider upgrading to [Alpha Vantage Premium](https://www.alphavantage.co/premium/) for more stable and reliable data access. For offline experimentation, there's a local data vendor option that uses our **Tauric TradingDB**, a curated dataset for backtesting, though this is still in development. We're currently refining this dataset and plan to release it soon alongside our upcoming projects. Stay tuned!
You can view the full list of configurations in `tradingagents/default_config.py`. You can view the full list of configurations in `tradingagents/default_config.py`.

View File

@ -4,6 +4,10 @@ import typer
from pathlib import Path from pathlib import Path
from functools import wraps from functools import wraps
from rich.console import Console from rich.console import Console
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
from rich.panel import Panel from rich.panel import Panel
from rich.spinner import Spinner from rich.spinner import Spinner
from rich.live import Live from rich.live import Live

20
main.py
View File

@ -1,14 +1,24 @@
from tradingagents.graph.trading_graph import TradingAgentsGraph from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG from tradingagents.default_config import DEFAULT_CONFIG
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Create a custom config # Create a custom config
config = DEFAULT_CONFIG.copy() config = DEFAULT_CONFIG.copy()
config["llm_provider"] = "google" # Use a different model config["deep_think_llm"] = "gpt-4o-mini" # Use a different model
config["backend_url"] = "https://generativelanguage.googleapis.com/v1" # Use a different backend config["quick_think_llm"] = "gpt-4o-mini" # Use a different model
config["deep_think_llm"] = "gemini-2.0-flash" # Use a different model
config["quick_think_llm"] = "gemini-2.0-flash" # Use a different model
config["max_debate_rounds"] = 1 # Increase debate rounds config["max_debate_rounds"] = 1 # Increase debate rounds
config["online_tools"] = True # Increase debate rounds
# Configure data vendors (default uses yfinance and alpha_vantage)
config["data_vendors"] = {
"core_stock_apis": "yfinance", # Options: yfinance, alpha_vantage, local
"technical_indicators": "yfinance", # Options: yfinance, alpha_vantage, local
"fundamental_data": "alpha_vantage", # Options: openai, alpha_vantage, local
"news_data": "alpha_vantage", # Options: openai, alpha_vantage, google, local
}
# Initialize with custom config # Initialize with custom config
ta = TradingAgentsGraph(debug=True, config=config) ta = TradingAgentsGraph(debug=True, config=config)

View File

@ -1,4 +1,4 @@
from .utils.agent_utils import Toolkit, create_msg_delete from .utils.agent_utils import create_msg_delete
from .utils.agent_states import AgentState, InvestDebateState, RiskDebateState from .utils.agent_states import AgentState, InvestDebateState, RiskDebateState
from .utils.memory import FinancialSituationMemory from .utils.memory import FinancialSituationMemory
@ -21,7 +21,6 @@ from .trader.trader import create_trader
__all__ = [ __all__ = [
"FinancialSituationMemory", "FinancialSituationMemory",
"Toolkit",
"AgentState", "AgentState",
"create_msg_delete", "create_msg_delete",
"InvestDebateState", "InvestDebateState",

View File

@ -1,28 +1,27 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time import time
import json import json
from tradingagents.agents.utils.agent_utils import get_fundamentals, get_balance_sheet, get_cashflow, get_income_statement, get_insider_sentiment, get_insider_transactions
from tradingagents.dataflows.config import get_config
def create_fundamentals_analyst(llm, toolkit): def create_fundamentals_analyst(llm):
def fundamentals_analyst_node(state): def fundamentals_analyst_node(state):
current_date = state["trade_date"] current_date = state["trade_date"]
ticker = state["company_of_interest"] ticker = state["company_of_interest"]
company_name = state["company_of_interest"] company_name = state["company_of_interest"]
if toolkit.config["online_tools"]: tools = [
tools = [toolkit.get_fundamentals_openai] get_fundamentals,
else: get_balance_sheet,
tools = [ get_cashflow,
toolkit.get_finnhub_company_insider_sentiment, get_income_statement,
toolkit.get_finnhub_company_insider_transactions, ]
toolkit.get_simfin_balance_sheet,
toolkit.get_simfin_cashflow,
toolkit.get_simfin_income_stmt,
]
system_message = ( system_message = (
"You are a researcher tasked with analyzing fundamental information over the past week about a company. Please write a comprehensive report of the company's fundamental information such as financial documents, company profile, basic company financials, company financial history, insider sentiment and insider transactions to gain a full view of the company's fundamental information to inform traders. Make sure to include as much detail as possible. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions." "You are a researcher tasked with analyzing fundamental information over the past week about a company. Please write a comprehensive report of the company's fundamental information such as financial documents, company profile, basic company financials, and company financial history to gain a full view of the company's fundamental information to inform traders. Make sure to include as much detail as possible. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ " Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read.", + " Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."
+ " Use the available tools: `get_fundamentals` for comprehensive company analysis, `get_balance_sheet`, `get_cashflow`, and `get_income_statement` for specific financial statements.",
) )
prompt = ChatPromptTemplate.from_messages( prompt = ChatPromptTemplate.from_messages(

View File

@ -1,25 +1,21 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time import time
import json import json
from tradingagents.agents.utils.agent_utils import get_stock_data, get_indicators
from tradingagents.dataflows.config import get_config
def create_market_analyst(llm, toolkit): def create_market_analyst(llm):
def market_analyst_node(state): def market_analyst_node(state):
current_date = state["trade_date"] current_date = state["trade_date"]
ticker = state["company_of_interest"] ticker = state["company_of_interest"]
company_name = state["company_of_interest"] company_name = state["company_of_interest"]
if toolkit.config["online_tools"]: tools = [
tools = [ get_stock_data,
toolkit.get_YFin_data_online, get_indicators,
toolkit.get_stockstats_indicators_report_online, ]
]
else:
tools = [
toolkit.get_YFin_data,
toolkit.get_stockstats_indicators_report,
]
system_message = ( system_message = (
"""You are a trading assistant tasked with analyzing financial markets. Your role is to select the **most relevant indicators** for a given market condition or trading strategy from the following list. The goal is to choose up to **8 indicators** that provide complementary insights without redundancy. Categories and each category's indicators are: """You are a trading assistant tasked with analyzing financial markets. Your role is to select the **most relevant indicators** for a given market condition or trading strategy from the following list. The goal is to choose up to **8 indicators** that provide complementary insights without redundancy. Categories and each category's indicators are:
@ -46,7 +42,7 @@ Volatility Indicators:
Volume-Based Indicators: Volume-Based Indicators:
- vwma: VWMA: A moving average weighted by volume. Usage: Confirm trends by integrating price action with volume data. Tips: Watch for skewed results from volume spikes; use in combination with other volume analyses. - vwma: VWMA: A moving average weighted by volume. Usage: Confirm trends by integrating price action with volume data. Tips: Watch for skewed results from volume spikes; use in combination with other volume analyses.
- Select indicators that provide diverse and complementary information. Avoid redundancy (e.g., do not select both rsi and stochrsi). Also briefly explain why they are suitable for the given market context. When you tool call, please use the exact name of the indicators provided above as they are defined parameters, otherwise your call will fail. Please make sure to call get_YFin_data first to retrieve the CSV that is needed to generate indicators. Write a very detailed and nuanced report of the trends you observe. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions.""" - Select indicators that provide diverse and complementary information. Avoid redundancy (e.g., do not select both rsi and stochrsi). Also briefly explain why they are suitable for the given market context. When you tool call, please use the exact name of the indicators provided above as they are defined parameters, otherwise your call will fail. Please make sure to call get_stock_data first to retrieve the CSV that is needed to generate indicators. Then use get_indicators with the specific indicator names. Write a very detailed and nuanced report of the trends you observe. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."""
+ """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read.""" + """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."""
) )

View File

@ -1,25 +1,23 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time import time
import json import json
from tradingagents.agents.utils.agent_utils import get_news, get_global_news
from tradingagents.dataflows.config import get_config
def create_news_analyst(llm, toolkit): def create_news_analyst(llm):
def news_analyst_node(state): def news_analyst_node(state):
current_date = state["trade_date"] current_date = state["trade_date"]
ticker = state["company_of_interest"] ticker = state["company_of_interest"]
if toolkit.config["online_tools"]: tools = [
tools = [toolkit.get_global_news_openai, toolkit.get_google_news] get_news,
else: get_global_news,
tools = [ ]
toolkit.get_finnhub_news,
toolkit.get_reddit_news,
toolkit.get_google_news,
]
system_message = ( system_message = (
"You are a news researcher tasked with analyzing recent news and trends over the past week. Please write a comprehensive report of the current state of the world that is relevant for trading and macroeconomics. Look at news from EODHD, and finnhub to be comprehensive. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions." "You are a news researcher tasked with analyzing recent news and trends over the past week. Please write a comprehensive report of the current state of the world that is relevant for trading and macroeconomics. Use the available tools: get_news(query, start_date, end_date) for company-specific or targeted news searches, and get_global_news(curr_date, look_back_days, limit) for broader macroeconomic news. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Makrdown table at the end of the report to organize key points in the report, organized and easy to read.""" + """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."""
) )
prompt = ChatPromptTemplate.from_messages( prompt = ChatPromptTemplate.from_messages(

View File

@ -1,24 +1,23 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time import time
import json import json
from tradingagents.agents.utils.agent_utils import get_news
from tradingagents.dataflows.config import get_config
def create_social_media_analyst(llm, toolkit): def create_social_media_analyst(llm):
def social_media_analyst_node(state): def social_media_analyst_node(state):
current_date = state["trade_date"] current_date = state["trade_date"]
ticker = state["company_of_interest"] ticker = state["company_of_interest"]
company_name = state["company_of_interest"] company_name = state["company_of_interest"]
if toolkit.config["online_tools"]: tools = [
tools = [toolkit.get_stock_news_openai] get_news,
else: ]
tools = [
toolkit.get_reddit_stock_info,
]
system_message = ( system_message = (
"You are a social media and company specific news researcher/analyst tasked with analyzing social media posts, recent company news, and public sentiment for a specific company over the past week. You will be given a company's name your objective is to write a comprehensive long report detailing your analysis, insights, and implications for traders and investors on this company's current state after looking at social media and what people are saying about that company, analyzing sentiment data of what people feel each day about the company, and looking at recent company news. Try to look at all sources possible from social media to sentiment to news. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions." "You are a social media and company specific news researcher/analyst tasked with analyzing social media posts, recent company news, and public sentiment for a specific company over the past week. You will be given a company's name your objective is to write a comprehensive long report detailing your analysis, insights, and implications for traders and investors on this company's current state after looking at social media and what people are saying about that company, analyzing sentiment data of what people feel each day about the company, and looking at recent company news. Use the get_news(query, start_date, end_date) tool to search for company-specific news and social media discussions. Try to look at all sources possible from social media to sentiment to news. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Makrdown table at the end of the report to organize key points in the report, organized and easy to read.""", + """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read.""",
) )
prompt = ChatPromptTemplate.from_messages( prompt = ChatPromptTemplate.from_messages(

View File

@ -1,19 +1,24 @@
from langchain_core.messages import BaseMessage, HumanMessage, ToolMessage, AIMessage from langchain_core.messages import HumanMessage, RemoveMessage
from typing import List
from typing import Annotated
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import RemoveMessage
from langchain_core.tools import tool
from datetime import date, timedelta, datetime
import functools
import pandas as pd
import os
from dateutil.relativedelta import relativedelta
from langchain_openai import ChatOpenAI
import tradingagents.dataflows.interface as interface
from tradingagents.default_config import DEFAULT_CONFIG
from langchain_core.messages import HumanMessage
# Import tools from separate utility files
from tradingagents.agents.utils.core_stock_tools import (
get_stock_data
)
from tradingagents.agents.utils.technical_indicators_tools import (
get_indicators
)
from tradingagents.agents.utils.fundamental_data_tools import (
get_fundamentals,
get_balance_sheet,
get_cashflow,
get_income_statement
)
from tradingagents.agents.utils.news_data_tools import (
get_news,
get_insider_sentiment,
get_insider_transactions,
get_global_news
)
def create_msg_delete(): def create_msg_delete():
def delete_messages(state): def delete_messages(state):
@ -31,389 +36,4 @@ def create_msg_delete():
return delete_messages return delete_messages
class Toolkit:
_config = DEFAULT_CONFIG.copy()
@classmethod
def update_config(cls, config):
"""Update the class-level configuration."""
cls._config.update(config)
@property
def config(self):
"""Access the configuration."""
return self._config
def __init__(self, config=None):
if config:
self.update_config(config)
@staticmethod
@tool
def get_reddit_news(
curr_date: Annotated[str, "Date you want to get news for in yyyy-mm-dd format"],
) -> str:
"""
Retrieve global news from Reddit within a specified time frame.
Args:
curr_date (str): Date you want to get news for in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the latest global news from Reddit in the specified time frame.
"""
global_news_result = interface.get_reddit_global_news(curr_date, 7, 5)
return global_news_result
@staticmethod
@tool
def get_finnhub_news(
ticker: Annotated[
str,
"Search query of a company, e.g. 'AAPL, TSM, etc.",
],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
):
"""
Retrieve the latest news about a given stock from Finnhub within a date range
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing news about the company within the date range from start_date to end_date
"""
end_date_str = end_date
end_date = datetime.strptime(end_date, "%Y-%m-%d")
start_date = datetime.strptime(start_date, "%Y-%m-%d")
look_back_days = (end_date - start_date).days
finnhub_news_result = interface.get_finnhub_news(
ticker, end_date_str, look_back_days
)
return finnhub_news_result
@staticmethod
@tool
def get_reddit_stock_info(
ticker: Annotated[
str,
"Ticker of a company. e.g. AAPL, TSM",
],
curr_date: Annotated[str, "Current date you want to get news for"],
) -> str:
"""
Retrieve the latest news about a given stock from Reddit, given the current date.
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
curr_date (str): current date in yyyy-mm-dd format to get news for
Returns:
str: A formatted dataframe containing the latest news about the company on the given date
"""
stock_news_results = interface.get_reddit_company_news(ticker, curr_date, 7, 5)
return stock_news_results
@staticmethod
@tool
def get_YFin_data(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve the stock price data for a given ticker symbol from Yahoo Finance.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the stock price data for the specified ticker symbol in the specified date range.
"""
result_data = interface.get_YFin_data(symbol, start_date, end_date)
return result_data
@staticmethod
@tool
def get_YFin_data_online(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve the stock price data for a given ticker symbol from Yahoo Finance.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the stock price data for the specified ticker symbol in the specified date range.
"""
result_data = interface.get_YFin_data_online(symbol, start_date, end_date)
return result_data
@staticmethod
@tool
def get_stockstats_indicators_report(
symbol: Annotated[str, "ticker symbol of the company"],
indicator: Annotated[
str, "technical indicator to get the analysis and report of"
],
curr_date: Annotated[
str, "The current trading date you are trading on, YYYY-mm-dd"
],
look_back_days: Annotated[int, "how many days to look back"] = 30,
) -> str:
"""
Retrieve stock stats indicators for a given ticker symbol and indicator.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
indicator (str): Technical indicator to get the analysis and report of
curr_date (str): The current trading date you are trading on, YYYY-mm-dd
look_back_days (int): How many days to look back, default is 30
Returns:
str: A formatted dataframe containing the stock stats indicators for the specified ticker symbol and indicator.
"""
result_stockstats = interface.get_stock_stats_indicators_window(
symbol, indicator, curr_date, look_back_days, False
)
return result_stockstats
@staticmethod
@tool
def get_stockstats_indicators_report_online(
symbol: Annotated[str, "ticker symbol of the company"],
indicator: Annotated[
str, "technical indicator to get the analysis and report of"
],
curr_date: Annotated[
str, "The current trading date you are trading on, YYYY-mm-dd"
],
look_back_days: Annotated[int, "how many days to look back"] = 30,
) -> str:
"""
Retrieve stock stats indicators for a given ticker symbol and indicator.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
indicator (str): Technical indicator to get the analysis and report of
curr_date (str): The current trading date you are trading on, YYYY-mm-dd
look_back_days (int): How many days to look back, default is 30
Returns:
str: A formatted dataframe containing the stock stats indicators for the specified ticker symbol and indicator.
"""
result_stockstats = interface.get_stock_stats_indicators_window(
symbol, indicator, curr_date, look_back_days, True
)
return result_stockstats
@staticmethod
@tool
def get_finnhub_company_insider_sentiment(
ticker: Annotated[str, "ticker symbol for the company"],
curr_date: Annotated[
str,
"current date of you are trading at, yyyy-mm-dd",
],
):
"""
Retrieve insider sentiment information about a company (retrieved from public SEC information) for the past 30 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the sentiment in the past 30 days starting at curr_date
"""
data_sentiment = interface.get_finnhub_company_insider_sentiment(
ticker, curr_date, 30
)
return data_sentiment
@staticmethod
@tool
def get_finnhub_company_insider_transactions(
ticker: Annotated[str, "ticker symbol"],
curr_date: Annotated[
str,
"current date you are trading at, yyyy-mm-dd",
],
):
"""
Retrieve insider transaction information about a company (retrieved from public SEC information) for the past 30 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's insider transactions/trading information in the past 30 days
"""
data_trans = interface.get_finnhub_company_insider_transactions(
ticker, curr_date, 30
)
return data_trans
@staticmethod
@tool
def get_simfin_balance_sheet(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual/quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve the most recent balance sheet of a company
Args:
ticker (str): ticker symbol of the company
freq (str): reporting frequency of the company's financial history: annual / quarterly
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's most recent balance sheet
"""
data_balance_sheet = interface.get_simfin_balance_sheet(ticker, freq, curr_date)
return data_balance_sheet
@staticmethod
@tool
def get_simfin_cashflow(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual/quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve the most recent cash flow statement of a company
Args:
ticker (str): ticker symbol of the company
freq (str): reporting frequency of the company's financial history: annual / quarterly
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's most recent cash flow statement
"""
data_cashflow = interface.get_simfin_cashflow(ticker, freq, curr_date)
return data_cashflow
@staticmethod
@tool
def get_simfin_income_stmt(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual/quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve the most recent income statement of a company
Args:
ticker (str): ticker symbol of the company
freq (str): reporting frequency of the company's financial history: annual / quarterly
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's most recent income statement
"""
data_income_stmt = interface.get_simfin_income_statements(
ticker, freq, curr_date
)
return data_income_stmt
@staticmethod
@tool
def get_google_news(
query: Annotated[str, "Query to search with"],
curr_date: Annotated[str, "Curr date in yyyy-mm-dd format"],
):
"""
Retrieve the latest news from Google News based on a query and date range.
Args:
query (str): Query to search with
curr_date (str): Current date in yyyy-mm-dd format
look_back_days (int): How many days to look back
Returns:
str: A formatted string containing the latest news from Google News based on the query and date range.
"""
google_news_results = interface.get_google_news(query, curr_date, 7)
return google_news_results
@staticmethod
@tool
def get_stock_news_openai(
ticker: Annotated[str, "the company's ticker"],
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
):
"""
Retrieve the latest news about a given stock by using OpenAI's news API.
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
curr_date (str): Current date in yyyy-mm-dd format
Returns:
str: A formatted string containing the latest news about the company on the given date.
"""
openai_news_results = interface.get_stock_news_openai(ticker, curr_date)
return openai_news_results
@staticmethod
@tool
def get_global_news_openai(
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
):
"""
Retrieve the latest macroeconomics news on a given date using OpenAI's macroeconomics news API.
Args:
curr_date (str): Current date in yyyy-mm-dd format
Returns:
str: A formatted string containing the latest macroeconomic news on the given date.
"""
openai_news_results = interface.get_global_news_openai(curr_date)
return openai_news_results
@staticmethod
@tool
def get_fundamentals_openai(
ticker: Annotated[str, "the company's ticker"],
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
):
"""
Retrieve the latest fundamental information about a given stock on a given date by using OpenAI's news API.
Args:
ticker (str): Ticker of a company. e.g. AAPL, TSM
curr_date (str): Current date in yyyy-mm-dd format
Returns:
str: A formatted string containing the latest fundamental information about the company on the given date.
"""
openai_fundamentals_results = interface.get_fundamentals_openai(
ticker, curr_date
)
return openai_fundamentals_results

View File

@ -0,0 +1,22 @@
from langchain_core.tools import tool
from typing import Annotated
from tradingagents.dataflows.interface import route_to_vendor
@tool
def get_stock_data(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve stock price data (OHLCV) for a given ticker symbol.
Uses the configured core_stock_apis vendor.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted dataframe containing the stock price data for the specified ticker symbol in the specified date range.
"""
return route_to_vendor("get_stock_data", symbol, start_date, end_date)

View File

@ -0,0 +1,77 @@
from langchain_core.tools import tool
from typing import Annotated
from tradingagents.dataflows.interface import route_to_vendor
@tool
def get_fundamentals(
ticker: Annotated[str, "ticker symbol"],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
) -> str:
"""
Retrieve comprehensive fundamental data for a given ticker symbol.
Uses the configured fundamental_data vendor.
Args:
ticker (str): Ticker symbol of the company
curr_date (str): Current date you are trading at, yyyy-mm-dd
Returns:
str: A formatted report containing comprehensive fundamental data
"""
return route_to_vendor("get_fundamentals", ticker, curr_date)
@tool
def get_balance_sheet(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[str, "reporting frequency: annual/quarterly"] = "quarterly",
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"] = None,
) -> str:
"""
Retrieve balance sheet data for a given ticker symbol.
Uses the configured fundamental_data vendor.
Args:
ticker (str): Ticker symbol of the company
freq (str): Reporting frequency: annual/quarterly (default quarterly)
curr_date (str): Current date you are trading at, yyyy-mm-dd
Returns:
str: A formatted report containing balance sheet data
"""
return route_to_vendor("get_balance_sheet", ticker, freq, curr_date)
@tool
def get_cashflow(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[str, "reporting frequency: annual/quarterly"] = "quarterly",
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"] = None,
) -> str:
"""
Retrieve cash flow statement data for a given ticker symbol.
Uses the configured fundamental_data vendor.
Args:
ticker (str): Ticker symbol of the company
freq (str): Reporting frequency: annual/quarterly (default quarterly)
curr_date (str): Current date you are trading at, yyyy-mm-dd
Returns:
str: A formatted report containing cash flow statement data
"""
return route_to_vendor("get_cashflow", ticker, freq, curr_date)
@tool
def get_income_statement(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[str, "reporting frequency: annual/quarterly"] = "quarterly",
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"] = None,
) -> str:
"""
Retrieve income statement data for a given ticker symbol.
Uses the configured fundamental_data vendor.
Args:
ticker (str): Ticker symbol of the company
freq (str): Reporting frequency: annual/quarterly (default quarterly)
curr_date (str): Current date you are trading at, yyyy-mm-dd
Returns:
str: A formatted report containing income statement data
"""
return route_to_vendor("get_income_statement", ticker, freq, curr_date)

View File

@ -0,0 +1,71 @@
from langchain_core.tools import tool
from typing import Annotated
from tradingagents.dataflows.interface import route_to_vendor
@tool
def get_news(
ticker: Annotated[str, "Ticker symbol"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve news data for a given ticker symbol.
Uses the configured news_data vendor.
Args:
ticker (str): Ticker symbol
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted string containing news data
"""
return route_to_vendor("get_news", ticker, start_date, end_date)
@tool
def get_global_news(
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
look_back_days: Annotated[int, "Number of days to look back"] = 7,
limit: Annotated[int, "Maximum number of articles to return"] = 5,
) -> str:
"""
Retrieve global news data.
Uses the configured news_data vendor.
Args:
curr_date (str): Current date in yyyy-mm-dd format
look_back_days (int): Number of days to look back (default 7)
limit (int): Maximum number of articles to return (default 5)
Returns:
str: A formatted string containing global news data
"""
return route_to_vendor("get_global_news", curr_date, look_back_days, limit)
@tool
def get_insider_sentiment(
ticker: Annotated[str, "ticker symbol for the company"],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
) -> str:
"""
Retrieve insider sentiment information about a company.
Uses the configured news_data vendor.
Args:
ticker (str): Ticker symbol of the company
curr_date (str): Current date you are trading at, yyyy-mm-dd
Returns:
str: A report of insider sentiment data
"""
return route_to_vendor("get_insider_sentiment", ticker, curr_date)
@tool
def get_insider_transactions(
ticker: Annotated[str, "ticker symbol"],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
) -> str:
"""
Retrieve insider transaction information about a company.
Uses the configured news_data vendor.
Args:
ticker (str): Ticker symbol of the company
curr_date (str): Current date you are trading at, yyyy-mm-dd
Returns:
str: A report of insider transaction data
"""
return route_to_vendor("get_insider_transactions", ticker, curr_date)

View File

@ -0,0 +1,23 @@
from langchain_core.tools import tool
from typing import Annotated
from tradingagents.dataflows.interface import route_to_vendor
@tool
def get_indicators(
symbol: Annotated[str, "ticker symbol of the company"],
indicator: Annotated[str, "technical indicator to get the analysis and report of"],
curr_date: Annotated[str, "The current trading date you are trading on, YYYY-mm-dd"],
look_back_days: Annotated[int, "how many days to look back"] = 30,
) -> str:
"""
Retrieve technical indicators for a given ticker symbol.
Uses the configured technical_indicators vendor.
Args:
symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
indicator (str): Technical indicator to get the analysis and report of
curr_date (str): The current trading date you are trading on, YYYY-mm-dd
look_back_days (int): How many days to look back, default is 30
Returns:
str: A formatted dataframe containing the technical indicators for the specified ticker symbol and indicator.
"""
return route_to_vendor("get_indicators", symbol, indicator, curr_date, look_back_days)

View File

@ -1,46 +0,0 @@
from .finnhub_utils import get_data_in_range
from .googlenews_utils import getNewsData
from .yfin_utils import YFinanceUtils
from .reddit_utils import fetch_top_from_category
from .stockstats_utils import StockstatsUtils
from .yfin_utils import YFinanceUtils
from .interface import (
# News and sentiment functions
get_finnhub_news,
get_finnhub_company_insider_sentiment,
get_finnhub_company_insider_transactions,
get_google_news,
get_reddit_global_news,
get_reddit_company_news,
# Financial statements functions
get_simfin_balance_sheet,
get_simfin_cashflow,
get_simfin_income_statements,
# Technical analysis functions
get_stock_stats_indicators_window,
get_stockstats_indicator,
# Market data functions
get_YFin_data_window,
get_YFin_data,
)
__all__ = [
# News and sentiment functions
"get_finnhub_news",
"get_finnhub_company_insider_sentiment",
"get_finnhub_company_insider_transactions",
"get_google_news",
"get_reddit_global_news",
"get_reddit_company_news",
# Financial statements functions
"get_simfin_balance_sheet",
"get_simfin_cashflow",
"get_simfin_income_statements",
# Technical analysis functions
"get_stock_stats_indicators_window",
"get_stockstats_indicator",
# Market data functions
"get_YFin_data_window",
"get_YFin_data",
]

View File

@ -0,0 +1,5 @@
# Import functions from specialized modules
from .alpha_vantage_stock import get_stock
from .alpha_vantage_indicator import get_indicator
from .alpha_vantage_fundamentals import get_fundamentals, get_balance_sheet, get_cashflow, get_income_statement
from .alpha_vantage_news import get_news, get_insider_transactions

View File

@ -0,0 +1,122 @@
import os
import requests
import pandas as pd
import json
from datetime import datetime
from io import StringIO
API_BASE_URL = "https://www.alphavantage.co/query"
def get_api_key() -> str:
"""Retrieve the API key for Alpha Vantage from environment variables."""
api_key = os.getenv("ALPHA_VANTAGE_API_KEY")
if not api_key:
raise ValueError("ALPHA_VANTAGE_API_KEY environment variable is not set.")
return api_key
def format_datetime_for_api(date_input) -> str:
"""Convert various date formats to YYYYMMDDTHHMM format required by Alpha Vantage API."""
if isinstance(date_input, str):
# If already in correct format, return as-is
if len(date_input) == 13 and 'T' in date_input:
return date_input
# Try to parse common date formats
try:
dt = datetime.strptime(date_input, "%Y-%m-%d")
return dt.strftime("%Y%m%dT0000")
except ValueError:
try:
dt = datetime.strptime(date_input, "%Y-%m-%d %H:%M")
return dt.strftime("%Y%m%dT%H%M")
except ValueError:
raise ValueError(f"Unsupported date format: {date_input}")
elif isinstance(date_input, datetime):
return date_input.strftime("%Y%m%dT%H%M")
else:
raise ValueError(f"Date must be string or datetime object, got {type(date_input)}")
class AlphaVantageRateLimitError(Exception):
"""Exception raised when Alpha Vantage API rate limit is exceeded."""
pass
def _make_api_request(function_name: str, params: dict) -> dict | str:
"""Helper function to make API requests and handle responses.
Raises:
AlphaVantageRateLimitError: When API rate limit is exceeded
"""
# Create a copy of params to avoid modifying the original
api_params = params.copy()
api_params.update({
"function": function_name,
"apikey": get_api_key(),
"source": "trading_agents",
})
# Handle entitlement parameter if present in params or global variable
current_entitlement = globals().get('_current_entitlement')
entitlement = api_params.get("entitlement") or current_entitlement
if entitlement:
api_params["entitlement"] = entitlement
elif "entitlement" in api_params:
# Remove entitlement if it's None or empty
api_params.pop("entitlement", None)
response = requests.get(API_BASE_URL, params=api_params)
response.raise_for_status()
response_text = response.text
# Check if response is JSON (error responses are typically JSON)
try:
response_json = json.loads(response_text)
# Check for rate limit error
if "Information" in response_json:
info_message = response_json["Information"]
if "rate limit" in info_message.lower() or "api key" in info_message.lower():
raise AlphaVantageRateLimitError(f"Alpha Vantage rate limit exceeded: {info_message}")
except json.JSONDecodeError:
# Response is not JSON (likely CSV data), which is normal
pass
return response_text
def _filter_csv_by_date_range(csv_data: str, start_date: str, end_date: str) -> str:
"""
Filter CSV data to include only rows within the specified date range.
Args:
csv_data: CSV string from Alpha Vantage API
start_date: Start date in yyyy-mm-dd format
end_date: End date in yyyy-mm-dd format
Returns:
Filtered CSV string
"""
if not csv_data or csv_data.strip() == "":
return csv_data
try:
# Parse CSV data
df = pd.read_csv(StringIO(csv_data))
# Assume the first column is the date column (timestamp)
date_col = df.columns[0]
df[date_col] = pd.to_datetime(df[date_col])
# Filter by date range
start_dt = pd.to_datetime(start_date)
end_dt = pd.to_datetime(end_date)
filtered_df = df[(df[date_col] >= start_dt) & (df[date_col] <= end_dt)]
# Convert back to CSV string
return filtered_df.to_csv(index=False)
except Exception as e:
# If filtering fails, return original data with a warning
print(f"Warning: Failed to filter CSV data by date range: {e}")
return csv_data

View File

@ -0,0 +1,77 @@
from .alpha_vantage_common import _make_api_request
def get_fundamentals(ticker: str, curr_date: str = None) -> str:
"""
Retrieve comprehensive fundamental data for a given ticker symbol using Alpha Vantage.
Args:
ticker (str): Ticker symbol of the company
curr_date (str): Current date you are trading at, yyyy-mm-dd (not used for Alpha Vantage)
Returns:
str: Company overview data including financial ratios and key metrics
"""
params = {
"symbol": ticker,
}
return _make_api_request("OVERVIEW", params)
def get_balance_sheet(ticker: str, freq: str = "quarterly", curr_date: str = None) -> str:
"""
Retrieve balance sheet data for a given ticker symbol using Alpha Vantage.
Args:
ticker (str): Ticker symbol of the company
freq (str): Reporting frequency: annual/quarterly (default quarterly) - not used for Alpha Vantage
curr_date (str): Current date you are trading at, yyyy-mm-dd (not used for Alpha Vantage)
Returns:
str: Balance sheet data with normalized fields
"""
params = {
"symbol": ticker,
}
return _make_api_request("BALANCE_SHEET", params)
def get_cashflow(ticker: str, freq: str = "quarterly", curr_date: str = None) -> str:
"""
Retrieve cash flow statement data for a given ticker symbol using Alpha Vantage.
Args:
ticker (str): Ticker symbol of the company
freq (str): Reporting frequency: annual/quarterly (default quarterly) - not used for Alpha Vantage
curr_date (str): Current date you are trading at, yyyy-mm-dd (not used for Alpha Vantage)
Returns:
str: Cash flow statement data with normalized fields
"""
params = {
"symbol": ticker,
}
return _make_api_request("CASH_FLOW", params)
def get_income_statement(ticker: str, freq: str = "quarterly", curr_date: str = None) -> str:
"""
Retrieve income statement data for a given ticker symbol using Alpha Vantage.
Args:
ticker (str): Ticker symbol of the company
freq (str): Reporting frequency: annual/quarterly (default quarterly) - not used for Alpha Vantage
curr_date (str): Current date you are trading at, yyyy-mm-dd (not used for Alpha Vantage)
Returns:
str: Income statement data with normalized fields
"""
params = {
"symbol": ticker,
}
return _make_api_request("INCOME_STATEMENT", params)

View File

@ -0,0 +1,222 @@
from .alpha_vantage_common import _make_api_request
def get_indicator(
symbol: str,
indicator: str,
curr_date: str,
look_back_days: int,
interval: str = "daily",
time_period: int = 14,
series_type: str = "close"
) -> str:
"""
Returns Alpha Vantage technical indicator values over a time window.
Args:
symbol: ticker symbol of the company
indicator: technical indicator to get the analysis and report of
curr_date: The current trading date you are trading on, YYYY-mm-dd
look_back_days: how many days to look back
interval: Time interval (daily, weekly, monthly)
time_period: Number of data points for calculation
series_type: The desired price type (close, open, high, low)
Returns:
String containing indicator values and description
"""
from datetime import datetime
from dateutil.relativedelta import relativedelta
supported_indicators = {
"close_50_sma": ("50 SMA", "close"),
"close_200_sma": ("200 SMA", "close"),
"close_10_ema": ("10 EMA", "close"),
"macd": ("MACD", "close"),
"macds": ("MACD Signal", "close"),
"macdh": ("MACD Histogram", "close"),
"rsi": ("RSI", "close"),
"boll": ("Bollinger Middle", "close"),
"boll_ub": ("Bollinger Upper Band", "close"),
"boll_lb": ("Bollinger Lower Band", "close"),
"atr": ("ATR", None),
"vwma": ("VWMA", "close")
}
indicator_descriptions = {
"close_50_sma": "50 SMA: A medium-term trend indicator. Usage: Identify trend direction and serve as dynamic support/resistance. Tips: It lags price; combine with faster indicators for timely signals.",
"close_200_sma": "200 SMA: A long-term trend benchmark. Usage: Confirm overall market trend and identify golden/death cross setups. Tips: It reacts slowly; best for strategic trend confirmation rather than frequent trading entries.",
"close_10_ema": "10 EMA: A responsive short-term average. Usage: Capture quick shifts in momentum and potential entry points. Tips: Prone to noise in choppy markets; use alongside longer averages for filtering false signals.",
"macd": "MACD: Computes momentum via differences of EMAs. Usage: Look for crossovers and divergence as signals of trend changes. Tips: Confirm with other indicators in low-volatility or sideways markets.",
"macds": "MACD Signal: An EMA smoothing of the MACD line. Usage: Use crossovers with the MACD line to trigger trades. Tips: Should be part of a broader strategy to avoid false positives.",
"macdh": "MACD Histogram: Shows the gap between the MACD line and its signal. Usage: Visualize momentum strength and spot divergence early. Tips: Can be volatile; complement with additional filters in fast-moving markets.",
"rsi": "RSI: Measures momentum to flag overbought/oversold conditions. Usage: Apply 70/30 thresholds and watch for divergence to signal reversals. Tips: In strong trends, RSI may remain extreme; always cross-check with trend analysis.",
"boll": "Bollinger Middle: A 20 SMA serving as the basis for Bollinger Bands. Usage: Acts as a dynamic benchmark for price movement. Tips: Combine with the upper and lower bands to effectively spot breakouts or reversals.",
"boll_ub": "Bollinger Upper Band: Typically 2 standard deviations above the middle line. Usage: Signals potential overbought conditions and breakout zones. Tips: Confirm signals with other tools; prices may ride the band in strong trends.",
"boll_lb": "Bollinger Lower Band: Typically 2 standard deviations below the middle line. Usage: Indicates potential oversold conditions. Tips: Use additional analysis to avoid false reversal signals.",
"atr": "ATR: Averages true range to measure volatility. Usage: Set stop-loss levels and adjust position sizes based on current market volatility. Tips: It's a reactive measure, so use it as part of a broader risk management strategy.",
"vwma": "VWMA: A moving average weighted by volume. Usage: Confirm trends by integrating price action with volume data. Tips: Watch for skewed results from volume spikes; use in combination with other volume analyses."
}
if indicator not in supported_indicators:
raise ValueError(
f"Indicator {indicator} is not supported. Please choose from: {list(supported_indicators.keys())}"
)
curr_date_dt = datetime.strptime(curr_date, "%Y-%m-%d")
before = curr_date_dt - relativedelta(days=look_back_days)
# Get the full data for the period instead of making individual calls
_, required_series_type = supported_indicators[indicator]
# Use the provided series_type or fall back to the required one
if required_series_type:
series_type = required_series_type
try:
# Get indicator data for the period
if indicator == "close_50_sma":
data = _make_api_request("SMA", {
"symbol": symbol,
"interval": interval,
"time_period": "50",
"series_type": series_type,
"datatype": "csv"
})
elif indicator == "close_200_sma":
data = _make_api_request("SMA", {
"symbol": symbol,
"interval": interval,
"time_period": "200",
"series_type": series_type,
"datatype": "csv"
})
elif indicator == "close_10_ema":
data = _make_api_request("EMA", {
"symbol": symbol,
"interval": interval,
"time_period": "10",
"series_type": series_type,
"datatype": "csv"
})
elif indicator == "macd":
data = _make_api_request("MACD", {
"symbol": symbol,
"interval": interval,
"series_type": series_type,
"datatype": "csv"
})
elif indicator == "macds":
data = _make_api_request("MACD", {
"symbol": symbol,
"interval": interval,
"series_type": series_type,
"datatype": "csv"
})
elif indicator == "macdh":
data = _make_api_request("MACD", {
"symbol": symbol,
"interval": interval,
"series_type": series_type,
"datatype": "csv"
})
elif indicator == "rsi":
data = _make_api_request("RSI", {
"symbol": symbol,
"interval": interval,
"time_period": str(time_period),
"series_type": series_type,
"datatype": "csv"
})
elif indicator in ["boll", "boll_ub", "boll_lb"]:
data = _make_api_request("BBANDS", {
"symbol": symbol,
"interval": interval,
"time_period": "20",
"series_type": series_type,
"datatype": "csv"
})
elif indicator == "atr":
data = _make_api_request("ATR", {
"symbol": symbol,
"interval": interval,
"time_period": str(time_period),
"datatype": "csv"
})
elif indicator == "vwma":
# Alpha Vantage doesn't have direct VWMA, so we'll return an informative message
# In a real implementation, this would need to be calculated from OHLCV data
return f"## VWMA (Volume Weighted Moving Average) for {symbol}:\n\nVWMA calculation requires OHLCV data and is not directly available from Alpha Vantage API.\nThis indicator would need to be calculated from the raw stock data using volume-weighted price averaging.\n\n{indicator_descriptions.get('vwma', 'No description available.')}"
else:
return f"Error: Indicator {indicator} not implemented yet."
# Parse CSV data and extract values for the date range
lines = data.strip().split('\n')
if len(lines) < 2:
return f"Error: No data returned for {indicator}"
# Parse header and data
header = [col.strip() for col in lines[0].split(',')]
try:
date_col_idx = header.index('time')
except ValueError:
return f"Error: 'time' column not found in data for {indicator}. Available columns: {header}"
# Map internal indicator names to expected CSV column names from Alpha Vantage
col_name_map = {
"macd": "MACD", "macds": "MACD_Signal", "macdh": "MACD_Hist",
"boll": "Real Middle Band", "boll_ub": "Real Upper Band", "boll_lb": "Real Lower Band",
"rsi": "RSI", "atr": "ATR", "close_10_ema": "EMA",
"close_50_sma": "SMA", "close_200_sma": "SMA"
}
target_col_name = col_name_map.get(indicator)
if not target_col_name:
# Default to the second column if no specific mapping exists
value_col_idx = 1
else:
try:
value_col_idx = header.index(target_col_name)
except ValueError:
return f"Error: Column '{target_col_name}' not found for indicator '{indicator}'. Available columns: {header}"
result_data = []
for line in lines[1:]:
if not line.strip():
continue
values = line.split(',')
if len(values) > value_col_idx:
try:
date_str = values[date_col_idx].strip()
# Parse the date
date_dt = datetime.strptime(date_str, "%Y-%m-%d")
# Check if date is in our range
if before <= date_dt <= curr_date_dt:
value = values[value_col_idx].strip()
result_data.append((date_dt, value))
except (ValueError, IndexError):
continue
# Sort by date and format output
result_data.sort(key=lambda x: x[0])
ind_string = ""
for date_dt, value in result_data:
ind_string += f"{date_dt.strftime('%Y-%m-%d')}: {value}\n"
if not ind_string:
ind_string = "No data available for the specified date range.\n"
result_str = (
f"## {indicator.upper()} values from {before.strftime('%Y-%m-%d')} to {curr_date}:\n\n"
+ ind_string
+ "\n\n"
+ indicator_descriptions.get(indicator, "No description available.")
)
return result_str
except Exception as e:
print(f"Error getting Alpha Vantage indicator data for {indicator}: {e}")
return f"Error retrieving {indicator} data: {str(e)}"

View File

@ -0,0 +1,43 @@
from .alpha_vantage_common import _make_api_request, format_datetime_for_api
def get_news(ticker, start_date, end_date) -> dict[str, str] | str:
"""Returns live and historical market news & sentiment data from premier news outlets worldwide.
Covers stocks, cryptocurrencies, forex, and topics like fiscal policy, mergers & acquisitions, IPOs.
Args:
ticker: Stock symbol for news articles.
start_date: Start date for news search.
end_date: End date for news search.
Returns:
Dictionary containing news sentiment data or JSON string.
"""
params = {
"tickers": ticker,
"time_from": format_datetime_for_api(start_date),
"time_to": format_datetime_for_api(end_date),
"sort": "LATEST",
"limit": "50",
}
return _make_api_request("NEWS_SENTIMENT", params)
def get_insider_transactions(symbol: str) -> dict[str, str] | str:
"""Returns latest and historical insider transactions by key stakeholders.
Covers transactions by founders, executives, board members, etc.
Args:
symbol: Ticker symbol. Example: "IBM".
Returns:
Dictionary containing insider transaction data or JSON string.
"""
params = {
"symbol": symbol,
}
return _make_api_request("INSIDER_TRANSACTIONS", params)

View File

@ -0,0 +1,38 @@
from datetime import datetime
from .alpha_vantage_common import _make_api_request, _filter_csv_by_date_range
def get_stock(
symbol: str,
start_date: str,
end_date: str
) -> str:
"""
Returns raw daily OHLCV values, adjusted close values, and historical split/dividend events
filtered to the specified date range.
Args:
symbol: The name of the equity. For example: symbol=IBM
start_date: Start date in yyyy-mm-dd format
end_date: End date in yyyy-mm-dd format
Returns:
CSV string containing the daily adjusted time series data filtered to the date range.
"""
# Parse dates to determine the range
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
today = datetime.now()
# Choose outputsize based on whether the requested range is within the latest 100 days
# Compact returns latest 100 data points, so check if start_date is recent enough
days_from_today_to_start = (today - start_dt).days
outputsize = "compact" if days_from_today_to_start < 100 else "full"
params = {
"symbol": symbol,
"outputsize": outputsize,
"datatype": "csv",
}
response = _make_api_request("TIME_SERIES_DAILY_ADJUSTED", params)
return _filter_csv_by_date_range(response, start_date, end_date)

View File

@ -1,36 +0,0 @@
import json
import os
def get_data_in_range(ticker, start_date, end_date, data_type, data_dir, period=None):
"""
Gets finnhub data saved and processed on disk.
Args:
start_date (str): Start date in YYYY-MM-DD format.
end_date (str): End date in YYYY-MM-DD format.
data_type (str): Type of data from finnhub to fetch. Can be insider_trans, SEC_filings, news_data, insider_senti, or fin_as_reported.
data_dir (str): Directory where the data is saved.
period (str): Default to none, if there is a period specified, should be annual or quarterly.
"""
if period:
data_path = os.path.join(
data_dir,
"finnhub_data",
data_type,
f"{ticker}_{period}_data_formatted.json",
)
else:
data_path = os.path.join(
data_dir, "finnhub_data", data_type, f"{ticker}_data_formatted.json"
)
data = open(data_path, "r")
data = json.load(data)
# filter keys (date, str in format YYYY-MM-DD) by the date range (str, str in format YYYY-MM-DD)
filtered_data = {}
for key, value in data.items():
if start_date <= key <= end_date and len(value) > 0:
filtered_data[key] = value
return filtered_data

View File

@ -0,0 +1,30 @@
from typing import Annotated
from datetime import datetime
from dateutil.relativedelta import relativedelta
from .googlenews_utils import getNewsData
def get_google_news(
query: Annotated[str, "Query to search with"],
curr_date: Annotated[str, "Curr date in yyyy-mm-dd format"],
look_back_days: Annotated[int, "how many days to look back"],
) -> str:
query = query.replace(" ", "+")
start_date = datetime.strptime(curr_date, "%Y-%m-%d")
before = start_date - relativedelta(days=look_back_days)
before = before.strftime("%Y-%m-%d")
news_results = getNewsData(query, before, curr_date)
news_str = ""
for news in news_results:
news_str += (
f"### {news['title']} (source: {news['source']}) \n\n{news['snippet']}\n\n"
)
if len(news_results) == 0:
return ""
return f"## {query} Google News, from {before} to {curr_date}:\n\n{news_str}"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,475 @@
from typing import Annotated
import pandas as pd
import os
from .config import DATA_DIR
from datetime import datetime
from dateutil.relativedelta import relativedelta
import json
from .reddit_utils import fetch_top_from_category
from tqdm import tqdm
def get_YFin_data_window(
symbol: Annotated[str, "ticker symbol of the company"],
curr_date: Annotated[str, "Start date in yyyy-mm-dd format"],
look_back_days: Annotated[int, "how many days to look back"],
) -> str:
# calculate past days
date_obj = datetime.strptime(curr_date, "%Y-%m-%d")
before = date_obj - relativedelta(days=look_back_days)
start_date = before.strftime("%Y-%m-%d")
# read in data
data = pd.read_csv(
os.path.join(
DATA_DIR,
f"market_data/price_data/{symbol}-YFin-data-2015-01-01-2025-03-25.csv",
)
)
# Extract just the date part for comparison
data["DateOnly"] = data["Date"].str[:10]
# Filter data between the start and end dates (inclusive)
filtered_data = data[
(data["DateOnly"] >= start_date) & (data["DateOnly"] <= curr_date)
]
# Drop the temporary column we created
filtered_data = filtered_data.drop("DateOnly", axis=1)
# Set pandas display options to show the full DataFrame
with pd.option_context(
"display.max_rows", None, "display.max_columns", None, "display.width", None
):
df_string = filtered_data.to_string()
return (
f"## Raw Market Data for {symbol} from {start_date} to {curr_date}:\n\n"
+ df_string
)
def get_YFin_data(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
# read in data
data = pd.read_csv(
os.path.join(
DATA_DIR,
f"market_data/price_data/{symbol}-YFin-data-2015-01-01-2025-03-25.csv",
)
)
if end_date > "2025-03-25":
raise Exception(
f"Get_YFin_Data: {end_date} is outside of the data range of 2015-01-01 to 2025-03-25"
)
# Extract just the date part for comparison
data["DateOnly"] = data["Date"].str[:10]
# Filter data between the start and end dates (inclusive)
filtered_data = data[
(data["DateOnly"] >= start_date) & (data["DateOnly"] <= end_date)
]
# Drop the temporary column we created
filtered_data = filtered_data.drop("DateOnly", axis=1)
# remove the index from the dataframe
filtered_data = filtered_data.reset_index(drop=True)
return filtered_data
def get_finnhub_news(
query: Annotated[str, "Search query or ticker symbol"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
):
"""
Retrieve news about a company within a time frame
Args
query (str): Search query or ticker symbol
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns
str: dataframe containing the news of the company in the time frame
"""
result = get_data_in_range(query, start_date, end_date, "news_data", DATA_DIR)
if len(result) == 0:
return ""
combined_result = ""
for day, data in result.items():
if len(data) == 0:
continue
for entry in data:
current_news = (
"### " + entry["headline"] + f" ({day})" + "\n" + entry["summary"]
)
combined_result += current_news + "\n\n"
return f"## {query} News, from {start_date} to {end_date}:\n" + str(combined_result)
def get_finnhub_company_insider_sentiment(
ticker: Annotated[str, "ticker symbol for the company"],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve insider sentiment about a company (retrieved from public SEC information) for the past 15 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading on, yyyy-mm-dd
Returns:
str: a report of the sentiment in the past 15 days starting at curr_date
"""
date_obj = datetime.strptime(curr_date, "%Y-%m-%d")
before = date_obj - relativedelta(days=15) # Default 15 days lookback
before = before.strftime("%Y-%m-%d")
data = get_data_in_range(ticker, before, curr_date, "insider_senti", DATA_DIR)
if len(data) == 0:
return ""
result_str = ""
seen_dicts = []
for date, senti_list in data.items():
for entry in senti_list:
if entry not in seen_dicts:
result_str += f"### {entry['year']}-{entry['month']}:\nChange: {entry['change']}\nMonthly Share Purchase Ratio: {entry['mspr']}\n\n"
seen_dicts.append(entry)
return (
f"## {ticker} Insider Sentiment Data for {before} to {curr_date}:\n"
+ result_str
+ "The change field refers to the net buying/selling from all insiders' transactions. The mspr field refers to monthly share purchase ratio."
)
def get_finnhub_company_insider_transactions(
ticker: Annotated[str, "ticker symbol"],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve insider transcaction information about a company (retrieved from public SEC information) for the past 15 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's insider transaction/trading informtaion in the past 15 days
"""
date_obj = datetime.strptime(curr_date, "%Y-%m-%d")
before = date_obj - relativedelta(days=15) # Default 15 days lookback
before = before.strftime("%Y-%m-%d")
data = get_data_in_range(ticker, before, curr_date, "insider_trans", DATA_DIR)
if len(data) == 0:
return ""
result_str = ""
seen_dicts = []
for date, senti_list in data.items():
for entry in senti_list:
if entry not in seen_dicts:
result_str += f"### Filing Date: {entry['filingDate']}, {entry['name']}:\nChange:{entry['change']}\nShares: {entry['share']}\nTransaction Price: {entry['transactionPrice']}\nTransaction Code: {entry['transactionCode']}\n\n"
seen_dicts.append(entry)
return (
f"## {ticker} insider transactions from {before} to {curr_date}:\n"
+ result_str
+ "The change field reflects the variation in share count—here a negative number indicates a reduction in holdings—while share specifies the total number of shares involved. The transactionPrice denotes the per-share price at which the trade was executed, and transactionDate marks when the transaction occurred. The name field identifies the insider making the trade, and transactionCode (e.g., S for sale) clarifies the nature of the transaction. FilingDate records when the transaction was officially reported, and the unique id links to the specific SEC filing, as indicated by the source. Additionally, the symbol ties the transaction to a particular company, isDerivative flags whether the trade involves derivative securities, and currency notes the currency context of the transaction."
)
def get_data_in_range(ticker, start_date, end_date, data_type, data_dir, period=None):
"""
Gets finnhub data saved and processed on disk.
Args:
start_date (str): Start date in YYYY-MM-DD format.
end_date (str): End date in YYYY-MM-DD format.
data_type (str): Type of data from finnhub to fetch. Can be insider_trans, SEC_filings, news_data, insider_senti, or fin_as_reported.
data_dir (str): Directory where the data is saved.
period (str): Default to none, if there is a period specified, should be annual or quarterly.
"""
if period:
data_path = os.path.join(
data_dir,
"finnhub_data",
data_type,
f"{ticker}_{period}_data_formatted.json",
)
else:
data_path = os.path.join(
data_dir, "finnhub_data", data_type, f"{ticker}_data_formatted.json"
)
data = open(data_path, "r")
data = json.load(data)
# filter keys (date, str in format YYYY-MM-DD) by the date range (str, str in format YYYY-MM-DD)
filtered_data = {}
for key, value in data.items():
if start_date <= key <= end_date and len(value) > 0:
filtered_data[key] = value
return filtered_data
def get_simfin_balance_sheet(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual / quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
data_path = os.path.join(
DATA_DIR,
"fundamental_data",
"simfin_data_all",
"balance_sheet",
"companies",
"us",
f"us-balance-{freq}.csv",
)
df = pd.read_csv(data_path, sep=";")
# Convert date strings to datetime objects and remove any time components
df["Report Date"] = pd.to_datetime(df["Report Date"], utc=True).dt.normalize()
df["Publish Date"] = pd.to_datetime(df["Publish Date"], utc=True).dt.normalize()
# Convert the current date to datetime and normalize
curr_date_dt = pd.to_datetime(curr_date, utc=True).normalize()
# Filter the DataFrame for the given ticker and for reports that were published on or before the current date
filtered_df = df[(df["Ticker"] == ticker) & (df["Publish Date"] <= curr_date_dt)]
# Check if there are any available reports; if not, return a notification
if filtered_df.empty:
print("No balance sheet available before the given current date.")
return ""
# Get the most recent balance sheet by selecting the row with the latest Publish Date
latest_balance_sheet = filtered_df.loc[filtered_df["Publish Date"].idxmax()]
# drop the SimFinID column
latest_balance_sheet = latest_balance_sheet.drop("SimFinId")
return (
f"## {freq} balance sheet for {ticker} released on {str(latest_balance_sheet['Publish Date'])[0:10]}: \n"
+ str(latest_balance_sheet)
+ "\n\nThis includes metadata like reporting dates and currency, share details, and a breakdown of assets, liabilities, and equity. Assets are grouped as current (liquid items like cash and receivables) and noncurrent (long-term investments and property). Liabilities are split between short-term obligations and long-term debts, while equity reflects shareholder funds such as paid-in capital and retained earnings. Together, these components ensure that total assets equal the sum of liabilities and equity."
)
def get_simfin_cashflow(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual / quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
data_path = os.path.join(
DATA_DIR,
"fundamental_data",
"simfin_data_all",
"cash_flow",
"companies",
"us",
f"us-cashflow-{freq}.csv",
)
df = pd.read_csv(data_path, sep=";")
# Convert date strings to datetime objects and remove any time components
df["Report Date"] = pd.to_datetime(df["Report Date"], utc=True).dt.normalize()
df["Publish Date"] = pd.to_datetime(df["Publish Date"], utc=True).dt.normalize()
# Convert the current date to datetime and normalize
curr_date_dt = pd.to_datetime(curr_date, utc=True).normalize()
# Filter the DataFrame for the given ticker and for reports that were published on or before the current date
filtered_df = df[(df["Ticker"] == ticker) & (df["Publish Date"] <= curr_date_dt)]
# Check if there are any available reports; if not, return a notification
if filtered_df.empty:
print("No cash flow statement available before the given current date.")
return ""
# Get the most recent cash flow statement by selecting the row with the latest Publish Date
latest_cash_flow = filtered_df.loc[filtered_df["Publish Date"].idxmax()]
# drop the SimFinID column
latest_cash_flow = latest_cash_flow.drop("SimFinId")
return (
f"## {freq} cash flow statement for {ticker} released on {str(latest_cash_flow['Publish Date'])[0:10]}: \n"
+ str(latest_cash_flow)
+ "\n\nThis includes metadata like reporting dates and currency, share details, and a breakdown of cash movements. Operating activities show cash generated from core business operations, including net income adjustments for non-cash items and working capital changes. Investing activities cover asset acquisitions/disposals and investments. Financing activities include debt transactions, equity issuances/repurchases, and dividend payments. The net change in cash represents the overall increase or decrease in the company's cash position during the reporting period."
)
def get_simfin_income_statements(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual / quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
data_path = os.path.join(
DATA_DIR,
"fundamental_data",
"simfin_data_all",
"income_statements",
"companies",
"us",
f"us-income-{freq}.csv",
)
df = pd.read_csv(data_path, sep=";")
# Convert date strings to datetime objects and remove any time components
df["Report Date"] = pd.to_datetime(df["Report Date"], utc=True).dt.normalize()
df["Publish Date"] = pd.to_datetime(df["Publish Date"], utc=True).dt.normalize()
# Convert the current date to datetime and normalize
curr_date_dt = pd.to_datetime(curr_date, utc=True).normalize()
# Filter the DataFrame for the given ticker and for reports that were published on or before the current date
filtered_df = df[(df["Ticker"] == ticker) & (df["Publish Date"] <= curr_date_dt)]
# Check if there are any available reports; if not, return a notification
if filtered_df.empty:
print("No income statement available before the given current date.")
return ""
# Get the most recent income statement by selecting the row with the latest Publish Date
latest_income = filtered_df.loc[filtered_df["Publish Date"].idxmax()]
# drop the SimFinID column
latest_income = latest_income.drop("SimFinId")
return (
f"## {freq} income statement for {ticker} released on {str(latest_income['Publish Date'])[0:10]}: \n"
+ str(latest_income)
+ "\n\nThis includes metadata like reporting dates and currency, share details, and a comprehensive breakdown of the company's financial performance. Starting with Revenue, it shows Cost of Revenue and resulting Gross Profit. Operating Expenses are detailed, including SG&A, R&D, and Depreciation. The statement then shows Operating Income, followed by non-operating items and Interest Expense, leading to Pretax Income. After accounting for Income Tax and any Extraordinary items, it concludes with Net Income, representing the company's bottom-line profit or loss for the period."
)
def get_reddit_global_news(
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
look_back_days: Annotated[int, "Number of days to look back"] = 7,
limit: Annotated[int, "Maximum number of articles to return"] = 5,
) -> str:
"""
Retrieve the latest top reddit news
Args:
curr_date: Current date in yyyy-mm-dd format
look_back_days: Number of days to look back (default 7)
limit: Maximum number of articles to return (default 5)
Returns:
str: A formatted string containing the latest news articles posts on reddit
"""
curr_date_dt = datetime.strptime(curr_date, "%Y-%m-%d")
before = curr_date_dt - relativedelta(days=look_back_days)
before = before.strftime("%Y-%m-%d")
posts = []
# iterate from before to curr_date
curr_iter_date = datetime.strptime(before, "%Y-%m-%d")
total_iterations = (curr_date_dt - curr_iter_date).days + 1
pbar = tqdm(desc=f"Getting Global News on {curr_date}", total=total_iterations)
while curr_iter_date <= curr_date_dt:
curr_date_str = curr_iter_date.strftime("%Y-%m-%d")
fetch_result = fetch_top_from_category(
"global_news",
curr_date_str,
limit,
data_path=os.path.join(DATA_DIR, "reddit_data"),
)
posts.extend(fetch_result)
curr_iter_date += relativedelta(days=1)
pbar.update(1)
pbar.close()
if len(posts) == 0:
return ""
news_str = ""
for post in posts:
if post["content"] == "":
news_str += f"### {post['title']}\n\n"
else:
news_str += f"### {post['title']}\n\n{post['content']}\n\n"
return f"## Global News Reddit, from {before} to {curr_date}:\n{news_str}"
def get_reddit_company_news(
query: Annotated[str, "Search query or ticker symbol"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve the latest top reddit news
Args:
query: Search query or ticker symbol
start_date: Start date in yyyy-mm-dd format
end_date: End date in yyyy-mm-dd format
Returns:
str: A formatted string containing news articles posts on reddit
"""
start_date_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_date_dt = datetime.strptime(end_date, "%Y-%m-%d")
posts = []
# iterate from start_date to end_date
curr_date = start_date_dt
total_iterations = (end_date_dt - curr_date).days + 1
pbar = tqdm(
desc=f"Getting Company News for {query} from {start_date} to {end_date}",
total=total_iterations,
)
while curr_date <= end_date_dt:
curr_date_str = curr_date.strftime("%Y-%m-%d")
fetch_result = fetch_top_from_category(
"company_news",
curr_date_str,
10, # max limit per day
query,
data_path=os.path.join(DATA_DIR, "reddit_data"),
)
posts.extend(fetch_result)
curr_date += relativedelta(days=1)
pbar.update(1)
pbar.close()
if len(posts) == 0:
return ""
news_str = ""
for post in posts:
if post["content"] == "":
news_str += f"### {post['title']}\n\n"
else:
news_str += f"### {post['title']}\n\n{post['content']}\n\n"
return f"##{query} News Reddit, from {start_date} to {end_date}:\n\n{news_str}"

View File

@ -0,0 +1,107 @@
from openai import OpenAI
from .config import get_config
def get_stock_news_openai(query, start_date, end_date):
config = get_config()
client = OpenAI(base_url=config["backend_url"])
response = client.responses.create(
model=config["quick_think_llm"],
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": f"Can you search Social Media for {query} from {start_date} to {end_date}? Make sure you only get the data posted during that period.",
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text
def get_global_news_openai(curr_date, look_back_days=7, limit=5):
config = get_config()
client = OpenAI(base_url=config["backend_url"])
response = client.responses.create(
model=config["quick_think_llm"],
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": f"Can you search global or macroeconomics news from {look_back_days} days before {curr_date} to {curr_date} that would be informative for trading purposes? Make sure you only get the data posted during that period. Limit the results to {limit} articles.",
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text
def get_fundamentals_openai(ticker, curr_date):
config = get_config()
client = OpenAI(base_url=config["backend_url"])
response = client.responses.create(
model=config["quick_think_llm"],
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": f"Can you search Fundamental for discussions on {ticker} during of the month before {curr_date} to the month of {curr_date}. Make sure you only get the data posted during that period. List as a table, with PE/PS/Cash flow/ etc",
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text

View File

@ -3,7 +3,7 @@ import yfinance as yf
from stockstats import wrap from stockstats import wrap
from typing import Annotated from typing import Annotated
import os import os
from .config import get_config from .config import get_config, DATA_DIR
class StockstatsUtils: class StockstatsUtils:
@ -16,15 +16,11 @@ class StockstatsUtils:
curr_date: Annotated[ curr_date: Annotated[
str, "curr date for retrieving stock price data, YYYY-mm-dd" str, "curr date for retrieving stock price data, YYYY-mm-dd"
], ],
data_dir: Annotated[
str,
"directory where the stock data is stored.",
],
online: Annotated[
bool,
"whether to use online tools to fetch data or offline tools. If True, will use online tools.",
] = False,
): ):
# Get config and set up data directory path
config = get_config()
online = config["data_vendors"]["technical_indicators"] != "local"
df = None df = None
data = None data = None
@ -32,7 +28,7 @@ class StockstatsUtils:
try: try:
data = pd.read_csv( data = pd.read_csv(
os.path.join( os.path.join(
data_dir, DATA_DIR,
f"{symbol}-YFin-data-2015-01-01-2025-03-25.csv", f"{symbol}-YFin-data-2015-01-01-2025-03-25.csv",
) )
) )
@ -50,7 +46,6 @@ class StockstatsUtils:
end_date = end_date.strftime("%Y-%m-%d") end_date = end_date.strftime("%Y-%m-%d")
# Get config and ensure cache directory exists # Get config and ensure cache directory exists
config = get_config()
os.makedirs(config["data_cache_dir"], exist_ok=True) os.makedirs(config["data_cache_dir"], exist_ok=True)
data_file = os.path.join( data_file = os.path.join(

View File

@ -0,0 +1,298 @@
from typing import Annotated
from datetime import datetime
from dateutil.relativedelta import relativedelta
import yfinance as yf
import os
from .stockstats_utils import StockstatsUtils
def get_YFin_data_online(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
):
datetime.strptime(start_date, "%Y-%m-%d")
datetime.strptime(end_date, "%Y-%m-%d")
# Create ticker object
ticker = yf.Ticker(symbol.upper())
# Fetch historical data for the specified date range
data = ticker.history(start=start_date, end=end_date)
# Check if data is empty
if data.empty:
return (
f"No data found for symbol '{symbol}' between {start_date} and {end_date}"
)
# Remove timezone info from index for cleaner output
if data.index.tz is not None:
data.index = data.index.tz_localize(None)
# Round numerical values to 2 decimal places for cleaner display
numeric_columns = ["Open", "High", "Low", "Close", "Adj Close"]
for col in numeric_columns:
if col in data.columns:
data[col] = data[col].round(2)
# Convert DataFrame to CSV string
csv_string = data.to_csv()
# Add header information
header = f"# Stock data for {symbol.upper()} from {start_date} to {end_date}\n"
header += f"# Total records: {len(data)}\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
return header + csv_string
def get_stock_stats_indicators_window(
symbol: Annotated[str, "ticker symbol of the company"],
indicator: Annotated[str, "technical indicator to get the analysis and report of"],
curr_date: Annotated[
str, "The current trading date you are trading on, YYYY-mm-dd"
],
look_back_days: Annotated[int, "how many days to look back"],
) -> str:
best_ind_params = {
# Moving Averages
"close_50_sma": (
"50 SMA: A medium-term trend indicator. "
"Usage: Identify trend direction and serve as dynamic support/resistance. "
"Tips: It lags price; combine with faster indicators for timely signals."
),
"close_200_sma": (
"200 SMA: A long-term trend benchmark. "
"Usage: Confirm overall market trend and identify golden/death cross setups. "
"Tips: It reacts slowly; best for strategic trend confirmation rather than frequent trading entries."
),
"close_10_ema": (
"10 EMA: A responsive short-term average. "
"Usage: Capture quick shifts in momentum and potential entry points. "
"Tips: Prone to noise in choppy markets; use alongside longer averages for filtering false signals."
),
# MACD Related
"macd": (
"MACD: Computes momentum via differences of EMAs. "
"Usage: Look for crossovers and divergence as signals of trend changes. "
"Tips: Confirm with other indicators in low-volatility or sideways markets."
),
"macds": (
"MACD Signal: An EMA smoothing of the MACD line. "
"Usage: Use crossovers with the MACD line to trigger trades. "
"Tips: Should be part of a broader strategy to avoid false positives."
),
"macdh": (
"MACD Histogram: Shows the gap between the MACD line and its signal. "
"Usage: Visualize momentum strength and spot divergence early. "
"Tips: Can be volatile; complement with additional filters in fast-moving markets."
),
# Momentum Indicators
"rsi": (
"RSI: Measures momentum to flag overbought/oversold conditions. "
"Usage: Apply 70/30 thresholds and watch for divergence to signal reversals. "
"Tips: In strong trends, RSI may remain extreme; always cross-check with trend analysis."
),
# Volatility Indicators
"boll": (
"Bollinger Middle: A 20 SMA serving as the basis for Bollinger Bands. "
"Usage: Acts as a dynamic benchmark for price movement. "
"Tips: Combine with the upper and lower bands to effectively spot breakouts or reversals."
),
"boll_ub": (
"Bollinger Upper Band: Typically 2 standard deviations above the middle line. "
"Usage: Signals potential overbought conditions and breakout zones. "
"Tips: Confirm signals with other tools; prices may ride the band in strong trends."
),
"boll_lb": (
"Bollinger Lower Band: Typically 2 standard deviations below the middle line. "
"Usage: Indicates potential oversold conditions. "
"Tips: Use additional analysis to avoid false reversal signals."
),
"atr": (
"ATR: Averages true range to measure volatility. "
"Usage: Set stop-loss levels and adjust position sizes based on current market volatility. "
"Tips: It's a reactive measure, so use it as part of a broader risk management strategy."
),
# Volume-Based Indicators
"vwma": (
"VWMA: A moving average weighted by volume. "
"Usage: Confirm trends by integrating price action with volume data. "
"Tips: Watch for skewed results from volume spikes; use in combination with other volume analyses."
),
"mfi": (
"MFI: The Money Flow Index is a momentum indicator that uses both price and volume to measure buying and selling pressure. "
"Usage: Identify overbought (>80) or oversold (<20) conditions and confirm the strength of trends or reversals. "
"Tips: Use alongside RSI or MACD to confirm signals; divergence between price and MFI can indicate potential reversals."
),
}
if indicator not in best_ind_params:
raise ValueError(
f"Indicator {indicator} is not supported. Please choose from: {list(best_ind_params.keys())}"
)
end_date = curr_date
curr_date_dt = datetime.strptime(curr_date, "%Y-%m-%d")
before = curr_date_dt - relativedelta(days=look_back_days)
# online gathering only
ind_string = ""
while curr_date_dt >= before:
indicator_value = get_stockstats_indicator(
symbol, indicator, curr_date_dt.strftime("%Y-%m-%d")
)
ind_string += f"{curr_date_dt.strftime('%Y-%m-%d')}: {indicator_value}\n"
curr_date_dt = curr_date_dt - relativedelta(days=1)
result_str = (
f"## {indicator} values from {before.strftime('%Y-%m-%d')} to {end_date}:\n\n"
+ ind_string
+ "\n\n"
+ best_ind_params.get(indicator, "No description available.")
)
return result_str
def get_stockstats_indicator(
symbol: Annotated[str, "ticker symbol of the company"],
indicator: Annotated[str, "technical indicator to get the analysis and report of"],
curr_date: Annotated[
str, "The current trading date you are trading on, YYYY-mm-dd"
],
) -> str:
curr_date_dt = datetime.strptime(curr_date, "%Y-%m-%d")
curr_date = curr_date_dt.strftime("%Y-%m-%d")
try:
indicator_value = StockstatsUtils.get_stock_stats(
symbol,
indicator,
curr_date,
)
except Exception as e:
print(
f"Error getting stockstats indicator data for indicator {indicator} on {curr_date}: {e}"
)
return ""
return str(indicator_value)
def get_balance_sheet(
ticker: Annotated[str, "ticker symbol of the company"],
freq: Annotated[str, "frequency of data: 'annual' or 'quarterly'"] = "quarterly",
curr_date: Annotated[str, "current date (not used for yfinance)"] = None
):
"""Get balance sheet data from yfinance."""
try:
ticker_obj = yf.Ticker(ticker.upper())
if freq.lower() == "quarterly":
data = ticker_obj.quarterly_balance_sheet
else:
data = ticker_obj.balance_sheet
if data.empty:
return f"No balance sheet data found for symbol '{ticker}'"
# Convert to CSV string for consistency with other functions
csv_string = data.to_csv()
# Add header information
header = f"# Balance Sheet data for {ticker.upper()} ({freq})\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
return header + csv_string
except Exception as e:
return f"Error retrieving balance sheet for {ticker}: {str(e)}"
def get_cashflow(
ticker: Annotated[str, "ticker symbol of the company"],
freq: Annotated[str, "frequency of data: 'annual' or 'quarterly'"] = "quarterly",
curr_date: Annotated[str, "current date (not used for yfinance)"] = None
):
"""Get cash flow data from yfinance."""
try:
ticker_obj = yf.Ticker(ticker.upper())
if freq.lower() == "quarterly":
data = ticker_obj.quarterly_cashflow
else:
data = ticker_obj.cashflow
if data.empty:
return f"No cash flow data found for symbol '{ticker}'"
# Convert to CSV string for consistency with other functions
csv_string = data.to_csv()
# Add header information
header = f"# Cash Flow data for {ticker.upper()} ({freq})\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
return header + csv_string
except Exception as e:
return f"Error retrieving cash flow for {ticker}: {str(e)}"
def get_income_statement(
ticker: Annotated[str, "ticker symbol of the company"],
freq: Annotated[str, "frequency of data: 'annual' or 'quarterly'"] = "quarterly",
curr_date: Annotated[str, "current date (not used for yfinance)"] = None
):
"""Get income statement data from yfinance."""
try:
ticker_obj = yf.Ticker(ticker.upper())
if freq.lower() == "quarterly":
data = ticker_obj.quarterly_income_stmt
else:
data = ticker_obj.income_stmt
if data.empty:
return f"No income statement data found for symbol '{ticker}'"
# Convert to CSV string for consistency with other functions
csv_string = data.to_csv()
# Add header information
header = f"# Income Statement data for {ticker.upper()} ({freq})\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
return header + csv_string
except Exception as e:
return f"Error retrieving income statement for {ticker}: {str(e)}"
def get_insider_transactions(
ticker: Annotated[str, "ticker symbol of the company"]
):
"""Get insider transactions data from yfinance."""
try:
ticker_obj = yf.Ticker(ticker.upper())
data = ticker_obj.insider_transactions
if data is None or data.empty:
return f"No insider transactions data found for symbol '{ticker}'"
# Convert to CSV string for consistency with other functions
csv_string = data.to_csv()
# Add header information
header = f"# Insider Transactions data for {ticker.upper()}\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
return header + csv_string
except Exception as e:
return f"Error retrieving insider transactions for {ticker}: {str(e)}"

View File

@ -17,6 +17,17 @@ DEFAULT_CONFIG = {
"max_debate_rounds": 1, "max_debate_rounds": 1,
"max_risk_discuss_rounds": 1, "max_risk_discuss_rounds": 1,
"max_recur_limit": 100, "max_recur_limit": 100,
# Tool settings # Data vendor configuration
"online_tools": True, # Category-level configuration (default for all tools in category)
"data_vendors": {
"core_stock_apis": "yfinance", # Options: yfinance, alpha_vantage, local
"technical_indicators": "yfinance", # Options: yfinance, alpha_vantage, local
"fundamental_data": "alpha_vantage", # Options: openai, alpha_vantage, local
"news_data": "alpha_vantage", # Options: openai, alpha_vantage, google, local
},
# Tool-level configuration (takes precedence over category-level)
"tool_vendors": {
# Example: "get_stock_data": "alpha_vantage", # Override category default
# Example: "get_news": "openai", # Override category default
},
} }

View File

@ -7,7 +7,6 @@ from langgraph.prebuilt import ToolNode
from tradingagents.agents import * from tradingagents.agents import *
from tradingagents.agents.utils.agent_states import AgentState from tradingagents.agents.utils.agent_states import AgentState
from tradingagents.agents.utils.agent_utils import Toolkit
from .conditional_logic import ConditionalLogic from .conditional_logic import ConditionalLogic
@ -19,7 +18,6 @@ class GraphSetup:
self, self,
quick_thinking_llm: ChatOpenAI, quick_thinking_llm: ChatOpenAI,
deep_thinking_llm: ChatOpenAI, deep_thinking_llm: ChatOpenAI,
toolkit: Toolkit,
tool_nodes: Dict[str, ToolNode], tool_nodes: Dict[str, ToolNode],
bull_memory, bull_memory,
bear_memory, bear_memory,
@ -31,7 +29,6 @@ class GraphSetup:
"""Initialize with required components.""" """Initialize with required components."""
self.quick_thinking_llm = quick_thinking_llm self.quick_thinking_llm = quick_thinking_llm
self.deep_thinking_llm = deep_thinking_llm self.deep_thinking_llm = deep_thinking_llm
self.toolkit = toolkit
self.tool_nodes = tool_nodes self.tool_nodes = tool_nodes
self.bull_memory = bull_memory self.bull_memory = bull_memory
self.bear_memory = bear_memory self.bear_memory = bear_memory
@ -62,28 +59,28 @@ class GraphSetup:
if "market" in selected_analysts: if "market" in selected_analysts:
analyst_nodes["market"] = create_market_analyst( analyst_nodes["market"] = create_market_analyst(
self.quick_thinking_llm, self.toolkit self.quick_thinking_llm
) )
delete_nodes["market"] = create_msg_delete() delete_nodes["market"] = create_msg_delete()
tool_nodes["market"] = self.tool_nodes["market"] tool_nodes["market"] = self.tool_nodes["market"]
if "social" in selected_analysts: if "social" in selected_analysts:
analyst_nodes["social"] = create_social_media_analyst( analyst_nodes["social"] = create_social_media_analyst(
self.quick_thinking_llm, self.toolkit self.quick_thinking_llm
) )
delete_nodes["social"] = create_msg_delete() delete_nodes["social"] = create_msg_delete()
tool_nodes["social"] = self.tool_nodes["social"] tool_nodes["social"] = self.tool_nodes["social"]
if "news" in selected_analysts: if "news" in selected_analysts:
analyst_nodes["news"] = create_news_analyst( analyst_nodes["news"] = create_news_analyst(
self.quick_thinking_llm, self.toolkit self.quick_thinking_llm
) )
delete_nodes["news"] = create_msg_delete() delete_nodes["news"] = create_msg_delete()
tool_nodes["news"] = self.tool_nodes["news"] tool_nodes["news"] = self.tool_nodes["news"]
if "fundamentals" in selected_analysts: if "fundamentals" in selected_analysts:
analyst_nodes["fundamentals"] = create_fundamentals_analyst( analyst_nodes["fundamentals"] = create_fundamentals_analyst(
self.quick_thinking_llm, self.toolkit self.quick_thinking_llm
) )
delete_nodes["fundamentals"] = create_msg_delete() delete_nodes["fundamentals"] = create_msg_delete()
tool_nodes["fundamentals"] = self.tool_nodes["fundamentals"] tool_nodes["fundamentals"] = self.tool_nodes["fundamentals"]

View File

@ -20,7 +20,21 @@ from tradingagents.agents.utils.agent_states import (
InvestDebateState, InvestDebateState,
RiskDebateState, RiskDebateState,
) )
from tradingagents.dataflows.interface import set_config from tradingagents.dataflows.config import set_config
# Import the new abstract tool methods from agent_utils
from tradingagents.agents.utils.agent_utils import (
get_stock_data,
get_indicators,
get_fundamentals,
get_balance_sheet,
get_cashflow,
get_income_statement,
get_news,
get_insider_sentiment,
get_insider_transactions,
get_global_news
)
from .conditional_logic import ConditionalLogic from .conditional_logic import ConditionalLogic
from .setup import GraphSetup from .setup import GraphSetup
@ -70,8 +84,6 @@ class TradingAgentsGraph:
else: else:
raise ValueError(f"Unsupported LLM provider: {self.config['llm_provider']}") raise ValueError(f"Unsupported LLM provider: {self.config['llm_provider']}")
self.toolkit = Toolkit(config=self.config)
# Initialize memories # Initialize memories
self.bull_memory = FinancialSituationMemory("bull_memory", self.config) self.bull_memory = FinancialSituationMemory("bull_memory", self.config)
self.bear_memory = FinancialSituationMemory("bear_memory", self.config) self.bear_memory = FinancialSituationMemory("bear_memory", self.config)
@ -87,7 +99,6 @@ class TradingAgentsGraph:
self.graph_setup = GraphSetup( self.graph_setup = GraphSetup(
self.quick_thinking_llm, self.quick_thinking_llm,
self.deep_thinking_llm, self.deep_thinking_llm,
self.toolkit,
self.tool_nodes, self.tool_nodes,
self.bull_memory, self.bull_memory,
self.bear_memory, self.bear_memory,
@ -110,46 +121,38 @@ class TradingAgentsGraph:
self.graph = self.graph_setup.setup_graph(selected_analysts) self.graph = self.graph_setup.setup_graph(selected_analysts)
def _create_tool_nodes(self) -> Dict[str, ToolNode]: def _create_tool_nodes(self) -> Dict[str, ToolNode]:
"""Create tool nodes for different data sources.""" """Create tool nodes for different data sources using abstract methods."""
return { return {
"market": ToolNode( "market": ToolNode(
[ [
# online tools # Core stock data tools
self.toolkit.get_YFin_data_online, get_stock_data,
self.toolkit.get_stockstats_indicators_report_online, # Technical indicators
# offline tools get_indicators,
self.toolkit.get_YFin_data,
self.toolkit.get_stockstats_indicators_report,
] ]
), ),
"social": ToolNode( "social": ToolNode(
[ [
# online tools # News tools for social media analysis
self.toolkit.get_stock_news_openai, get_news,
# offline tools
self.toolkit.get_reddit_stock_info,
] ]
), ),
"news": ToolNode( "news": ToolNode(
[ [
# online tools # News and insider information
self.toolkit.get_global_news_openai, get_news,
self.toolkit.get_google_news, get_global_news,
# offline tools get_insider_sentiment,
self.toolkit.get_finnhub_news, get_insider_transactions,
self.toolkit.get_reddit_news,
] ]
), ),
"fundamentals": ToolNode( "fundamentals": ToolNode(
[ [
# online tools # Fundamental analysis tools
self.toolkit.get_fundamentals_openai, get_fundamentals,
# offline tools get_balance_sheet,
self.toolkit.get_finnhub_company_insider_sentiment, get_cashflow,
self.toolkit.get_finnhub_company_insider_transactions, get_income_statement,
self.toolkit.get_simfin_balance_sheet,
self.toolkit.get_simfin_cashflow,
self.toolkit.get_simfin_income_stmt,
] ]
), ),
} }