""" Algo Trading Workflow Complete workflow for screening stocks, detecting pump signals, and executing trades algorithmically with portfolio management and risk controls. Usage: from algo_trading_workflow import AlgoTradingBot bot = AlgoTradingBot( portfolio_cash=10000, paper_trading=True, ) # Run continuous trading loop bot.run() # Or run single iteration bot.run_iteration() """ import os import json import time import logging from datetime import datetime, timedelta from typing import Optional, Dict, List from dataclasses import asdict from tradingagents.graph.trading_graph import TradingAgentsGraph from tradingagents.strategy.portfolio_manager import PortfolioManager from tradingagents.strategy.exit_strategy import ExitStrategy, ExitConfig from tradingagents.strategy.trade_validator import TradeValidator from tradingagents.agents.trader.paper_trading import PaperTrader logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class AlgoTradingBot: """ Algorithmic Trading Bot Orchestrates: 1. Stock screening (find candidates) 2. Signal detection (identify opportunities) 3. Position management (8% rule, max positions) 4. Trade execution (Webull paper trading) 5. Exit management (profit targets, stop losses) """ def __init__( self, portfolio_cash: float = 10000.0, paper_trading: bool = True, selected_analysts: Optional[List[str]] = None, webull_email: Optional[str] = None, webull_password: Optional[str] = None, webull_pin: Optional[str] = None, ): """ Initialize algo trading bot. Args: portfolio_cash: Starting capital paper_trading: Use paper trading (True) or simulation (False) selected_analysts: Which analysts to use (market, social, news, fundamentals) webull_email: Webull account email webull_password: Webull account password webull_pin: Webull trading PIN """ self.portfolio_cash = portfolio_cash self.paper_trading = paper_trading self.selected_analysts = selected_analysts or ["market", "social"] # Initialize components self.graph = TradingAgentsGraph( include_screening=True, include_pump_detection=True, selected_analysts=self.selected_analysts, ) self.portfolio_manager = PortfolioManager( portfolio_cash=portfolio_cash, max_position_pct=0.08, # 8% per stock max_risky_pct=0.25, # 25% in risky trades max_positions=10, ) self.exit_strategy = ExitStrategy( ExitConfig( profit_target_pct=5.0, # Take profit at 5% stop_loss_pct=2.0, # Stop loss at 2% max_hold_days=5, # Max 5 days trailing_stop_pct=2.0, # Trailing stop 2% ) ) # Paper trading self.paper_trader = None if paper_trading: self.paper_trader = PaperTrader( email=webull_email, password=webull_password, is_paper=True, ) if webull_email and webull_password: self._setup_paper_trading(webull_pin) self.validator = TradeValidator() # Tracking self.iteration_count = 0 self.last_iteration = None self.trade_log: List[Dict] = [] logger.info(f"AlgoTradingBot initialized with ${portfolio_cash:.2f}") def _setup_paper_trading(self, pin: Optional[str] = None): """Setup Webull paper trading connection""" if not self.paper_trader: return False if not self.paper_trader.login(): logger.warning("Failed to login to Webull") return False if pin and not self.paper_trader.get_trade_token(pin): logger.warning("Failed to get trade token") return False logger.info("Paper trading ready") return True def run_iteration(self): """ Run one complete trading iteration: 1. Analyze stock 2. Check for buy signals 3. Execute buy if signal strong 4. Check existing positions for exits 5. Execute sells if needed """ self.iteration_count += 1 iteration_start = datetime.now() logger.info(f"\n{'='*60}") logger.info(f"ITERATION {self.iteration_count} - {iteration_start}") logger.info(f"{'='*60}") # Get portfolio status portfolio_status = self.portfolio_manager.get_portfolio_status() logger.info(f"Portfolio: ${portfolio_status['total_value']:.2f} " f"(Cash: ${portfolio_status['cash']:.2f})") # TODO: Implement stock selection logic # For now, demonstrate with a hardcoded example stocks_to_analyze = ["NVDA", "AAPL", "TSLA"] # From screening agent for ticker in stocks_to_analyze: logger.info(f"\nAnalyzing {ticker}...") try: # Run graph analysis final_state, signal = self.graph.propagate( ticker, datetime.now().strftime("%Y-%m-%d") ) # Check for pump detection signal pump_report = final_state.get("pump_report", "") screening_report = final_state.get("screening_report", "") # Parse signal score from reports pump_score = self._extract_pump_score(pump_report) logger.info(f" Pump Score: {pump_score:.1f}") # Check if we should buy if pump_score > 70: # Threshold for entry self._attempt_buy(ticker, pump_score) except Exception as e: logger.error(f"Error analyzing {ticker}: {e}") # Check existing positions for exit signals self._check_exit_conditions() # Log iteration self.last_iteration = datetime.now() iteration_duration = (self.last_iteration - iteration_start).total_seconds() logger.info(f"\nIteration completed in {iteration_duration:.1f}s") def _attempt_buy(self, ticker: str, signal_score: float): """Attempt to buy a stock""" logger.info(f" Buy signal for {ticker} (score: {signal_score:.1f})") # Get current price if self.paper_trader: quote = self.paper_trader.get_stock_quote(ticker) if not quote: logger.warning(f" Could not get quote for {ticker}") return current_price = quote["price"] else: # Simulated price current_price = 100.0 # Demo value # Calculate position size position_size = self.portfolio_manager.calculate_position_size( current_price=current_price, signal_score=signal_score, position_type="pump", ) if not position_size: logger.info(f" Could not calculate position size") return shares = position_size["shares"] # Validate order validation = self.validator.validate_buy_order( ticker=ticker, shares=shares, price=current_price, available_cash=self.portfolio_manager.cash, portfolio_value=self.portfolio_manager.portfolio_value, ) if not validation["is_valid"]: logger.warning(f" Order validation failed: {validation['issues']}") return # Place order if self.paper_trader: order = self.paper_trader.place_buy_order( ticker=ticker, quantity=shares, limit_price=current_price, ) if not order: logger.error(f" Failed to place buy order") return # Add to portfolio if self.portfolio_manager.add_position( ticker=ticker, shares=shares, entry_price=current_price, signal_score=signal_score, position_type="pump", ): logger.info(f" ✓ Bought {shares} shares of {ticker} @ ${current_price:.2f}") self.trade_log.append({ "action": "BUY", "ticker": ticker, "shares": shares, "price": current_price, "timestamp": datetime.now().isoformat(), "signal_score": signal_score, }) else: logger.error(f" Failed to add position to portfolio") def _check_exit_conditions(self): """Check all positions for exit signals""" positions_to_check = list(self.portfolio_manager.positions.keys()) for ticker in positions_to_check: position = self.portfolio_manager.positions[ticker] # Get current price if self.paper_trader: quote = self.paper_trader.get_stock_quote(ticker) if not quote: continue current_price = quote["price"] else: current_price = 100.0 # Demo value # Check exit conditions exit_signal = self.exit_strategy.evaluate_exit( ticker=ticker, current_price=current_price, entry_price=position.entry_price, entry_date=position.entry_date, signal_score=position.signal_score, position_type=position.position_type, ) if exit_signal and exit_signal.get("exit_signal"): self._attempt_sell( ticker=ticker, exit_price=current_price, reason=exit_signal["reason"], ) def _attempt_sell(self, ticker: str, exit_price: float, reason: str): """Attempt to sell a position""" logger.info(f" Sell signal for {ticker} ({reason})") position = self.portfolio_manager.positions.get(ticker) if not position: logger.warning(f" No position found for {ticker}") return shares = position.shares # Validate order validation = self.validator.validate_sell_order( ticker=ticker, shares=shares, price=exit_price, position_shares=position.shares, position_value=position.shares * position.entry_price, ) if not validation["is_valid"]: logger.warning(f" Order validation failed: {validation['issues']}") return # Place order if self.paper_trader: order = self.paper_trader.place_sell_order( ticker=ticker, quantity=shares, limit_price=exit_price, ) if not order: logger.error(f" Failed to place sell order") return # Close position result = self.portfolio_manager.close_position( ticker=ticker, exit_price=exit_price, reason=reason, ) if result: logger.info( f" ✓ Sold {shares} shares of {ticker} @ ${exit_price:.2f} " f"(P/L: ${result['profit']:.2f}, {result['profit_pct']:.1f}%)" ) self.trade_log.append({ "action": "SELL", "ticker": ticker, "shares": shares, "price": exit_price, "timestamp": datetime.now().isoformat(), "profit": result["profit"], "profit_pct": result["profit_pct"], "reason": reason, }) def _extract_pump_score(self, pump_report: str) -> float: """Extract pump score from pump report""" # Simple extraction - look for "score:" or percentage if not pump_report: return 0.0 import re # Look for number followed by % or "score" matches = re.findall(r'(\d+(?:\.\d+)?)\s*%?', pump_report) if matches: try: return float(matches[0]) except ValueError: pass return 0.0 def run(self, iterations: int = -1, interval_seconds: int = 300): """ Run trading loop continuously. Args: iterations: Number of iterations (-1 for infinite) interval_seconds: Seconds between iterations (default 5 min) """ iteration = 0 logger.info(f"Starting algo trading loop (interval: {interval_seconds}s)") try: while iterations < 0 or iteration < iterations: self.run_iteration() iteration += 1 if iterations < 0 or iteration < iterations: logger.info(f"Next iteration in {interval_seconds}s...") time.sleep(interval_seconds) except KeyboardInterrupt: logger.info("\nTrading loop interrupted by user") finally: self.save_state() def get_status(self) -> Dict: """Get current bot status""" portfolio_status = self.portfolio_manager.get_portfolio_status() return { "iteration": self.iteration_count, "last_iteration": self.last_iteration.isoformat() if self.last_iteration else None, "portfolio": portfolio_status, "trades": len(self.trade_log), "paper_trading": self.paper_trading, } def save_state(self, filepath: str = "trading_bot_state.json"): """Save bot state to file""" state = { "iteration": self.iteration_count, "timestamp": datetime.now().isoformat(), "portfolio": self.portfolio_manager.to_dict(), "exit_strategy": self.exit_strategy.to_dict(), "trades": self.trade_log, } with open(filepath, 'w') as f: json.dump(state, f, indent=2) logger.info(f"State saved to {filepath}") def print_summary(self): """Print trading summary""" portfolio_status = self.portfolio_manager.get_portfolio_status() print(f"\n{'='*60}") print(f"TRADING SUMMARY") print(f"{'='*60}") print(f"Iterations: {self.iteration_count}") print(f"Portfolio Value: ${portfolio_status['total_value']:.2f}") print(f"Cash: ${portfolio_status['cash']:.2f}") print(f"Positions: {portfolio_status['num_positions']}") print(f"Total Trades: {len(self.trade_log)}") # Calculate P/L pnl = 0.0 for trade in self.trade_log: if trade["action"] == "SELL" and "profit" in trade: pnl += trade["profit"] print(f"Net P/L: ${pnl:.2f}") print(f"Cash Utilization: {portfolio_status['cash_utilization']:.1f}%") print(f"Risky Exposure: {portfolio_status['risky_exposure']:.1f}%") print(f"{'='*60}\n") if __name__ == "__main__": # Example usage bot = AlgoTradingBot( portfolio_cash=10000.0, paper_trading=False, # Demo mode (no Webull auth) selected_analysts=["market"], ) # Run single iteration logger.info("Running demo iteration (not connected to Webull)...") bot.run_iteration() bot.print_summary()