# TradingAgents/tradingagents/portfolio/report_store.py
#
# 336 lines
# 12 KiB
# Python

"""Filesystem document store for Portfolio Manager reports.

Saves and loads all non-transactional portfolio artifacts (scans, per-ticker
analysis, holding reviews, risk metrics, PM decisions, execution results)
using the existing ``tradingagents/report_paths.py`` path convention.

Directory layout::

    reports/daily/{date}/
    ├── market/
    │   └── macro_scan_summary.json          ← save_scan / load_scan
    ├── {TICKER}/
    │   └── complete_report.json             ← save_analysis / load_analysis
    └── portfolio/
        ├── {TICKER}_holding_review.json     ← save/load_holding_review
        ├── {portfolio_id}_risk_metrics.json
        ├── {portfolio_id}_pm_decision.json
        ├── {portfolio_id}_pm_decision.md
        └── {portfolio_id}_execution_result.json

Usage::

    from tradingagents.portfolio.report_store import ReportStore

    store = ReportStore()
    store.save_scan("2026-03-20", {"watchlist": [...]})
    data = store.load_scan("2026-03-20")
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from tradingagents.portfolio.exceptions import ReportStoreError
class ReportStore:
    """Filesystem document store for all portfolio-related reports.

    Every artifact is a JSON file (plus an optional Markdown companion for
    PM decisions) stored under ``{base_dir}/daily/{date}/``.

    Directories are created automatically on first write.
    All load methods return ``None`` when the file does not exist.
    """

    def __init__(self, base_dir: str | Path = "reports") -> None:
        """Initialise the store with a base reports directory.

        Args:
            base_dir: Root directory for all reports. Defaults to ``"reports"``
                (relative to CWD), matching ``report_paths.REPORTS_ROOT``.
                This constructor does not read the environment itself; to
                honour the ``PORTFOLIO_DATA_DIR`` env var or
                ``get_portfolio_config()["data_dir"]``, resolve that value in
                the caller and pass it in here.
        """
        self._base_dir = Path(base_dir)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _day_dir(self, date: str) -> Path:
        """Return the root directory for one date.

        Path: ``{base_dir}/daily/{date}/``
        """
        return self._base_dir / "daily" / date

    def _portfolio_dir(self, date: str) -> Path:
        """Return the portfolio subdirectory for a given date.

        Path: ``{base_dir}/daily/{date}/portfolio/``
        """
        return self._day_dir(date) / "portfolio"

    def _portfolio_file(self, date: str, prefix: str, suffix: str) -> Path:
        """Return ``{base_dir}/daily/{date}/portfolio/{prefix}_{suffix}``."""
        return self._portfolio_dir(date) / f"{prefix}_{suffix}"

    @staticmethod
    def _sanitize_key(key: Any) -> Any:
        """Return a dict key that ``json.dumps`` will accept.

        ``json.dumps`` only allows ``str``, ``int``, ``float``, ``bool`` and
        ``None`` as keys; anything else (tuples, enums, arbitrary objects) is
        replaced by its ``str()`` form.
        """
        if key is None or isinstance(key, (bool, int, float, str)):
            return key
        return str(key)

    @staticmethod
    def _sanitize(obj: Any) -> Any:
        """Recursively convert non-JSON-serializable objects to safe types.

        Handles LangChain message objects (``HumanMessage``, ``AIMessage``,
        etc.) that appear in LangGraph state dicts, as well as any other
        arbitrary objects that are not natively JSON-serializable.

        Conversions:
            * ``None``/``bool``/``int``/``float``/``str`` are returned as-is.
            * dicts are rebuilt with sanitized values and JSON-safe keys.
            * lists and tuples become lists of sanitized items.
            * objects exposing ``.type`` and ``.content`` become the sanitized
              result of ``obj.dict()`` when that works, otherwise
              ``{"type": ..., "content": ...}``.
            * anything else is returned unchanged if ``json.dumps`` accepts
              it, else replaced by ``str(obj)``.
        """
        if obj is None or isinstance(obj, (bool, int, float, str)):
            return obj
        if isinstance(obj, dict):
            # Keys must be sanitized too: a tuple or object key would make
            # json.dumps raise TypeError even when every value is clean.
            return {
                ReportStore._sanitize_key(key): ReportStore._sanitize(value)
                for key, value in obj.items()
            }
        if isinstance(obj, (list, tuple)):
            return [ReportStore._sanitize(item) for item in obj]
        # LangChain BaseMessage objects expose .type and .content
        if hasattr(obj, "type") and hasattr(obj, "content"):
            try:
                if hasattr(obj, "dict") and callable(obj.dict):
                    return ReportStore._sanitize(obj.dict())
            except Exception:
                # Deliberate best effort: if the object's own dict() export
                # fails for any reason, fall back to the minimal form below.
                pass
            return {"type": str(obj.type), "content": str(obj.content)}
        # Generic fallback: try a serialization probe first
        try:
            json.dumps(obj)
        except (TypeError, ValueError):
            return str(obj)
        return obj

    def _write_json(self, path: Path, data: dict[str, Any]) -> Path:
        """Write a dict to a JSON file, creating parent directories as needed.

        Args:
            path: Target file path.
            data: Data to serialise.

        Returns:
            The path written.

        Raises:
            ReportStoreError: On filesystem write failure.
        """
        # Serialise before touching the filesystem so a serialisation failure
        # cannot leave freshly created, empty directories behind.
        payload = json.dumps(self._sanitize(data), indent=2)
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(payload, encoding="utf-8")
        except OSError as exc:
            raise ReportStoreError(f"Failed to write {path}: {exc}") from exc
        return path

    def _read_json(self, path: Path) -> dict[str, Any] | None:
        """Read a JSON file, returning None if the file does not exist.

        Raises:
            ReportStoreError: If the file exists but cannot be read, is not
                valid UTF-8, or does not contain valid JSON.
        """
        # Read first and handle "missing" afterwards, so a file deleted
        # between an existence check and the read cannot cause a crash.
        try:
            text = path.read_text(encoding="utf-8")
        except (FileNotFoundError, NotADirectoryError):
            # NotADirectoryError: a parent path component is a regular file;
            # Path.exists() reports that as "does not exist" as well.
            return None
        except UnicodeDecodeError as exc:
            raise ReportStoreError(f"Corrupt JSON at {path}: {exc}") from exc
        except OSError as exc:
            raise ReportStoreError(f"Failed to read {path}: {exc}") from exc
        try:
            return json.loads(text)
        except json.JSONDecodeError as exc:
            raise ReportStoreError(f"Corrupt JSON at {path}: {exc}") from exc

    # ------------------------------------------------------------------
    # Macro Scan
    # ------------------------------------------------------------------

    def save_scan(self, date: str, data: dict[str, Any]) -> Path:
        """Save macro scan summary JSON.

        Path: ``{base_dir}/daily/{date}/market/macro_scan_summary.json``

        Args:
            date: ISO date string, e.g. ``"2026-03-20"``.
            data: Scan output dict (typically the macro_scan_summary).

        Returns:
            Path of the written file.
        """
        path = self._day_dir(date) / "market" / "macro_scan_summary.json"
        return self._write_json(path, data)

    def load_scan(self, date: str) -> dict[str, Any] | None:
        """Load macro scan summary. Returns None if the file does not exist."""
        path = self._day_dir(date) / "market" / "macro_scan_summary.json"
        return self._read_json(path)

    # ------------------------------------------------------------------
    # Per-Ticker Analysis
    # ------------------------------------------------------------------

    def save_analysis(self, date: str, ticker: str, data: dict[str, Any]) -> Path:
        """Save per-ticker analysis report as JSON.

        Path: ``{base_dir}/daily/{date}/{TICKER}/complete_report.json``

        Args:
            date: ISO date string.
            ticker: Ticker symbol (stored as uppercase).
            data: Analysis output dict.

        Returns:
            Path of the written file.
        """
        path = self._day_dir(date) / ticker.upper() / "complete_report.json"
        return self._write_json(path, data)

    def load_analysis(self, date: str, ticker: str) -> dict[str, Any] | None:
        """Load per-ticker analysis JSON. Returns None if the file does not exist."""
        path = self._day_dir(date) / ticker.upper() / "complete_report.json"
        return self._read_json(path)

    # ------------------------------------------------------------------
    # Holding Reviews
    # ------------------------------------------------------------------

    def save_holding_review(
        self,
        date: str,
        ticker: str,
        data: dict[str, Any],
    ) -> Path:
        """Save holding reviewer output for one ticker.

        Path: ``{base_dir}/daily/{date}/portfolio/{TICKER}_holding_review.json``

        Args:
            date: ISO date string.
            ticker: Ticker symbol (stored as uppercase).
            data: HoldingReviewerAgent output dict.

        Returns:
            Path of the written file.
        """
        path = self._portfolio_file(date, ticker.upper(), "holding_review.json")
        return self._write_json(path, data)

    def load_holding_review(self, date: str, ticker: str) -> dict[str, Any] | None:
        """Load holding review output. Returns None if the file does not exist."""
        path = self._portfolio_file(date, ticker.upper(), "holding_review.json")
        return self._read_json(path)

    # ------------------------------------------------------------------
    # Risk Metrics
    # ------------------------------------------------------------------

    def save_risk_metrics(
        self,
        date: str,
        portfolio_id: str,
        data: dict[str, Any],
    ) -> Path:
        """Save risk computation results.

        Path: ``{base_dir}/daily/{date}/portfolio/{portfolio_id}_risk_metrics.json``

        Args:
            date: ISO date string.
            portfolio_id: UUID of the target portfolio.
            data: Risk metrics dict (Sharpe, Sortino, VaR, etc.).

        Returns:
            Path of the written file.
        """
        path = self._portfolio_file(date, portfolio_id, "risk_metrics.json")
        return self._write_json(path, data)

    def load_risk_metrics(
        self,
        date: str,
        portfolio_id: str,
    ) -> dict[str, Any] | None:
        """Load risk metrics. Returns None if the file does not exist."""
        path = self._portfolio_file(date, portfolio_id, "risk_metrics.json")
        return self._read_json(path)

    # ------------------------------------------------------------------
    # PM Decisions and Execution Results
    # ------------------------------------------------------------------

    def save_pm_decision(
        self,
        date: str,
        portfolio_id: str,
        data: dict[str, Any],
        markdown: str | None = None,
    ) -> Path:
        """Save PM agent decision.

        JSON path: ``{base_dir}/daily/{date}/portfolio/{portfolio_id}_pm_decision.json``
        MD path: ``{base_dir}/daily/{date}/portfolio/{portfolio_id}_pm_decision.md``
        (written only when ``markdown`` is not None)

        Args:
            date: ISO date string.
            portfolio_id: UUID of the target portfolio.
            data: PM decision dict (sells, buys, holds, rationale, …).
            markdown: Optional human-readable version; written when provided.

        Returns:
            Path of the written JSON file.

        Raises:
            ReportStoreError: If either file cannot be written.
        """
        json_path = self._portfolio_file(date, portfolio_id, "pm_decision.json")
        self._write_json(json_path, data)
        if markdown is not None:
            # The parent directory already exists: _write_json created it.
            md_path = self._portfolio_file(date, portfolio_id, "pm_decision.md")
            try:
                md_path.write_text(markdown, encoding="utf-8")
            except OSError as exc:
                raise ReportStoreError(f"Failed to write {md_path}: {exc}") from exc
        return json_path

    def load_pm_decision(
        self,
        date: str,
        portfolio_id: str,
    ) -> dict[str, Any] | None:
        """Load PM decision JSON. Returns None if the file does not exist."""
        path = self._portfolio_file(date, portfolio_id, "pm_decision.json")
        return self._read_json(path)

    def save_execution_result(
        self,
        date: str,
        portfolio_id: str,
        data: dict[str, Any],
    ) -> Path:
        """Save trade execution results.

        Path: ``{base_dir}/daily/{date}/portfolio/{portfolio_id}_execution_result.json``

        Args:
            date: ISO date string.
            portfolio_id: UUID of the target portfolio.
            data: TradeExecutor output dict.

        Returns:
            Path of the written file.
        """
        path = self._portfolio_file(date, portfolio_id, "execution_result.json")
        return self._write_json(path, data)

    def load_execution_result(
        self,
        date: str,
        portfolio_id: str,
    ) -> dict[str, Any] | None:
        """Load execution result. Returns None if the file does not exist."""
        path = self._portfolio_file(date, portfolio_id, "execution_result.json")
        return self._read_json(path)

    def clear_portfolio_stage(self, date: str, portfolio_id: str) -> list[str]:
        """Delete PM decision and execution result files for a given date/portfolio.

        Removes the PM decision JSON, its Markdown companion and the execution
        result JSON; files that are already absent are skipped silently.

        Returns a list of deleted file names so the caller can log what was removed.
        """
        deleted: list[str] = []
        for suffix in ("pm_decision.json", "pm_decision.md", "execution_result.json"):
            target = self._portfolio_file(date, portfolio_id, suffix)
            # Attempt the delete directly instead of checking exists() first,
            # so a file removed concurrently is treated as "nothing to do".
            try:
                target.unlink()
            except (FileNotFoundError, NotADirectoryError):
                continue
            deleted.append(target.name)
        return deleted

    def list_pm_decisions(self, portfolio_id: str) -> list[Path]:
        """Return all saved PM decision JSON paths for portfolio_id, newest first.

        Scans ``{base_dir}/daily/*/portfolio/{portfolio_id}_pm_decision.json``.
        Ordering is a reverse sort of the matched paths, so "newest first"
        relies on the date directory names sorting chronologically (as ISO
        dates do).

        Args:
            portfolio_id: UUID of the target portfolio.

        Returns:
            Sorted list of Path objects, newest date first.
        """
        pattern = f"daily/*/portfolio/{portfolio_id}_pm_decision.json"
        return sorted(self._base_dir.glob(pattern), reverse=True)