fix: V-001 security vulnerability

Automated security fix generated by Orbis Security AI
This commit is contained in:
orbisai0security 2026-04-16 14:02:18 +00:00
parent fa4d01c23a
commit a79c359b05
1 changed files with 34 additions and 9 deletions

View File

@ -1,4 +1,5 @@
import os
import re
import requests
import pandas as pd
import json
@ -14,6 +15,10 @@ def get_api_key() -> str:
raise ValueError("ALPHA_VANTAGE_API_KEY environment variable is not set.")
return api_key
def _sanitize_url(url: str) -> str:
"""Remove sensitive API key values from a URL string for safe logging."""
return re.sub(r'([\?&]apikey=)[^&]+', r'\1***REDACTED***', url, flags=re.IGNORECASE)
def format_datetime_for_api(date_input) -> str:
"""Convert various date formats to YYYYMMDDTHHMM format required by Alpha Vantage API."""
if isinstance(date_input, str):
@ -41,33 +46,53 @@ class AlphaVantageRateLimitError(Exception):
def _make_api_request(function_name: str, params: dict) -> dict | str:
"""Helper function to make API requests and handle responses.
The Alpha Vantage API requires the key to be supplied as a query parameter.
To reduce the risk of accidental key exposure in logs and error messages the
key is:
- never stored in the intermediate ``api_params`` dict, and
- redacted from any URL that appears in exception text via
``_sanitize_url()``.
Raises:
AlphaVantageRateLimitError: When API rate limit is exceeded
"""
# Create a copy of params to avoid modifying the original
# Create a copy of params to avoid modifying the original.
# The API key is intentionally kept out of this dict so it does not appear
# in local state that could be logged or serialised for debugging.
api_params = params.copy()
api_params.update({
"function": function_name,
"apikey": get_api_key(),
"source": "trading_agents",
})
# Handle entitlement parameter if present in params or global variable
current_entitlement = globals().get('_current_entitlement')
entitlement = api_params.get("entitlement") or current_entitlement
if entitlement:
api_params["entitlement"] = entitlement
elif "entitlement" in api_params:
# Remove entitlement if it's None or empty
api_params.pop("entitlement", None)
response = requests.get(API_BASE_URL, params=api_params)
response.raise_for_status()
# Merge the API key into request_params only at call-time so the key is
# not retained in any longer-lived variable.
request_params = {**api_params, "apikey": get_api_key()}
try:
response = requests.get(API_BASE_URL, params=request_params)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
# Sanitize the request URL before it propagates to callers / logs.
safe_url = _sanitize_url(exc.response.url if exc.response is not None else "")
raise requests.exceptions.HTTPError(
f"HTTP error {exc.response.status_code} for URL: {safe_url}",
response=exc.response,
) from exc
response_text = response.text
# Check if response is JSON (error responses are typically JSON)
try:
response_json = json.loads(response_text)