121 lines
4.0 KiB
Python
121 lines
4.0 KiB
Python
import json
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
|
_ALLOWED_RULE_FILENAMES = {"factor_rules.json"}
|
|
|
|
|
|
def _candidate_rule_paths(config: Optional[Dict[str, Any]] = None) -> List[Path]:
|
|
config = config or {}
|
|
candidates = []
|
|
|
|
project_dir = Path(config.get("project_dir", Path(__file__).resolve().parents[2])).resolve()
|
|
allowed_dirs = {
|
|
project_dir.resolve(),
|
|
(project_dir / "examples").resolve(),
|
|
}
|
|
|
|
explicit = config.get("factor_rules_path")
|
|
if explicit:
|
|
candidates.append(Path(explicit))
|
|
|
|
env_path = os.getenv("TRADINGAGENTS_FACTOR_RULES_PATH")
|
|
if env_path:
|
|
candidates.append(Path(env_path))
|
|
|
|
candidates.extend(
|
|
[
|
|
project_dir / "examples" / "factor_rules.json",
|
|
project_dir / "factor_rules.json",
|
|
]
|
|
)
|
|
|
|
safe_candidates = []
|
|
seen = set()
|
|
for candidate in candidates:
|
|
try:
|
|
resolved = candidate.resolve()
|
|
except Exception:
|
|
continue
|
|
if resolved.name not in _ALLOWED_RULE_FILENAMES:
|
|
continue
|
|
if not any(parent == resolved.parent or parent in resolved.parents for parent in allowed_dirs):
|
|
continue
|
|
if resolved in seen:
|
|
continue
|
|
seen.add(resolved)
|
|
safe_candidates.append(resolved)
|
|
return safe_candidates
|
|
|
|
|
|
def load_factor_rules(config: Optional[Dict[str, Any]] = None) -> Tuple[List[Dict[str, Any]], Optional[str]]:
|
|
for path in _candidate_rule_paths(config):
|
|
if not path.exists():
|
|
continue
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
if isinstance(data, list):
|
|
rules = data
|
|
elif isinstance(data, dict):
|
|
rules = data.get("rules", [])
|
|
else:
|
|
rules = []
|
|
|
|
if not isinstance(rules, list):
|
|
raise ValueError("Factor rules file must contain a list under 'rules' or be a list itself.")
|
|
return rules, str(path)
|
|
return [], None
|
|
|
|
|
|
def summarize_factor_rules(rules: List[Dict[str, Any]], ticker: str, trade_date: str) -> str:
|
|
if not rules:
|
|
return (
|
|
f"No factor rules were loaded for {ticker} on {trade_date}. "
|
|
"Treat this as missing custom factor context and do not fabricate rule-based signals."
|
|
)
|
|
|
|
lines = [
|
|
f"Factor rule context for {ticker} on {trade_date}.",
|
|
f"Loaded {len(rules)} manually curated factor rules.",
|
|
"Use these as explicit analyst guidance, not as guaranteed facts.",
|
|
"",
|
|
]
|
|
|
|
for idx, rule in enumerate(rules, 1):
|
|
name = rule.get("name", f"Rule {idx}")
|
|
thesis = rule.get("thesis", "")
|
|
signal = rule.get("signal", "neutral")
|
|
weight = rule.get("weight", "medium")
|
|
rationale = rule.get("rationale", "")
|
|
conditions = rule.get("conditions", [])
|
|
conditions_text = "; ".join(str(c) for c in conditions) if conditions else "No explicit conditions provided"
|
|
lines.extend(
|
|
[
|
|
f"Rule {idx}: {name}",
|
|
f"- Signal bias: {signal}",
|
|
f"- Weight: {weight}",
|
|
f"- Thesis: {thesis}",
|
|
f"- Conditions: {conditions_text}",
|
|
f"- Rationale: {rationale}",
|
|
"",
|
|
]
|
|
)
|
|
|
|
bullish = [r for r in rules if str(r.get("signal", "")).lower() in {"bullish", "buy", "positive"}]
|
|
bearish = [r for r in rules if str(r.get("signal", "")).lower() in {"bearish", "sell", "negative"}]
|
|
neutral = len(rules) - len(bullish) - len(bearish)
|
|
lines.extend(
|
|
[
|
|
"Portfolio-level summary:",
|
|
f"- Bullish leaning rules: {len(bullish)}",
|
|
f"- Bearish leaning rules: {len(bearish)}",
|
|
f"- Neutral / mixed rules: {neutral}",
|
|
"When these rules conflict with market/news/fundamental evidence, explicitly discuss the conflict instead of ignoring it.",
|
|
]
|
|
)
|
|
|
|
return "\n".join(lines)
|