475 lines
19 KiB
Python
475 lines
19 KiB
Python
from typing import Annotated
|
|
import pandas as pd
|
|
import os
|
|
from .config import DATA_DIR
|
|
from datetime import datetime
|
|
from dateutil.relativedelta import relativedelta
|
|
import json
|
|
from .reddit_utils import fetch_top_from_category
|
|
from tqdm import tqdm
|
|
|
|
def get_YFin_data_window(
    symbol: Annotated[str, "ticker symbol of the company"],
    curr_date: Annotated[str, "Start date in yyyy-mm-dd format"],
    look_back_days: Annotated[int, "how many days to look back"],
) -> str:
    """Render the cached price rows for the look-back window ending at curr_date.

    Args:
        symbol: Ticker symbol; used to build the name of the cached CSV file.
        curr_date: Last day of the window (inclusive), in yyyy-mm-dd format.
        look_back_days: Number of days before curr_date where the window starts.

    Returns:
        str: A "## Raw Market Data ..." heading followed by the matching rows
        rendered with DataFrame.to_string().
    """
    # Derive the first day of the window from the last day.
    window_end = datetime.strptime(curr_date, "%Y-%m-%d")
    window_start = window_end - relativedelta(days=look_back_days)
    start_date = window_start.strftime("%Y-%m-%d")

    # Load the per-symbol price file stored under DATA_DIR.
    csv_path = os.path.join(
        DATA_DIR,
        f"market_data/price_data/{symbol}-YFin-data-2015-01-01-2025-03-25.csv",
    )
    prices = pd.read_csv(csv_path)

    # Only the first ten characters of "Date" (the yyyy-mm-dd part) are
    # compared; plain string comparison orders ISO dates chronologically.
    day = prices["Date"].str[:10]
    in_window = (day >= start_date) & (day <= curr_date)
    selected = prices[in_window]

    # Lift pandas display limits while rendering the frame.
    display_options = (
        "display.max_rows",
        None,
        "display.max_columns",
        None,
        "display.width",
        None,
    )
    with pd.option_context(*display_options):
        table = selected.to_string()

    heading = f"## Raw Market Data for {symbol} from {start_date} to {curr_date}:\n\n"
    return heading + table
|
|
|
|
def get_YFin_data(
    symbol: Annotated[str, "ticker symbol of the company"],
    start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
    end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> pd.DataFrame:
    """Return the cached price rows for symbol between two dates (inclusive).

    Args:
        symbol: Ticker symbol; used to build the name of the cached CSV file.
        start_date: First day to include, in yyyy-mm-dd format.
        end_date: Last day to include, in yyyy-mm-dd format.

    Returns:
        pandas.DataFrame: The matching rows with a fresh 0-based index.

    Raises:
        ValueError: If end_date is later than 2025-03-25, the last day covered
            by the cached file. ValueError is a subclass of Exception, so
            callers that catch Exception keep working.
    """
    # Validate the requested range before touching the disk so that an invalid
    # request fails fast instead of paying for a full CSV parse first.
    if end_date > "2025-03-25":
        raise ValueError(
            f"Get_YFin_Data: {end_date} is outside of the data range of 2015-01-01 to 2025-03-25"
        )

    # read in data
    data = pd.read_csv(
        os.path.join(
            DATA_DIR,
            f"market_data/price_data/{symbol}-YFin-data-2015-01-01-2025-03-25.csv",
        )
    )

    # Compare only the yyyy-mm-dd prefix of the "Date" column; ISO dates sort
    # chronologically under plain string comparison.
    date_only = data["Date"].str[:10]
    filtered_data = data[(date_only >= start_date) & (date_only <= end_date)]

    # remove the original row labels from the result
    return filtered_data.reset_index(drop=True)
|
|
|
|
def get_finnhub_news(
    query: Annotated[str, "Search query or ticker symbol"],
    start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
    end_date: Annotated[str, "End date in yyyy-mm-dd format"],
):
    """
    Build a text report of the stored news for a company within a time frame.

    Args:
        query (str): Search query or ticker symbol
        start_date (str): Start date in yyyy-mm-dd format
        end_date (str): End date in yyyy-mm-dd format

    Returns:
        str: a "## ... News" heading followed by one "### headline (day)"
            section per news entry, or an empty string when no news is
            stored for the time frame
    """
    news_by_day = get_data_in_range(query, start_date, end_date, "news_data", DATA_DIR)
    if not news_by_day:
        return ""

    # One section per entry: headline with its day, then the summary.
    sections = []
    for day, entries in news_by_day.items():
        for item in entries:
            heading = "### " + item["headline"] + f" ({day})"
            sections.append(heading + "\n" + item["summary"] + "\n\n")

    return f"## {query} News, from {start_date} to {end_date}:\n" + "".join(sections)
|
|
|
|
|
|
def get_finnhub_company_insider_sentiment(
    ticker: Annotated[str, "ticker symbol for the company"],
    curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
    """
    Report the stored insider sentiment for a company over the 15 days ending at curr_date.

    Args:
        ticker (str): ticker symbol of the company
        curr_date (str): current date you are trading on, yyyy-mm-dd

    Returns:
        str: a heading, one section per distinct sentiment entry and a short
            field explanation, or an empty string when nothing is stored for
            the window
    """
    # Fixed 15-day look-back window ending at curr_date.
    window_end = datetime.strptime(curr_date, "%Y-%m-%d")
    before = (window_end - relativedelta(days=15)).strftime("%Y-%m-%d")

    data = get_data_in_range(ticker, before, curr_date, "insider_senti", DATA_DIR)
    if not data:
        return ""

    # The same entry can be stored under several days; keep the first
    # occurrence only (entries are dicts, so a list is used for the lookup).
    unique_entries = []
    for entries in data.values():
        for entry in entries:
            if entry not in unique_entries:
                unique_entries.append(entry)

    sections = [
        f"### {entry['year']}-{entry['month']}:\nChange: {entry['change']}\nMonthly Share Purchase Ratio: {entry['mspr']}\n\n"
        for entry in unique_entries
    ]

    heading = f"## {ticker} Insider Sentiment Data for {before} to {curr_date}:\n"
    footer = "The change field refers to the net buying/selling from all insiders' transactions. The mspr field refers to monthly share purchase ratio."
    return heading + "".join(sections) + footer
|
|
|
|
|
|
def get_finnhub_company_insider_transactions(
    ticker: Annotated[str, "ticker symbol"],
    curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
    """
    Report the stored insider transactions for a company over the 15 days ending at curr_date.

    Args:
        ticker (str): ticker symbol of the company
        curr_date (str): current date you are trading at, yyyy-mm-dd

    Returns:
        str: a heading, one section per distinct transaction and an
            explanation of the fields, or an empty string when nothing is
            stored for the window
    """
    # Fixed 15-day look-back window ending at curr_date.
    window_end = datetime.strptime(curr_date, "%Y-%m-%d")
    before = (window_end - relativedelta(days=15)).strftime("%Y-%m-%d")

    data = get_data_in_range(ticker, before, curr_date, "insider_trans", DATA_DIR)
    if not data:
        return ""

    # The same transaction can be stored under several days; keep the first
    # occurrence only (entries are dicts, so a list is used for the lookup).
    unique_entries = []
    for entries in data.values():
        for entry in entries:
            if entry not in unique_entries:
                unique_entries.append(entry)

    sections = [
        f"### Filing Date: {entry['filingDate']}, {entry['name']}:\nChange:{entry['change']}\nShares: {entry['share']}\nTransaction Price: {entry['transactionPrice']}\nTransaction Code: {entry['transactionCode']}\n\n"
        for entry in unique_entries
    ]

    heading = f"## {ticker} insider transactions from {before} to {curr_date}:\n"
    footer = "The change field reflects the variation in share count—here a negative number indicates a reduction in holdings—while share specifies the total number of shares involved. The transactionPrice denotes the per-share price at which the trade was executed, and transactionDate marks when the transaction occurred. The name field identifies the insider making the trade, and transactionCode (e.g., S for sale) clarifies the nature of the transaction. FilingDate records when the transaction was officially reported, and the unique id links to the specific SEC filing, as indicated by the source. Additionally, the symbol ties the transaction to a particular company, isDerivative flags whether the trade involves derivative securities, and currency notes the currency context of the transaction."
    return heading + "".join(sections) + footer
|
|
|
|
def get_data_in_range(ticker, start_date, end_date, data_type, data_dir, period=None):
    """
    Gets finnhub data saved and processed on disk.

    Args:
        ticker (str): Ticker symbol; used as the prefix of the JSON file name.
        start_date (str): Start date in YYYY-MM-DD format.
        end_date (str): End date in YYYY-MM-DD format.
        data_type (str): Type of data from finnhub to fetch. Can be insider_trans, SEC_filings, news_data, insider_senti, or fin_as_reported.
        data_dir (str): Directory where the data is saved.
        period (str): Default to none, if there is a period specified, should be annual or quarterly.

    Returns:
        dict: The entries of the JSON file whose key (a YYYY-MM-DD date
            string) lies within [start_date, end_date] and whose value is
            non-empty.
    """
    # The file name carries the period only when one was requested.
    if period:
        file_name = f"{ticker}_{period}_data_formatted.json"
    else:
        file_name = f"{ticker}_data_formatted.json"
    data_path = os.path.join(data_dir, "finnhub_data", data_type, file_name)

    # Use a context manager so the file handle is closed as soon as parsing
    # finishes, including when json.load raises.
    with open(data_path, "r") as data_file:
        data = json.load(data_file)

    # Keys are YYYY-MM-DD strings, so string comparison matches date order.
    return {
        key: value
        for key, value in data.items()
        if start_date <= key <= end_date and len(value) > 0
    }
|
|
|
|
def get_simfin_balance_sheet(
    ticker: Annotated[str, "ticker symbol"],
    freq: Annotated[
        str,
        "reporting frequency of the company's financial history: annual / quarterly",
    ],
    curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
    """Return the most recently published balance sheet for ticker as of curr_date.

    Args:
        ticker: Ticker symbol matched against the "Ticker" column.
        freq: Reporting frequency; selects the us-balance-{freq}.csv file.
        curr_date: Cut-off date (yyyy-mm-dd); later publications are ignored.

    Returns:
        str: A heading, the selected row rendered as text and a short
        explanation, or an empty string (after printing a notice) when no
        report was published on or before curr_date.
    """
    csv_path = os.path.join(
        DATA_DIR,
        "fundamental_data",
        "simfin_data_all",
        "balance_sheet",
        "companies",
        "us",
        f"us-balance-{freq}.csv",
    )
    statements = pd.read_csv(csv_path, sep=";")

    # Parse both date columns as UTC timestamps truncated to midnight.
    for date_column in ("Report Date", "Publish Date"):
        statements[date_column] = pd.to_datetime(
            statements[date_column], utc=True
        ).dt.normalize()

    # The cut-off gets the same treatment so the comparison is day-based.
    cutoff = pd.to_datetime(curr_date, utc=True).normalize()

    # Keep this ticker's reports published on or before the cut-off.
    is_ticker = statements["Ticker"] == ticker
    already_published = statements["Publish Date"] <= cutoff
    candidates = statements[is_ticker & already_published]

    if candidates.empty:
        print("No balance sheet available before the given current date.")
        return ""

    # Pick the row with the latest publish date and hide the SimFinId field.
    newest_label = candidates["Publish Date"].idxmax()
    latest = candidates.loc[newest_label].drop("SimFinId")

    published_on = str(latest["Publish Date"])[0:10]
    heading = f"## {freq} balance sheet for {ticker} released on {published_on}: \n"
    explanation = "\n\nThis includes metadata like reporting dates and currency, share details, and a breakdown of assets, liabilities, and equity. Assets are grouped as current (liquid items like cash and receivables) and noncurrent (long-term investments and property). Liabilities are split between short-term obligations and long-term debts, while equity reflects shareholder funds such as paid-in capital and retained earnings. Together, these components ensure that total assets equal the sum of liabilities and equity."
    return heading + str(latest) + explanation
|
|
|
|
|
|
def get_simfin_cashflow(
    ticker: Annotated[str, "ticker symbol"],
    freq: Annotated[
        str,
        "reporting frequency of the company's financial history: annual / quarterly",
    ],
    curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
    """Return the most recently published cash flow statement for ticker as of curr_date.

    Args:
        ticker: Ticker symbol matched against the "Ticker" column.
        freq: Reporting frequency; selects the us-cashflow-{freq}.csv file.
        curr_date: Cut-off date (yyyy-mm-dd); later publications are ignored.

    Returns:
        str: A heading, the selected row rendered as text and a short
        explanation, or an empty string (after printing a notice) when no
        report was published on or before curr_date.
    """
    csv_path = os.path.join(
        DATA_DIR,
        "fundamental_data",
        "simfin_data_all",
        "cash_flow",
        "companies",
        "us",
        f"us-cashflow-{freq}.csv",
    )
    statements = pd.read_csv(csv_path, sep=";")

    # Parse both date columns as UTC timestamps truncated to midnight.
    for date_column in ("Report Date", "Publish Date"):
        statements[date_column] = pd.to_datetime(
            statements[date_column], utc=True
        ).dt.normalize()

    # The cut-off gets the same treatment so the comparison is day-based.
    cutoff = pd.to_datetime(curr_date, utc=True).normalize()

    # Keep this ticker's reports published on or before the cut-off.
    is_ticker = statements["Ticker"] == ticker
    already_published = statements["Publish Date"] <= cutoff
    candidates = statements[is_ticker & already_published]

    if candidates.empty:
        print("No cash flow statement available before the given current date.")
        return ""

    # Pick the row with the latest publish date and hide the SimFinId field.
    newest_label = candidates["Publish Date"].idxmax()
    latest = candidates.loc[newest_label].drop("SimFinId")

    published_on = str(latest["Publish Date"])[0:10]
    heading = f"## {freq} cash flow statement for {ticker} released on {published_on}: \n"
    explanation = "\n\nThis includes metadata like reporting dates and currency, share details, and a breakdown of cash movements. Operating activities show cash generated from core business operations, including net income adjustments for non-cash items and working capital changes. Investing activities cover asset acquisitions/disposals and investments. Financing activities include debt transactions, equity issuances/repurchases, and dividend payments. The net change in cash represents the overall increase or decrease in the company's cash position during the reporting period."
    return heading + str(latest) + explanation
|
|
|
|
|
|
def get_simfin_income_statements(
    ticker: Annotated[str, "ticker symbol"],
    freq: Annotated[
        str,
        "reporting frequency of the company's financial history: annual / quarterly",
    ],
    curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
    """Return the most recently published income statement for ticker as of curr_date.

    Args:
        ticker: Ticker symbol matched against the "Ticker" column.
        freq: Reporting frequency; selects the us-income-{freq}.csv file.
        curr_date: Cut-off date (yyyy-mm-dd); later publications are ignored.

    Returns:
        str: A heading, the selected row rendered as text and a short
        explanation, or an empty string (after printing a notice) when no
        report was published on or before curr_date.
    """
    csv_path = os.path.join(
        DATA_DIR,
        "fundamental_data",
        "simfin_data_all",
        "income_statements",
        "companies",
        "us",
        f"us-income-{freq}.csv",
    )
    statements = pd.read_csv(csv_path, sep=";")

    # Parse both date columns as UTC timestamps truncated to midnight.
    for date_column in ("Report Date", "Publish Date"):
        statements[date_column] = pd.to_datetime(
            statements[date_column], utc=True
        ).dt.normalize()

    # The cut-off gets the same treatment so the comparison is day-based.
    cutoff = pd.to_datetime(curr_date, utc=True).normalize()

    # Keep this ticker's reports published on or before the cut-off.
    is_ticker = statements["Ticker"] == ticker
    already_published = statements["Publish Date"] <= cutoff
    candidates = statements[is_ticker & already_published]

    if candidates.empty:
        print("No income statement available before the given current date.")
        return ""

    # Pick the row with the latest publish date and hide the SimFinId field.
    newest_label = candidates["Publish Date"].idxmax()
    latest = candidates.loc[newest_label].drop("SimFinId")

    published_on = str(latest["Publish Date"])[0:10]
    heading = f"## {freq} income statement for {ticker} released on {published_on}: \n"
    explanation = "\n\nThis includes metadata like reporting dates and currency, share details, and a comprehensive breakdown of the company's financial performance. Starting with Revenue, it shows Cost of Revenue and resulting Gross Profit. Operating Expenses are detailed, including SG&A, R&D, and Depreciation. The statement then shows Operating Income, followed by non-operating items and Interest Expense, leading to Pretax Income. After accounting for Income Tax and any Extraordinary items, it concludes with Net Income, representing the company's bottom-line profit or loss for the period."
    return heading + str(latest) + explanation
|
|
|
|
|
|
def get_reddit_global_news(
    curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
    look_back_days: Annotated[int, "Number of days to look back"] = 7,
    limit: Annotated[int, "Maximum number of articles to return"] = 5,
) -> str:
    """
    Collect the stored top global-news reddit posts for the days leading up to curr_date.

    Args:
        curr_date: Current date in yyyy-mm-dd format
        look_back_days: Number of days to look back (default 7)
        limit: Limit passed to each per-day fetch (default 5)

    Returns:
        str: A heading followed by one "### title" section per post (with the
            post content when it is not empty), or an empty string when no
            posts were found
    """
    curr_date_dt = datetime.strptime(curr_date, "%Y-%m-%d")
    window_start = curr_date_dt - relativedelta(days=look_back_days)
    before = window_start.strftime("%Y-%m-%d")

    # Both ends of the window are fetched, hence the extra day.
    day_count = (curr_date_dt - window_start).days + 1
    progress = tqdm(desc=f"Getting Global News on {curr_date}", total=day_count)

    # Fetch day by day, from the start of the window up to curr_date.
    posts = []
    for offset in range(day_count):
        day = window_start + relativedelta(days=offset)
        posts.extend(
            fetch_top_from_category(
                "global_news",
                day.strftime("%Y-%m-%d"),
                limit,
                data_path=os.path.join(DATA_DIR, "reddit_data"),
            )
        )
        progress.update(1)
    progress.close()

    if not posts:
        return ""

    # Posts without content contribute only their title line.
    news_str = "".join(
        f"### {post['title']}\n\n"
        if post["content"] == ""
        else f"### {post['title']}\n\n{post['content']}\n\n"
        for post in posts
    )

    return f"## Global News Reddit, from {before} to {curr_date}:\n{news_str}"
|
|
|
|
|
|
def get_reddit_company_news(
    query: Annotated[str, "Search query or ticker symbol"],
    start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
    end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
    """
    Collect the stored top company-news reddit posts for a query within a time frame.

    Args:
        query: Search query or ticker symbol
        start_date: Start date in yyyy-mm-dd format
        end_date: End date in yyyy-mm-dd format

    Returns:
        str: A "## {query} News Reddit" heading followed by one "### title"
            section per post (with the post content when it is not empty),
            or an empty string when no posts were found
    """
    start_date_dt = datetime.strptime(start_date, "%Y-%m-%d")
    end_date_dt = datetime.strptime(end_date, "%Y-%m-%d")

    posts = []
    # iterate from start_date to end_date (both inclusive)
    curr_date = start_date_dt
    total_iterations = (end_date_dt - curr_date).days + 1

    # The context manager closes the progress bar even when a fetch raises.
    with tqdm(
        desc=f"Getting Company News for {query} from {start_date} to {end_date}",
        total=total_iterations,
    ) as pbar:
        while curr_date <= end_date_dt:
            fetch_result = fetch_top_from_category(
                "company_news",
                curr_date.strftime("%Y-%m-%d"),
                10,  # max limit per day
                query,
                data_path=os.path.join(DATA_DIR, "reddit_data"),
            )
            posts.extend(fetch_result)
            curr_date += relativedelta(days=1)
            pbar.update(1)

    if not posts:
        return ""

    # Posts without content contribute only their title line.
    news_str = "".join(
        f"### {post['title']}\n\n"
        if post["content"] == ""
        else f"### {post['title']}\n\n{post['content']}\n\n"
        for post in posts
    )

    # A Markdown ATX heading needs a space after the "##" marker; without it
    # the line is plain text rather than a heading.
    return f"## {query} News Reddit, from {start_date} to {end_date}:\n\n{news_str}"