feat: implement cash management sweep and auto-liquidation (#129)

- Added `CashSweep` node to `PortfolioGraphSetup` to automatically sweep idle cash above 5% threshold into SGOV ETF
- Updated `TradeExecutor` to support auto-liquidating SGOV holdings to fund new equity purchases when cash is insufficient
- Added `cash_sweep` tracking field to `PortfolioManagerState`

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com>
This commit is contained in:
ahmet guzererler 2026-03-27 11:34:33 +01:00 committed by GitHub
parent 635ec430b1
commit 9ccb22d073
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 111 additions and 1 deletions

View File

@ -192,6 +192,7 @@ class PortfolioGraph:
"macro_memory_context": "",
"micro_memory_context": "",
"pm_decision": "",
"cash_sweep": "",
"execution_result": "",
"sender": "",
}

View File

@ -188,6 +188,73 @@ class PortfolioGraphSetup:
return prioritize_candidates_node
def _make_cash_sweep_node(self):
"""Node to automatically sweep excess cash into a cash-equivalent ETF."""
def cash_sweep_node(state):
portfolio_data_str = state.get("portfolio_data") or "{}"
pm_decision_str = state.get("pm_decision") or "{}"
prices = state.get("prices") or {}
try:
portfolio_data = json.loads(portfolio_data_str)
from tradingagents.portfolio.models import Holding, Portfolio
portfolio = Portfolio.from_dict(portfolio_data.get("portfolio") or _EMPTY_PORTFOLIO_DICT)
holdings = [Holding.from_dict(h) for h in (portfolio_data.get("holdings") or [])]
if prices and portfolio.total_value is None:
equity = sum(prices.get(h.ticker, 0.0) * h.shares for h in holdings)
total_value = portfolio.cash + equity
for h in holdings:
if h.ticker in prices:
h.enrich(prices[h.ticker], total_value)
portfolio.enrich(holdings)
total_value = portfolio.total_value or portfolio.cash
# Default target cash threshold
target_cash_pct = 0.05
sweep_etf = "SGOV"
sweep_etf_price = prices.get(sweep_etf, 100.0) # Assume 100.0 if not in prices
try:
decisions = json.loads(pm_decision_str)
except (json.JSONDecodeError, TypeError):
decisions = {"sells": [], "buys": []}
if "buys" not in decisions:
decisions["buys"] = []
sweep_details = "No sweep needed"
if total_value > 0:
current_cash_pct = portfolio.cash / total_value
if current_cash_pct > target_cash_pct:
excess_cash = portfolio.cash - (total_value * target_cash_pct)
shares_to_buy = int(excess_cash / sweep_etf_price)
if shares_to_buy > 0:
# Add SGOV buy to decisions
sweep_buy = {
"ticker": sweep_etf,
"shares": float(shares_to_buy),
"sector": "Cash Equivalent",
"rationale": f"Automatic cash sweep of excess cash (${excess_cash:.2f}) to maintain {target_cash_pct*100:.1f}% target."
}
decisions["buys"].append(sweep_buy)
pm_decision_str = json.dumps(decisions)
sweep_details = f"Swept {shares_to_buy} shares of {sweep_etf}"
logger.info("CashSweep: %s", sweep_details)
except Exception as exc:
logger.error("cash_sweep_node: %s", exc)
sweep_details = f"Error: {exc}"
return {
"pm_decision": pm_decision_str,
"cash_sweep": sweep_details,
"sender": "cash_sweep",
}
return cash_sweep_node
def _make_execute_trades_node(self):
repo = self._repo
config = self._config
@ -266,8 +333,11 @@ class PortfolioGraphSetup:
workflow.add_edge("macro_summary", "make_pm_decision")
workflow.add_edge("micro_summary", "make_pm_decision")
workflow.add_node("cash_sweep", self._make_cash_sweep_node())
# Tail
workflow.add_edge("make_pm_decision", "execute_trades")
workflow.add_edge("make_pm_decision", "cash_sweep")
workflow.add_edge("cash_sweep", "execute_trades")
workflow.add_edge("execute_trades", END)
return workflow.compile()

View File

@ -45,6 +45,7 @@ class PortfolioManagerState(MessagesState):
micro_memory_context: Annotated[str, _last_value]
pm_decision: Annotated[str, _last_value]
cash_sweep: Annotated[str, _last_value]
execution_result: Annotated[str, _last_value]
sender: Annotated[str, _last_value]

View File

@ -174,6 +174,44 @@ class TradeExecutor:
})
continue
# Auto-liquidate cash-sweep ETF (SGOV) if cash is insufficient
cost = shares * price
if portfolio.cash < cost and ticker != "SGOV":
sgov_holding = next((h for h in holdings if h.ticker == "SGOV"), None)
if sgov_holding:
shortfall = cost - portfolio.cash
sgov_price = prices.get("SGOV")
if sgov_price and sgov_price > 0:
# Add a tiny buffer (1.01) to ensure we have enough to avoid precision issues
sgov_shares_to_sell = int((shortfall * 1.01) / sgov_price) + 1
# Don't sell more than we own
sgov_shares_to_sell = min(sgov_shares_to_sell, int(sgov_holding.shares))
if sgov_shares_to_sell > 0:
logger.info(
"TradeExecutor: Auto-liquidating %d shares of SGOV to cover shortfall for %s",
sgov_shares_to_sell, ticker
)
try:
executed, failed = self.repo.batch_remove_holdings(
portfolio_id,
[{
"ticker": "SGOV",
"shares": sgov_shares_to_sell,
"price": sgov_price,
"rationale": f"Auto-liquidated to fund {ticker} purchase"
}],
trade_date
)
executed_trades.extend(executed)
# Reload portfolio to reflect new cash balance
portfolio, holdings = self.repo.get_portfolio_with_holdings(
portfolio_id, prices
)
except PortfolioError as exc:
logger.error("TradeExecutor auto-liquidation failed: %s", exc)
violations = check_constraints(
portfolio,
holdings,