336 lines
12 KiB
Python
336 lines
12 KiB
Python
"""Filesystem document store for Portfolio Manager reports.
|
|
|
|
Saves and loads all non-transactional portfolio artifacts (scans, per-ticker
|
|
analysis, holding reviews, risk metrics, PM decisions) using the existing
|
|
``tradingagents/report_paths.py`` path convention.
|
|
|
|
Directory layout::
|
|
|
|
reports/daily/{date}/
|
|
├── market/
|
|
│ └── macro_scan_summary.json ← save_scan / load_scan
|
|
├── {TICKER}/
|
|
│ └── complete_report.json ← save_analysis / load_analysis
|
|
└── portfolio/
|
|
├── {TICKER}_holding_review.json ← save/load_holding_review
|
|
├── {portfolio_id}_risk_metrics.json
|
|
├── {portfolio_id}_pm_decision.json
|
|
└── {portfolio_id}_pm_decision.md
|
|
|
|
Usage::
|
|
|
|
from tradingagents.portfolio.report_store import ReportStore
|
|
|
|
store = ReportStore()
|
|
store.save_scan("2026-03-20", {"watchlist": [...]})
|
|
data = store.load_scan("2026-03-20")
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from tradingagents.portfolio.exceptions import ReportStoreError
|
|
|
|
|
|
class ReportStore:
|
|
"""Filesystem document store for all portfolio-related reports.
|
|
|
|
Directories are created automatically on first write.
|
|
All load methods return ``None`` when the file does not exist.
|
|
"""
|
|
|
|
def __init__(self, base_dir: str | Path = "reports") -> None:
|
|
"""Initialise the store with a base reports directory.
|
|
|
|
Args:
|
|
base_dir: Root directory for all reports. Defaults to ``"reports"``
|
|
(relative to CWD), matching ``report_paths.REPORTS_ROOT``.
|
|
Override via the ``PORTFOLIO_DATA_DIR`` env var or
|
|
``get_portfolio_config()["data_dir"]``.
|
|
"""
|
|
self._base_dir = Path(base_dir)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _portfolio_dir(self, date: str) -> Path:
|
|
"""Return the portfolio subdirectory for a given date.
|
|
|
|
Path: ``{base_dir}/daily/{date}/portfolio/``
|
|
"""
|
|
return self._base_dir / "daily" / date / "portfolio"
|
|
|
|
@staticmethod
|
|
def _sanitize(obj: Any) -> Any:
|
|
"""Recursively convert non-JSON-serializable objects to safe types.
|
|
|
|
Handles LangChain message objects (``HumanMessage``, ``AIMessage``,
|
|
etc.) that appear in LangGraph state dicts, as well as any other
|
|
arbitrary objects that are not natively JSON-serializable.
|
|
"""
|
|
if obj is None or isinstance(obj, (bool, int, float, str)):
|
|
return obj
|
|
if isinstance(obj, dict):
|
|
return {k: ReportStore._sanitize(v) for k, v in obj.items()}
|
|
if isinstance(obj, (list, tuple)):
|
|
return [ReportStore._sanitize(item) for item in obj]
|
|
# LangChain BaseMessage objects expose .type and .content
|
|
if hasattr(obj, "type") and hasattr(obj, "content"):
|
|
try:
|
|
if hasattr(obj, "dict") and callable(obj.dict):
|
|
return ReportStore._sanitize(obj.dict())
|
|
except Exception:
|
|
pass
|
|
return {"type": str(obj.type), "content": str(obj.content)}
|
|
# Generic fallback: try a serialization probe first
|
|
try:
|
|
json.dumps(obj)
|
|
return obj
|
|
except (TypeError, ValueError):
|
|
return str(obj)
|
|
|
|
def _write_json(self, path: Path, data: dict[str, Any]) -> Path:
|
|
"""Write a dict to a JSON file, creating parent directories as needed.
|
|
|
|
Args:
|
|
path: Target file path.
|
|
data: Data to serialise.
|
|
|
|
Returns:
|
|
The path written.
|
|
|
|
Raises:
|
|
ReportStoreError: On filesystem write failure.
|
|
"""
|
|
try:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
sanitized = self._sanitize(data)
|
|
path.write_text(json.dumps(sanitized, indent=2), encoding="utf-8")
|
|
return path
|
|
except OSError as exc:
|
|
raise ReportStoreError(f"Failed to write {path}: {exc}") from exc
|
|
|
|
def _read_json(self, path: Path) -> dict[str, Any] | None:
|
|
"""Read a JSON file, returning None if the file does not exist.
|
|
|
|
Raises:
|
|
ReportStoreError: On JSON parse error (file exists but is corrupt).
|
|
"""
|
|
if not path.exists():
|
|
return None
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except json.JSONDecodeError as exc:
|
|
raise ReportStoreError(f"Corrupt JSON at {path}: {exc}") from exc
|
|
|
|
# ------------------------------------------------------------------
|
|
# Macro Scan
|
|
# ------------------------------------------------------------------
|
|
|
|
def save_scan(self, date: str, data: dict[str, Any]) -> Path:
|
|
"""Save macro scan summary JSON.
|
|
|
|
Path: ``{base_dir}/daily/{date}/market/macro_scan_summary.json``
|
|
|
|
Args:
|
|
date: ISO date string, e.g. ``"2026-03-20"``.
|
|
data: Scan output dict (typically the macro_scan_summary).
|
|
|
|
Returns:
|
|
Path of the written file.
|
|
"""
|
|
path = self._base_dir / "daily" / date / "market" / "macro_scan_summary.json"
|
|
return self._write_json(path, data)
|
|
|
|
def load_scan(self, date: str) -> dict[str, Any] | None:
|
|
"""Load macro scan summary. Returns None if the file does not exist."""
|
|
path = self._base_dir / "daily" / date / "market" / "macro_scan_summary.json"
|
|
return self._read_json(path)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Per-Ticker Analysis
|
|
# ------------------------------------------------------------------
|
|
|
|
def save_analysis(self, date: str, ticker: str, data: dict[str, Any]) -> Path:
|
|
"""Save per-ticker analysis report as JSON.
|
|
|
|
Path: ``{base_dir}/daily/{date}/{TICKER}/complete_report.json``
|
|
|
|
Args:
|
|
date: ISO date string.
|
|
ticker: Ticker symbol (stored as uppercase).
|
|
data: Analysis output dict.
|
|
"""
|
|
path = self._base_dir / "daily" / date / ticker.upper() / "complete_report.json"
|
|
return self._write_json(path, data)
|
|
|
|
def load_analysis(self, date: str, ticker: str) -> dict[str, Any] | None:
|
|
"""Load per-ticker analysis JSON. Returns None if the file does not exist."""
|
|
path = self._base_dir / "daily" / date / ticker.upper() / "complete_report.json"
|
|
return self._read_json(path)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Holding Reviews
|
|
# ------------------------------------------------------------------
|
|
|
|
def save_holding_review(
|
|
self,
|
|
date: str,
|
|
ticker: str,
|
|
data: dict[str, Any],
|
|
) -> Path:
|
|
"""Save holding reviewer output for one ticker.
|
|
|
|
Path: ``{base_dir}/daily/{date}/portfolio/{TICKER}_holding_review.json``
|
|
|
|
Args:
|
|
date: ISO date string.
|
|
ticker: Ticker symbol (stored as uppercase).
|
|
data: HoldingReviewerAgent output dict.
|
|
"""
|
|
path = self._portfolio_dir(date) / f"{ticker.upper()}_holding_review.json"
|
|
return self._write_json(path, data)
|
|
|
|
def load_holding_review(self, date: str, ticker: str) -> dict[str, Any] | None:
|
|
"""Load holding review output. Returns None if the file does not exist."""
|
|
path = self._portfolio_dir(date) / f"{ticker.upper()}_holding_review.json"
|
|
return self._read_json(path)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Risk Metrics
|
|
# ------------------------------------------------------------------
|
|
|
|
def save_risk_metrics(
|
|
self,
|
|
date: str,
|
|
portfolio_id: str,
|
|
data: dict[str, Any],
|
|
) -> Path:
|
|
"""Save risk computation results.
|
|
|
|
Path: ``{base_dir}/daily/{date}/portfolio/{portfolio_id}_risk_metrics.json``
|
|
|
|
Args:
|
|
date: ISO date string.
|
|
portfolio_id: UUID of the target portfolio.
|
|
data: Risk metrics dict (Sharpe, Sortino, VaR, etc.).
|
|
"""
|
|
path = self._portfolio_dir(date) / f"{portfolio_id}_risk_metrics.json"
|
|
return self._write_json(path, data)
|
|
|
|
def load_risk_metrics(
|
|
self,
|
|
date: str,
|
|
portfolio_id: str,
|
|
) -> dict[str, Any] | None:
|
|
"""Load risk metrics. Returns None if the file does not exist."""
|
|
path = self._portfolio_dir(date) / f"{portfolio_id}_risk_metrics.json"
|
|
return self._read_json(path)
|
|
|
|
# ------------------------------------------------------------------
|
|
# PM Decisions
|
|
# ------------------------------------------------------------------
|
|
|
|
def save_pm_decision(
|
|
self,
|
|
date: str,
|
|
portfolio_id: str,
|
|
data: dict[str, Any],
|
|
markdown: str | None = None,
|
|
) -> Path:
|
|
"""Save PM agent decision.
|
|
|
|
JSON path: ``{base_dir}/daily/{date}/portfolio/{portfolio_id}_pm_decision.json``
|
|
MD path: ``{base_dir}/daily/{date}/portfolio/{portfolio_id}_pm_decision.md``
|
|
(written only when ``markdown`` is not None)
|
|
|
|
Args:
|
|
date: ISO date string.
|
|
portfolio_id: UUID of the target portfolio.
|
|
data: PM decision dict (sells, buys, holds, rationale, …).
|
|
markdown: Optional human-readable version; written when provided.
|
|
|
|
Returns:
|
|
Path of the written JSON file.
|
|
"""
|
|
json_path = self._portfolio_dir(date) / f"{portfolio_id}_pm_decision.json"
|
|
self._write_json(json_path, data)
|
|
if markdown is not None:
|
|
md_path = self._portfolio_dir(date) / f"{portfolio_id}_pm_decision.md"
|
|
try:
|
|
md_path.write_text(markdown, encoding="utf-8")
|
|
except OSError as exc:
|
|
raise ReportStoreError(f"Failed to write {md_path}: {exc}") from exc
|
|
return json_path
|
|
|
|
def load_pm_decision(
|
|
self,
|
|
date: str,
|
|
portfolio_id: str,
|
|
) -> dict[str, Any] | None:
|
|
"""Load PM decision JSON. Returns None if the file does not exist."""
|
|
path = self._portfolio_dir(date) / f"{portfolio_id}_pm_decision.json"
|
|
return self._read_json(path)
|
|
|
|
def save_execution_result(
|
|
self,
|
|
date: str,
|
|
portfolio_id: str,
|
|
data: dict[str, Any],
|
|
) -> Path:
|
|
"""Save trade execution results.
|
|
|
|
Path: ``{base_dir}/daily/{date}/portfolio/{portfolio_id}_execution_result.json``
|
|
|
|
Args:
|
|
date: ISO date string.
|
|
portfolio_id: UUID of the target portfolio.
|
|
data: TradeExecutor output dict.
|
|
"""
|
|
path = self._portfolio_dir(date) / f"{portfolio_id}_execution_result.json"
|
|
return self._write_json(path, data)
|
|
|
|
def load_execution_result(
|
|
self,
|
|
date: str,
|
|
portfolio_id: str,
|
|
) -> dict[str, Any] | None:
|
|
"""Load execution result. Returns None if the file does not exist."""
|
|
path = self._portfolio_dir(date) / f"{portfolio_id}_execution_result.json"
|
|
return self._read_json(path)
|
|
|
|
def clear_portfolio_stage(self, date: str, portfolio_id: str) -> list[str]:
|
|
"""Delete PM decision and execution result files for a given date/portfolio.
|
|
|
|
Returns a list of deleted file names so the caller can log what was removed.
|
|
"""
|
|
targets = [
|
|
self._portfolio_dir(date) / f"{portfolio_id}_pm_decision.json",
|
|
self._portfolio_dir(date) / f"{portfolio_id}_pm_decision.md",
|
|
self._portfolio_dir(date) / f"{portfolio_id}_execution_result.json",
|
|
]
|
|
deleted = []
|
|
for path in targets:
|
|
if path.exists():
|
|
path.unlink()
|
|
deleted.append(path.name)
|
|
return deleted
|
|
|
|
def list_pm_decisions(self, portfolio_id: str) -> list[Path]:
|
|
"""Return all saved PM decision JSON paths for portfolio_id, newest first.
|
|
|
|
Scans ``{base_dir}/daily/*/portfolio/{portfolio_id}_pm_decision.json``.
|
|
|
|
Args:
|
|
portfolio_id: UUID of the target portfolio.
|
|
|
|
Returns:
|
|
Sorted list of Path objects, newest date first.
|
|
"""
|
|
pattern = f"daily/*/portfolio/{portfolio_id}_pm_decision.json"
|
|
return sorted(self._base_dir.glob(pattern), reverse=True)
|