diff --git a/tradingagents/dataflows/interface.py b/tradingagents/dataflows/interface.py index 4cd5ddef..b5057d19 100644 --- a/tradingagents/dataflows/interface.py +++ b/tradingagents/dataflows/interface.py @@ -16,6 +16,7 @@ from .alpha_vantage import ( get_news as get_alpha_vantage_news ) from .alpha_vantage_common import AlphaVantageRateLimitError +from .telegram import get_crypto_news_telegram # Configuration and routing logic from .config import get_config @@ -100,6 +101,7 @@ VENDOR_METHODS = { "alpha_vantage": get_alpha_vantage_news, "openai": get_stock_news_openai, "google": get_google_news, + "telegram": get_crypto_news_telegram, "local": [get_finnhub_news, get_reddit_company_news, get_google_news], }, "get_global_news": { diff --git a/tradingagents/dataflows/telegram.py b/tradingagents/dataflows/telegram.py new file mode 100644 index 00000000..7d277e5e --- /dev/null +++ b/tradingagents/dataflows/telegram.py @@ -0,0 +1,49 @@ +import asyncio +from telethon import TelegramClient +from datetime import datetime, timedelta, timezone +import os + +def get_api_credentials(): + api_id = int(os.getenv("TELEGRAM_API_ID", "")) + api_hash = os.getenv("TELEGRAM_API_HASH", "") + session_name = os.getenv("TELEGRAM_SESSION_NAME", "") + return api_id, api_hash, session_name + +async def _get_channel_history_async(start_date_str, end_date_str): + """ + The internal async logic that does the actual work. + """ + + username = "WatcherGuru" + + api_id, api_hash, session_name = get_api_credentials() + + # 1. Start the client using 'async with' + # This automatically handles connecting AND disconnecting (releasing the DB lock) + async with TelegramClient(session_name, api_id, api_hash) as client: + + # Date parsing logic + start_date = datetime.strptime(start_date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc) + end_date_obj = datetime.strptime(end_date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc) + end_date = end_date_obj + timedelta(days=1) - timedelta(seconds=1) + + formatted_log = "" + + # Fetching messages + n_records = 0 + async for message in client.iter_messages(username, offset_date=end_date, reverse=False): + if message.date < start_date: + break + + if message.text: + date_str = message.date.strftime('%Y-%m-%d') + clean_text = message.text.replace('\n', ' ') + formatted_log += f"[{date_str}] {clean_text}\n" + n_records += 1 + + intro = f"# News data from Telegram channel @{username} from {start_date_str} to {end_date_str}:\n# Total records: {n_records}\n# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" + + return intro + formatted_log + +def get_crypto_news_telegram(symbol, start_date, end_date): + return asyncio.run(_get_channel_history_async(start_date, end_date)) \ No newline at end of file