Coverage for src / harnessutils / query.py: 87%

125 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 08:30 -0600

1"""Query and filter API for selective message loading. 

2 

3Provides flexible message querying without loading full conversation history. 

4""" 

5 

6from __future__ import annotations 

7 

8from dataclasses import dataclass, field 

9from typing import Any, Literal 

10 

11from harnessutils.models.message import Message 

12 

13 

@dataclass
class MessageFilter:
    """Set of optional predicates that a message must satisfy.

    Each field defaults to ``None``, which disables that predicate. A
    message is accepted by :meth:`matches` only when every predicate that
    has been set agrees with it.
    """

    has_errors: bool | None = None  # Require (True) / forbid (False) errors
    has_warnings: bool | None = None  # Require / forbid warning output
    min_importance: float | None = None  # Lowest accepted importance score
    tools: list[str] | None = None  # Accept messages using any of these tools
    roles: list[str] | None = None  # Accepted message roles (user/assistant)
    has_tool_outputs: bool | None = None  # Require / forbid tool outputs
    is_summary: bool | None = None  # Compared against message.summary

    def matches(self, message: Message) -> bool:
        """Decide whether ``message`` passes every active criterion.

        Args:
            message: Message to evaluate

        Returns:
            True when no non-None criterion rejects the message
        """
        # Role must be one of the requested roles.
        if self.roles is not None and message.role not in self.roles:
            return False

        # Summary flag is compared directly with the requested value.
        if self.is_summary is not None and message.summary != self.is_summary:
            return False

        # A tool part only counts as an output once it carries a state.
        if self.has_tool_outputs is not None:
            found_output = False
            for part in message.parts:
                if part.type == "tool" and getattr(part, "state", None) is not None:
                    found_output = True
                    break
            if found_output != self.has_tool_outputs:
                return False

        # Error presence must equal the requested flag.
        if self.has_errors is not None:
            if self._has_errors(message) != self.has_errors:
                return False

        # Warning presence must equal the requested flag.
        if self.has_warnings is not None:
            if self._has_warnings(message) != self.has_warnings:
                return False

        # At least one tool used by the message must be in the wanted list.
        if self.tools is not None:
            used_tools = self._get_message_tools(message)
            if all(name not in self.tools for name in used_tools):
                return False

        # Importance is read from metadata; a missing score counts as 0.0.
        if self.min_importance is not None:
            score = message.metadata.get("importance_score", 0.0)
            if score < self.min_importance:
                return False

        return True

    def _has_errors(self, message: Message) -> bool:
        """Report whether the message carries any sign of an error.

        Args:
            message: Message to inspect

        Returns:
            True if the message-level error is truthy, a tool state has
            status "error", or a tool output mentions an error keyword
        """
        if message.error:
            return True

        for part in message.parts:
            if part.type != "tool" or not hasattr(part, "state"):
                continue

            state = part.state
            if getattr(state, "status", "") == "error":
                return True

            text = getattr(state, "output", "")
            if not text:
                continue

            # Case-insensitive keyword scan of the tool output.
            lowered = text.lower()
            if "error" in lowered or "exception" in lowered or "traceback" in lowered:
                return True

        return False

    def _has_warnings(self, message: Message) -> bool:
        """Report whether any tool output mentions a warning.

        Args:
            message: Message to inspect

        Returns:
            True if a tool part's output contains "warning" (any case)
        """
        for part in message.parts:
            if part.type != "tool" or not hasattr(part, "state"):
                continue

            text = getattr(part.state, "output", "")
            if text and "warning" in text.lower():
                return True

        return False

    def _get_message_tools(self, message: Message) -> list[str]:
        """Collect the tool names referenced by the message's tool parts.

        Args:
            message: Message to inspect

        Returns:
            Tool names in part order (duplicates are kept)
        """
        return [
            part.tool
            for part in message.parts
            if part.type == "tool" and hasattr(part, "tool")
        ]

141 

142 

@dataclass
class QueryOptions:
    """Pagination, ordering and filtering settings for a message query.

    Attributes:
        limit: Maximum number of messages to return (None = no cap)
        offset: Number of leading messages to skip
        order: "asc" or "desc", ordering by creation time
        after: Unix ms timestamp; keep messages after this time
        before: Unix ms timestamp; keep messages before this time
        filter: Optional MessageFilter with further criteria
    """

    limit: int | None = None
    offset: int = 0
    order: Literal["asc", "desc"] = "asc"
    after: int | None = None
    before: int | None = None
    filter: MessageFilter | None = None

153 

154 

def query_messages(
    messages: list[Message],
    options: QueryOptions,
) -> list[Message]:
    """Select a filtered, ordered page out of ``messages``.

    Stages run in this order: time window, filter criteria, ordering,
    offset, limit. The input list itself is never modified.

    Args:
        messages: All messages to query from
        options: Query options

    Returns:
        New list holding the messages that survived every stage
    """
    selected = messages.copy()

    # Keep only messages stamped strictly after the lower bound; a message
    # without a timestamp is treated as time 0.
    lower_bound = options.after
    if lower_bound is not None:
        selected = [
            msg for msg in selected if msg.metadata.get("timestamp", 0) > lower_bound
        ]

    # Keep only messages stamped strictly before the upper bound; a message
    # without a timestamp is treated as infinitely late and dropped.
    upper_bound = options.before
    if upper_bound is not None:
        selected = [
            msg
            for msg in selected
            if msg.metadata.get("timestamp", float("inf")) < upper_bound
        ]

    # Apply the filter criteria, when one was supplied.
    criteria = options.filter
    if criteria is not None:
        selected = [msg for msg in selected if criteria.matches(msg)]

    # "desc" flips the incoming order; nothing is re-sorted by timestamp.
    if options.order == "desc":
        selected = selected[::-1]

    # Pagination: skip ``offset`` items (only when positive), then cap.
    if options.offset > 0:
        selected = selected[options.offset :]

    if options.limit is None:
        return selected
    return selected[: options.limit]

202 

203 

@dataclass
class ContextSummary:
    """Compact overview of a conversation's context.

    Holds the conversation id, message and token counts, plus four lists
    of small dictionaries (summaries, recent activity, key messages and
    errors). Each list defaults to a fresh empty list per instance.
    """

    conversation_id: str
    message_count: int
    total_tokens: int
    summaries: list[dict[str, Any]] = field(default_factory=list)
    recent_activity: list[dict[str, Any]] = field(default_factory=list)
    key_messages: list[dict[str, Any]] = field(default_factory=list)
    errors: list[dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Expose the summary as a plain dictionary.

        The list values are the instance's own lists, not copies.

        Returns:
            Mapping of field name to field value, in declaration order
        """
        exported = (
            "conversation_id",
            "message_count",
            "total_tokens",
            "summaries",
            "recent_activity",
            "key_messages",
            "errors",
        )
        return {name: getattr(self, name) for name in exported}

231 

232 

def build_context_summary(
    conversation_id: str,
    messages: list[Message],
    recent_limit: int = 5,
) -> ContextSummary:
    """Build lightweight context summary without full message content.

    Args:
        conversation_id: Conversation ID
        messages: All messages
        recent_limit: Number of recent messages to include. Zero or a
            negative value yields an empty recent-activity list.

    Returns:
        Context summary
    """
    # Messages without token info contribute 0 to the total.
    total_tokens = sum(msg.tokens.total if msg.tokens else 0 for msg in messages)

    # Extract summaries: one entry per summary message, carrying the first
    # 200 characters of its first text part ("" when it has no text part).
    summaries = []
    for msg in messages:
        if not msg.summary:
            continue
        summary_text = next(
            (getattr(part, "text", "")[:200] for part in msg.parts if part.type == "text"),
            "",
        )
        summaries.append(
            {
                "id": msg.id,
                "timestamp": msg.metadata.get("timestamp", 0),
                "summary": summary_text,
            }
        )

    # Recent activity (last N messages). A bare messages[-recent_limit:]
    # would return the WHOLE list for recent_limit == 0 (since -0 == 0) and
    # would skip leading items for negative values, so guard explicitly.
    recent_messages = messages[-recent_limit:] if recent_limit > 0 else []
    recent_activity = [
        {
            "id": msg.id,
            "role": msg.role,
            "timestamp": msg.metadata.get("timestamp", 0),
            "has_tool_outputs": any(part.type == "tool" for part in msg.parts),
            # NOTE(review): this flag uses `is not None`, while the errors
            # list below uses truthiness of msg.error; kept as-is.
            "has_errors": msg.error is not None,
        }
        for msg in recent_messages
    ]

    # Key messages (high importance) and errors
    key_messages = []
    errors = []

    for msg in messages:
        # Check for errors
        if msg.error:
            errors.append(
                {
                    "id": msg.id,
                    "timestamp": msg.metadata.get("timestamp", 0),
                    "error": str(msg.error)[:200],  # First 200 chars
                }
            )

        # Check importance; a missing score counts as 0.0
        importance = msg.metadata.get("importance_score", 0.0)
        if importance > 100:  # High importance threshold
            key_messages.append(
                {
                    "id": msg.id,
                    "timestamp": msg.metadata.get("timestamp", 0),
                    "importance": importance,
                }
            )

    return ContextSummary(
        conversation_id=conversation_id,
        message_count=len(messages),
        total_tokens=total_tokens,
        summaries=summaries,
        recent_activity=recent_activity,
        key_messages=key_messages,
        errors=errors,
    )