Coverage for src / harnessutils / query.py: 87%
125 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 08:30 -0600
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 08:30 -0600
1"""Query and filter API for selective message loading.
3Provides flexible message querying without loading full conversation history.
4"""
6from __future__ import annotations
8from dataclasses import dataclass, field
9from typing import Any, Literal
11from harnessutils.models.message import Message
@dataclass
class MessageFilter:
    """Criteria for filtering messages.

    Every field defaults to ``None``, which disables that criterion. A message
    matches only when it satisfies every criterion that is set.
    """

    has_errors: bool | None = None  # True: only with errors; False: only without
    has_warnings: bool | None = None  # True: only with warnings; False: only without
    min_importance: float | None = None  # Minimum importance score
    tools: list[str] | None = None  # Specific tool types
    roles: list[str] | None = None  # Message roles (user/assistant)
    has_tool_outputs: bool | None = None  # True: only with tool outputs; False: only without
    is_summary: bool | None = None  # Compared against message.summary

    def matches(self, message: Message) -> bool:
        """Check if message matches all filter criteria.

        Args:
            message: Message to check

        Returns:
            True if message matches all non-None criteria
        """
        # Role filter
        if self.roles is not None and message.role not in self.roles:
            return False

        # Summary filter
        if self.is_summary is not None and message.summary != self.is_summary:
            return False

        # Tool outputs filter: a tool part counts only once it carries a state.
        if self.has_tool_outputs is not None:
            has_outputs = any(
                part.type == "tool" and getattr(part, "state", None) is not None
                for part in message.parts
            )
            if has_outputs != self.has_tool_outputs:
                return False

        # Error filter
        if (
            self.has_errors is not None
            and self._has_errors(message) != self.has_errors
        ):
            return False

        # Warning filter
        if (
            self.has_warnings is not None
            and self._has_warnings(message) != self.has_warnings
        ):
            return False

        # Tool type filter: at least one tool used by the message must be listed.
        if self.tools is not None:
            message_tools = self._get_message_tools(message)
            if not any(tool in self.tools for tool in message_tools):
                return False

        # Importance filter (requires metadata); a missing score counts as 0.0.
        if self.min_importance is not None:
            importance = message.metadata.get("importance_score", 0.0)
            if importance < self.min_importance:
                return False

        return True

    def _has_errors(self, message: Message) -> bool:
        """Check if message has errors.

        Args:
            message: Message to check

        Returns:
            True if the message has a truthy ``error``, or any tool part whose
            state has status ``"error"`` or whose output mentions "error",
            "exception" or "traceback" (case-insensitive)
        """
        # Check message-level error
        if message.error:
            return True

        # Check tool parts for errors
        for part in message.parts:
            if part.type != "tool" or not hasattr(part, "state"):
                continue
            state = part.state
            if getattr(state, "status", "") == "error":
                return True
            output = getattr(state, "output", "")
            if output:
                lowered = output.lower()
                if any(
                    keyword in lowered
                    for keyword in ("error", "exception", "traceback")
                ):
                    return True

        return False

    def _has_warnings(self, message: Message) -> bool:
        """Check if message has warnings.

        Args:
            message: Message to check

        Returns:
            True if any tool part's output contains "warning"
            (case-insensitive)
        """
        for part in message.parts:
            if part.type != "tool" or not hasattr(part, "state"):
                continue
            output = getattr(part.state, "output", "")
            if output and "warning" in output.lower():
                return True

        return False

    def _get_message_tools(self, message: Message) -> list[str]:
        """Get list of tools used in message.

        Args:
            message: Message to check

        Returns:
            The ``tool`` attribute of every tool part that has one, in part
            order (duplicates are kept)
        """
        return [
            part.tool
            for part in message.parts
            if part.type == "tool" and hasattr(part, "tool")
        ]
@dataclass
class QueryOptions:
    """Options for message query."""

    limit: int | None = None  # Maximum messages to return (None = no cap)
    offset: int = 0  # Skip first N messages
    order: Literal["asc", "desc"] = "asc"  # "desc" reverses the input order
    after: int | None = None  # Unix ms timestamp - messages after this time
    before: int | None = None  # Unix ms timestamp - messages before this time
    filter: MessageFilter | None = None  # Filter criteria
def query_messages(
    messages: list[Message],
    options: QueryOptions,
) -> list[Message]:
    """Query messages with filtering and pagination.

    Steps run in a fixed order: time range, filter criteria, ordering,
    offset, then limit. No sorting is performed: ``order="desc"`` simply
    reverses the order in which ``messages`` was supplied.

    Args:
        messages: All messages to query from
        options: Query options

    Returns:
        Filtered and paginated messages, always as a new list (the input
        list is never mutated or returned)
    """
    results = list(messages)

    # Time range filtering. Both bounds are exclusive. A message without a
    # "timestamp" is treated as 0 for `after` and +inf for `before`, so it
    # never satisfies a `before` bound.
    if options.after is not None:
        results = [
            msg
            for msg in results
            if msg.metadata.get("timestamp", 0) > options.after
        ]

    if options.before is not None:
        results = [
            msg
            for msg in results
            if msg.metadata.get("timestamp", float("inf")) < options.before
        ]

    # Apply filter criteria
    if options.filter is not None:
        results = [msg for msg in results if options.filter.matches(msg)]

    # Apply ordering (reversal only; see docstring)
    if options.order == "desc":
        results.reverse()

    # Apply offset; zero or negative offsets are ignored
    if options.offset > 0:
        results = results[options.offset :]

    # Apply limit; the value is used directly as the slice bound
    if options.limit is not None:
        results = results[: options.limit]

    return results
@dataclass
class ContextSummary:
    """Lightweight summary of conversation context."""

    conversation_id: str
    message_count: int
    total_tokens: int
    # Each list gets its own fresh default so instances never share state.
    summaries: list[dict[str, Any]] = field(default_factory=list)
    recent_activity: list[dict[str, Any]] = field(default_factory=list)
    key_messages: list[dict[str, Any]] = field(default_factory=list)
    errors: list[dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary.

        Returns:
            Dictionary representation. The list values are the instance's own
            lists, not copies.
        """
        return {
            "conversation_id": self.conversation_id,
            "message_count": self.message_count,
            "total_tokens": self.total_tokens,
            "summaries": self.summaries,
            "recent_activity": self.recent_activity,
            "key_messages": self.key_messages,
            "errors": self.errors,
        }
def build_context_summary(
    conversation_id: str,
    messages: list[Message],
    recent_limit: int = 5,
    importance_threshold: float = 100,
) -> ContextSummary:
    """Build lightweight context summary without full message content.

    Args:
        conversation_id: Conversation ID
        messages: All messages
        recent_limit: Number of recent messages to include; zero or a
            negative value yields no recent activity
        importance_threshold: A message is listed in ``key_messages`` when
            its ``importance_score`` metadata is strictly greater than this

    Returns:
        Context summary
    """
    total_tokens = sum(msg.tokens.total if msg.tokens else 0 for msg in messages)

    summaries: list[dict[str, Any]] = []
    key_messages: list[dict[str, Any]] = []
    errors: list[dict[str, Any]] = []

    for msg in messages:
        timestamp = msg.metadata.get("timestamp", 0)

        # Summary messages: keep the first 200 chars of the first text part.
        if msg.summary:
            first_text = next(
                (part for part in msg.parts if part.type == "text"), None
            )
            # `or ""` guards against a text part whose text is None.
            summary_text = (getattr(first_text, "text", "") or "")[:200]
            summaries.append(
                {
                    "id": msg.id,
                    "timestamp": timestamp,
                    "summary": summary_text,
                }
            )

        # Message-level errors, truncated to the first 200 chars.
        if msg.error:
            errors.append(
                {
                    "id": msg.id,
                    "timestamp": timestamp,
                    "error": str(msg.error)[:200],
                }
            )

        # High-importance messages; a missing score counts as 0.0.
        importance = msg.metadata.get("importance_score", 0.0)
        if importance > importance_threshold:
            key_messages.append(
                {
                    "id": msg.id,
                    "timestamp": timestamp,
                    "importance": importance,
                }
            )

    # Recent activity (last N messages). The explicit guard matters:
    # messages[-0:] is the whole list, so a plain slice would return every
    # message when recent_limit is 0.
    recent = messages[-recent_limit:] if recent_limit > 0 else []
    recent_activity = [
        {
            "id": msg.id,
            "role": msg.role,
            "timestamp": msg.metadata.get("timestamp", 0),
            "has_tool_outputs": any(part.type == "tool" for part in msg.parts),
            "has_errors": msg.error is not None,
        }
        for msg in recent
    ]

    return ContextSummary(
        conversation_id=conversation_id,
        message_count=len(messages),
        total_tokens=total_tokens,
        summaries=summaries,
        recent_activity=recent_activity,
        key_messages=key_messages,
        errors=errors,
    )