fix(cli): dedupe idless streamed messages
This commit is contained in:
parent
9ba1858948
commit
28f2384990
|
|
@ -1,13 +1,41 @@
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
def _tool_call_signature(tool_call):
|
||||||
|
if isinstance(tool_call, dict):
|
||||||
|
name = tool_call["name"]
|
||||||
|
args = tool_call["args"]
|
||||||
|
else:
|
||||||
|
name = tool_call.name
|
||||||
|
args = tool_call.args
|
||||||
|
return (name, json.dumps(args, sort_keys=True, default=str))
|
||||||
|
|
||||||
|
|
||||||
|
def _message_fingerprint(message, msg_type, content):
|
||||||
|
tool_calls = tuple(_tool_call_signature(tool_call) for tool_call in getattr(message, "tool_calls", []) or [])
|
||||||
|
return (
|
||||||
|
message.__class__.__name__,
|
||||||
|
msg_type,
|
||||||
|
content.strip() if isinstance(content, str) else str(content),
|
||||||
|
tool_calls,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def ingest_chunk_messages(message_buffer, chunk, classify_message_type) -> None:
|
def ingest_chunk_messages(message_buffer, chunk, classify_message_type) -> None:
|
||||||
"""Ingest all newly seen messages from a graph stream chunk."""
|
"""Ingest all newly seen messages from a graph stream chunk."""
|
||||||
for message in chunk.get("messages", []):
|
for message in chunk.get("messages", []):
|
||||||
|
msg_type, content = classify_message_type(message)
|
||||||
msg_id = getattr(message, "id", None)
|
msg_id = getattr(message, "id", None)
|
||||||
if msg_id is not None:
|
if msg_id is not None:
|
||||||
if msg_id in message_buffer._processed_message_ids:
|
if msg_id in message_buffer._processed_message_ids:
|
||||||
continue
|
continue
|
||||||
message_buffer._processed_message_ids.add(msg_id)
|
message_buffer._processed_message_ids.add(msg_id)
|
||||||
|
else:
|
||||||
|
fingerprint = _message_fingerprint(message, msg_type, content)
|
||||||
|
if fingerprint in message_buffer._processed_message_fingerprints:
|
||||||
|
continue
|
||||||
|
message_buffer._processed_message_fingerprints.add(fingerprint)
|
||||||
|
|
||||||
msg_type, content = classify_message_type(message)
|
|
||||||
if content and content.strip():
|
if content and content.strip():
|
||||||
message_buffer.add_message(msg_type, content)
|
message_buffer.add_message(msg_type, content)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -82,6 +82,7 @@ class MessageBuffer:
|
||||||
self.report_sections = {}
|
self.report_sections = {}
|
||||||
self.selected_analysts = []
|
self.selected_analysts = []
|
||||||
self._processed_message_ids = set()
|
self._processed_message_ids = set()
|
||||||
|
self._processed_message_fingerprints = set()
|
||||||
|
|
||||||
def init_for_analysis(self, selected_analysts):
|
def init_for_analysis(self, selected_analysts):
|
||||||
"""Initialize agent status and report sections based on selected analysts.
|
"""Initialize agent status and report sections based on selected analysts.
|
||||||
|
|
@ -117,6 +118,7 @@ class MessageBuffer:
|
||||||
self.messages.clear()
|
self.messages.clear()
|
||||||
self.tool_calls.clear()
|
self.tool_calls.clear()
|
||||||
self._processed_message_ids.clear()
|
self._processed_message_ids.clear()
|
||||||
|
self._processed_message_fingerprints.clear()
|
||||||
|
|
||||||
def get_completed_reports_count(self):
|
def get_completed_reports_count(self):
|
||||||
"""Count reports that are finalized (their finalizing agent is completed).
|
"""Count reports that are finalized (their finalizing agent is completed).
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ class FakeMessage:
|
||||||
class FakeMessageBuffer:
|
class FakeMessageBuffer:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._processed_message_ids = set()
|
self._processed_message_ids = set()
|
||||||
|
self._processed_message_fingerprints = set()
|
||||||
self.messages = []
|
self.messages = []
|
||||||
self.tool_calls = []
|
self.tool_calls = []
|
||||||
|
|
||||||
|
|
@ -62,3 +63,29 @@ def test_ingest_chunk_messages_skips_duplicate_message_ids():
|
||||||
|
|
||||||
assert len(message_buffer.messages) == 1
|
assert len(message_buffer.messages) == 1
|
||||||
assert len(message_buffer.tool_calls) == 1
|
assert len(message_buffer.tool_calls) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_ingest_chunk_messages_skips_duplicate_messages_without_ids():
|
||||||
|
message_buffer = FakeMessageBuffer()
|
||||||
|
chunk = {"messages": [FakeMessage(None, "same", [{"name": "tool_a", "args": {"x": 1}}])]}
|
||||||
|
|
||||||
|
ingest_chunk_messages(message_buffer, chunk, fake_classifier)
|
||||||
|
ingest_chunk_messages(message_buffer, chunk, fake_classifier)
|
||||||
|
|
||||||
|
assert len(message_buffer.messages) == 1
|
||||||
|
assert len(message_buffer.tool_calls) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_ingest_chunk_messages_keeps_distinct_messages_without_ids():
|
||||||
|
message_buffer = FakeMessageBuffer()
|
||||||
|
chunk = {
|
||||||
|
"messages": [
|
||||||
|
FakeMessage(None, "first", [{"name": "tool_a", "args": {"x": 1}}]),
|
||||||
|
FakeMessage(None, "second", [{"name": "tool_b", "args": {"y": 2}}]),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
ingest_chunk_messages(message_buffer, chunk, fake_classifier)
|
||||||
|
|
||||||
|
assert [content for _, content in message_buffer.messages] == ["first", "second"]
|
||||||
|
assert [name for name, _ in message_buffer.tool_calls] == ["tool_a", "tool_b"]
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue