Merge pull request #77 from aguzererler/security-fix-nlm-subprocess-injection-14014624082945194366
This commit is contained in:
commit
26bbe853c1
|
|
@ -61,18 +61,26 @@ def test_sync_performs_delete_then_add(mock_nlm_path):
|
|||
# Check list call
|
||||
args, kwargs = mock_run.call_args_list[0]
|
||||
assert "list" in args[0]
|
||||
assert "--json" in args[0]
|
||||
assert "--" in args[0]
|
||||
assert notebook_id in args[0]
|
||||
|
||||
# Check delete call
|
||||
args, kwargs = mock_run.call_args_list[1]
|
||||
assert "delete" in args[0]
|
||||
assert "-y" in args[0]
|
||||
assert "--" in args[0]
|
||||
assert notebook_id in args[0]
|
||||
assert source_id in args[0]
|
||||
|
||||
# Check add call
|
||||
args, kwargs = mock_run.call_args_list[2]
|
||||
assert "add" in args[0]
|
||||
assert "--text" in args[0]
|
||||
assert content in args[0]
|
||||
assert "--file" in args[0]
|
||||
assert str(digest_path) in args[0]
|
||||
assert "--wait" in args[0]
|
||||
assert "--" in args[0]
|
||||
assert notebook_id in args[0]
|
||||
|
||||
def test_sync_adds_directly_when_none_exists(mock_nlm_path):
|
||||
"""Should add new source directly if no existing one is found."""
|
||||
|
|
|
|||
|
|
@ -0,0 +1,107 @@
|
|||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from tradingagents.notebook_sync import sync_to_notebooklm
|
||||
|
||||
@pytest.fixture
|
||||
def mock_nlm_path(tmp_path):
|
||||
nlm = tmp_path / "nlm"
|
||||
nlm.touch(mode=0o755)
|
||||
return str(nlm)
|
||||
|
||||
def test_security_argument_injection(mock_nlm_path, tmp_path):
|
||||
"""
|
||||
Test that positional arguments starting with a hyphen are handled safely
|
||||
and that content is passed via file to avoid ARG_MAX issues and injection.
|
||||
"""
|
||||
# Malicious notebook_id that looks like a flag
|
||||
notebook_id = "--some-flag"
|
||||
digest_path = tmp_path / "malicious.md"
|
||||
digest_path.write_text("Some content")
|
||||
date = "2026-03-19"
|
||||
|
||||
with patch.dict(os.environ, {"NOTEBOOKLM_ID": notebook_id}):
|
||||
with patch("shutil.which", return_value=mock_nlm_path):
|
||||
with patch("subprocess.run") as mock_run:
|
||||
# Mock 'source list'
|
||||
list_result = MagicMock()
|
||||
list_result.returncode = 0
|
||||
list_result.stdout = "[]"
|
||||
|
||||
# Mock 'source add'
|
||||
add_result = MagicMock()
|
||||
add_result.returncode = 0
|
||||
|
||||
mock_run.side_effect = [list_result, add_result]
|
||||
|
||||
sync_to_notebooklm(digest_path, date)
|
||||
|
||||
# 1. Check 'source list' call
|
||||
# Expected: [nlm, "source", "list", "--json", "--", notebook_id]
|
||||
list_args = mock_run.call_args_list[0][0][0]
|
||||
assert list_args[0] == mock_nlm_path
|
||||
assert list_args[1:3] == ["source", "list"]
|
||||
assert "--json" in list_args
|
||||
assert "--" in list_args
|
||||
# "--" should be before the notebook_id
|
||||
dash_idx = list_args.index("--")
|
||||
id_idx = list_args.index(notebook_id)
|
||||
assert dash_idx < id_idx
|
||||
|
||||
# 2. Check 'source add' call
|
||||
# Expected: [nlm, "source", "add", "--title", title, "--file", str(digest_path), "--wait", "--", notebook_id]
|
||||
add_args = mock_run.call_args_list[1][0][0]
|
||||
assert add_args[0] == mock_nlm_path
|
||||
assert add_args[1:3] == ["source", "add"]
|
||||
assert "--title" in add_args
|
||||
assert "--file" in add_args
|
||||
assert str(digest_path) in add_args
|
||||
assert "--text" not in add_args # Vulnerable --text should be gone
|
||||
assert "--wait" in add_args
|
||||
assert "--" in add_args
|
||||
|
||||
dash_idx = add_args.index("--")
|
||||
id_idx = add_args.index(notebook_id)
|
||||
assert dash_idx < id_idx
|
||||
|
||||
def test_security_delete_injection(mock_nlm_path):
|
||||
"""Test that source_id in delete is also handled safely with --."""
|
||||
notebook_id = "normal-id"
|
||||
source_id = "--delete-everything"
|
||||
|
||||
with patch.dict(os.environ, {"NOTEBOOKLM_ID": notebook_id}):
|
||||
with patch("shutil.which", return_value=mock_nlm_path):
|
||||
with patch("subprocess.run") as mock_run:
|
||||
# Mock 'source list' finding the malicious source_id
|
||||
list_result = MagicMock()
|
||||
list_result.returncode = 0
|
||||
list_result.stdout = json.dumps([{"id": source_id, "title": "Daily Trading Digest (2026-03-19)"}])
|
||||
|
||||
# Mock 'source delete'
|
||||
delete_result = MagicMock()
|
||||
delete_result.returncode = 0
|
||||
|
||||
# Mock 'source add'
|
||||
add_result = MagicMock()
|
||||
add_result.returncode = 0
|
||||
|
||||
mock_run.side_effect = [list_result, delete_result, add_result]
|
||||
|
||||
sync_to_notebooklm(Path("test.md"), "2026-03-19")
|
||||
|
||||
# Check 'source delete' call
|
||||
# Expected: [nlm, "source", "delete", "-y", "--", notebook_id, source_id]
|
||||
delete_args = mock_run.call_args_list[1][0][0]
|
||||
assert delete_args[1:3] == ["source", "delete"]
|
||||
assert "-y" in delete_args
|
||||
assert "--" in delete_args
|
||||
|
||||
dash_idx = delete_args.index("--")
|
||||
id_idx = delete_args.index(notebook_id)
|
||||
sid_idx = delete_args.index(source_id)
|
||||
assert dash_idx < id_idx
|
||||
assert dash_idx < sid_idx
|
||||
|
|
@ -51,7 +51,6 @@ def sync_to_notebooklm(digest_path: Path, date: str, notebook_id: str | None = N
|
|||
console.print("[yellow]Warning: nlm CLI not found — skipping NotebookLM sync[/yellow]")
|
||||
return
|
||||
|
||||
content = digest_path.read_text()
|
||||
title = f"Daily Trading Digest ({date})"
|
||||
|
||||
# Find and delete existing source with the same title
|
||||
|
|
@ -60,14 +59,15 @@ def sync_to_notebooklm(digest_path: Path, date: str, notebook_id: str | None = N
|
|||
_delete_source(nlm, notebook_id, existing_source_id)
|
||||
|
||||
# Add as a new source
|
||||
_add_source(nlm, notebook_id, content, title)
|
||||
_add_source(nlm, notebook_id, digest_path, title)
|
||||
|
||||
|
||||
def _find_source(nlm: str, notebook_id: str, title: str) -> str | None:
|
||||
"""Return the source ID for the daily digest, or None if not found."""
|
||||
try:
|
||||
# Use -- to separate options from positional arguments
|
||||
result = subprocess.run(
|
||||
[nlm, "source", "list", notebook_id, "--json"],
|
||||
[nlm, "source", "list", "--json", "--", notebook_id],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
|
@ -85,8 +85,9 @@ def _find_source(nlm: str, notebook_id: str, title: str) -> str | None:
|
|||
def _delete_source(nlm: str, notebook_id: str, source_id: str) -> None:
|
||||
"""Delete an existing source."""
|
||||
try:
|
||||
# Use -- to separate options from positional arguments
|
||||
subprocess.run(
|
||||
[nlm, "source", "delete", notebook_id, source_id, "-y"],
|
||||
[nlm, "source", "delete", "-y", "--", notebook_id, source_id],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False, # Ignore non-zero exit since nlm sometimes fails even on success
|
||||
|
|
@ -95,11 +96,13 @@ def _delete_source(nlm: str, notebook_id: str, source_id: str) -> None:
|
|||
pass
|
||||
|
||||
|
||||
def _add_source(nlm: str, notebook_id: str, content: str, title: str) -> None:
|
||||
def _add_source(nlm: str, notebook_id: str, digest_path: Path, title: str) -> None:
|
||||
"""Add content as a new source."""
|
||||
try:
|
||||
# Use --file instead of --text to avoid ARG_MAX issues and shell injection.
|
||||
# Use -- to separate options from positional arguments.
|
||||
result = subprocess.run(
|
||||
[nlm, "source", "add", notebook_id, "--title", title, "--text", content, "--wait"],
|
||||
[nlm, "source", "add", "--title", title, "--file", str(digest_path), "--wait", "--", notebook_id],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in New Issue