Coverage for src / harnessutils / maintenance.py: 91%

196 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 09:07 -0600

1"""Phase 3D: Cleanup and maintenance operations. 

2 

3Provides background cleanup, drift detection, and quality issue identification. 

4""" 

5 

6from __future__ import annotations 

7 

8import time 

9from dataclasses import dataclass, field 

10from typing import Any, Literal 

11 

12from harnessutils.compaction.pruning import ( 

13 compute_content_hash, 

14) 

15from harnessutils.config import PruningConfig 

16from harnessutils.models.conversation import Conversation 

17from harnessutils.models.message import Message 

18from harnessutils.tokens.exact import count_tokens_fast 

19 

20 

@dataclass
class CleanupResult:
    """Statistics accumulated by a cleanup pass."""

    messages_archived: int = 0
    tokens_freed: int = 0
    duplicates_removed: int = 0
    stale_outputs_pruned: int = 0
    operations: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary.

        Returns:
            Dictionary representation
        """
        keys = (
            "messages_archived",
            "tokens_freed",
            "duplicates_removed",
            "stale_outputs_pruned",
            "operations",
        )
        return {key: getattr(self, key) for key in keys}

44 

45 

@dataclass
class ContextIssue:
    """A single quality or drift problem detected in the context."""

    # Category of the detected problem.
    issue_type: Literal[
        "high_redundancy",
        "staleness_accumulation",
        "low_information_density",
        "error_loss",
        "excessive_protection",
    ]
    # How serious the problem is considered to be.
    severity: Literal["info", "warning", "error"]
    description: str
    affected_count: int = 0
    suggested_fix: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary.

        Returns:
            Dictionary representation
        """
        keys = (
            "issue_type",
            "severity",
            "description",
            "affected_count",
            "suggested_fix",
            "metadata",
        )
        return {key: getattr(self, key) for key in keys}

77 

78 

def cleanup_stale_data(
    messages: list[Message],
    config: PruningConfig,
    max_age_hours: float = 24,
    keep_errors: bool = True,
    execute: bool = False,
) -> CleanupResult:
    """Clean up stale conversation data.

    Walks every tool part of every message older than ``max_age_hours``
    and either clears its output (``execute=True``) or merely tallies
    what a real run would clear (dry run, the default).

    Args:
        messages: Conversation messages
        config: Pruning configuration
        max_age_hours: Maximum age in hours before considering stale
        keep_errors: Whether to preserve error messages
        execute: When True, actually clear stale outputs (not just count them)

    Returns:
        CleanupResult with statistics
    """
    result = CleanupResult()
    now_ms = int(time.time() * 1000)
    cutoff_ms = int(max_age_hours * 3600 * 1000)

    dry_run_count = 0
    dry_run_tokens = 0

    for msg in messages:
        # A message with no recorded timestamp is treated as brand new.
        age_ms = now_ms - msg.metadata.get("timestamp", now_ms)
        if age_ms < cutoff_ms:
            continue
        if keep_errors and msg.error:
            continue

        for part in msg.parts:
            if part.type != "tool":
                continue
            if not hasattr(part, "state") or not hasattr(part.state, "output"):
                continue
            output = part.state.output
            if not output:
                continue
            # Preserve error outputs when requested; the substring check
            # also catches tools that report failure only in their text.
            if keep_errors and (
                part.state.status == "error" or "error" in output.lower()
            ):
                continue

            tokens = count_tokens_fast(output)
            if execute:
                part.state.output = ""
                if part.state.time:
                    part.state.time.compacted = int(time.time() * 1000)
                result.stale_outputs_pruned += 1
                result.tokens_freed += tokens
            else:
                dry_run_tokens += tokens
                dry_run_count += 1

    if not execute:
        result.stale_outputs_pruned = dry_run_count
        result.tokens_freed = dry_run_tokens

    verb = "Cleared" if execute else "Identified"
    result.operations.append(
        f"{verb} {result.stale_outputs_pruned} stale outputs (>{max_age_hours}h old)"
    )

    return result

153 

154 

def scan_and_deduplicate(
    messages: list[Message],
    config: PruningConfig,
) -> CleanupResult:
    """Scan for and remove duplicate outputs.

    NOTE(review): this pass currently only *detects* duplicates — it
    fills in the counters but never clears an output; confirm whether
    removal is intended to happen in a separate step.

    Args:
        messages: Conversation messages
        config: Pruning configuration

    Returns:
        CleanupResult with deduplication statistics
    """
    result = CleanupResult()

    first_seen: dict[str, str] = {}  # content hash -> id of first message
    dup_count = 0
    dup_tokens = 0

    for msg in messages:
        for part in msg.parts:
            if part.type != "tool":
                continue
            if not hasattr(part, "state") or not hasattr(part.state, "output"):
                continue
            output = part.state.output
            if not output:
                continue

            digest = compute_content_hash(output)
            if digest in first_seen:
                dup_count += 1
                dup_tokens += count_tokens_fast(output)
            else:
                first_seen[digest] = msg.id

    result.duplicates_removed = dup_count
    result.tokens_freed = dup_tokens
    result.operations.append(f"Found {dup_count} duplicate outputs")

    return result

201 

202 

def detect_context_issues(
    messages: list[Message],
    conversation: Conversation,
    config: PruningConfig,
) -> list[ContextIssue]:
    """Detect quality and drift issues in conversation context.

    Runs five independent heuristics over the message list and reports
    one ContextIssue per triggered check:

    - High redundancy (duplicates)
    - Staleness accumulation
    - Low information density
    - Error loss
    - Excessive protection

    Args:
        messages: Conversation messages
        conversation: Conversation object (currently unused by the checks)
        config: Pruning configuration

    Returns:
        List of detected issues
    """
    found: list[ContextIssue] = []

    # Check 1: duplicate content ratio above 25% is worth flagging;
    # above 40% it is escalated to a warning.
    redundancy = _calculate_redundancy(messages)
    if redundancy > 0.25:
        level = "warning" if redundancy > 0.4 else "info"
        found.append(
            ContextIssue(
                issue_type="high_redundancy",
                severity=level,
                description=f"High redundancy detected: {int(redundancy * 100)}% duplicate content",
                affected_count=_count_duplicates(messages),
                suggested_fix="Run scan_and_deduplicate() to remove duplicates",
                metadata={"redundancy_ratio": redundancy},
            )
        )

    # Check 2: average message age beyond 15 turns hints the context
    # is accumulating old material.
    avg_age_turns = _calculate_average_age(messages)
    if avg_age_turns > 15:
        found.append(
            ContextIssue(
                issue_type="staleness_accumulation",
                severity="info",
                description=f"Context aging: average message age is {avg_age_turns:.1f} turns",
                suggested_fix="Consider running summarization to compress old context",
                metadata={"avg_age_turns": avg_age_turns},
            )
        )

    # Check 3: less than half of the content being unique is a warning.
    density = _calculate_information_density(messages)
    if density < 0.5:
        found.append(
            ContextIssue(
                issue_type="low_information_density",
                severity="warning",
                description=f"Low information density: {int(density * 100)}% unique content",
                suggested_fix="Run deduplication and compaction",
                metadata={"density": density},
            )
        )

    # Check 4: purely informational — surface how many errors exist so
    # the operator can confirm they are being preserved.
    error_count = _count_errors(messages)
    if error_count > 0:
        found.append(
            ContextIssue(
                issue_type="error_loss",
                severity="info",
                description=f"{error_count} error messages in context",
                affected_count=error_count,
                suggested_fix="Ensure preserve_errors config is enabled",
                metadata={"error_count": error_count},
            )
        )

    # Check 5: if more than half of all tokens are protected, pruning
    # has little room to work with.
    protected_ratio = _calculate_protected_ratio(messages, config)
    if protected_ratio > 0.5:
        found.append(
            ContextIssue(
                issue_type="excessive_protection",
                severity="warning",
                description=f"Excessive protection: {int(protected_ratio * 100)}% tokens protected",
                suggested_fix="Review protected_tools config - may limit pruning effectiveness",
                metadata={"protected_ratio": protected_ratio},
            )
        )

    return found

296 

297 

def _calculate_redundancy(messages: list[Message]) -> float:
    """Calculate redundancy ratio (0.0-1.0).

    Hashes every non-empty tool output and reports the fraction that
    repeats an earlier output.

    Args:
        messages: Conversation messages

    Returns:
        Redundancy ratio
    """
    hashes_seen: set[str] = set()
    outputs = 0
    repeats = 0

    for msg in messages:
        for part in msg.parts:
            if part.type != "tool":
                continue
            if not hasattr(part, "state") or not hasattr(part.state, "output"):
                continue
            text = part.state.output
            if not text:
                continue

            outputs += 1
            digest = compute_content_hash(text)
            if digest in hashes_seen:
                repeats += 1
            else:
                hashes_seen.add(digest)

    # An empty conversation has no redundancy by definition.
    return repeats / outputs if outputs else 0.0

335 

336 

def _count_duplicates(messages: list[Message]) -> int:
    """Count duplicate outputs.

    A tool output counts as a duplicate when its content hash was
    already produced by an earlier output.

    Args:
        messages: Conversation messages

    Returns:
        Number of duplicates
    """
    hashes: set[str] = set()
    repeat_total = 0

    for msg in messages:
        for part in msg.parts:
            if (
                part.type == "tool"
                and hasattr(part, "state")
                and hasattr(part.state, "output")
                and part.state.output
            ):
                digest = compute_content_hash(part.state.output)
                if digest in hashes:
                    repeat_total += 1
                else:
                    hashes.add(digest)

    return repeat_total

368 

369 

370def _calculate_average_age(messages: list[Message]) -> float: 

371 """Calculate average message age in turns. 

372 

373 Args: 

374 messages: Conversation messages 

375 

376 Returns: 

377 Average age in turns 

378 """ 

379 if not messages: 

380 return 0.0 

381 

382 total_turns = len(messages) 

383 total_age = 0.0 

384 

385 for idx, msg in enumerate(messages): 

386 age = total_turns - idx - 1 

387 total_age += age 

388 

389 return total_age / total_turns 

390 

391 

def _calculate_information_density(messages: list[Message]) -> float:
    """Calculate information density (unique content ratio).

    Approximated as the complement of the redundancy ratio.

    Args:
        messages: Conversation messages

    Returns:
        Density ratio (0.0-1.0)
    """
    return 1.0 - _calculate_redundancy(messages)

404 

405 

406def _count_errors(messages: list[Message]) -> int: 

407 """Count error messages. 

408 

409 Args: 

410 messages: Conversation messages 

411 

412 Returns: 

413 Number of error messages 

414 """ 

415 error_count = 0 

416 

417 for msg in messages: 

418 if msg.error: 

419 error_count += 1 

420 continue 

421 

422 # Check tool outputs for errors 

423 for part in msg.parts: 

424 if part.type != "tool": 

425 continue 

426 

427 if hasattr(part, "state"): 

428 if part.state.status == "error": 

429 error_count += 1 

430 break 

431 

432 output = getattr(part.state, "output", "") 

433 if output and "error" in output.lower(): 

434 error_count += 1 

435 break 

436 

437 return error_count 

438 

439 

def _calculate_protected_ratio(
    messages: list[Message], config: PruningConfig
) -> float:
    """Calculate ratio of protected tokens.

    Walks the conversation newest-first, treating each user message as a
    turn boundary. An output is protected when it sits inside the most
    recent ``config.protect_turns`` turns or was produced by a tool in
    ``config.protected_tools``.

    Args:
        messages: Conversation messages
        config: Pruning configuration

    Returns:
        Protected ratio (0.0-1.0)
    """
    token_total = 0
    token_protected = 0
    user_turns_seen = 0

    for msg in reversed(messages):
        if msg.role == "user":
            user_turns_seen += 1

        for part in msg.parts:
            if part.type != "tool":
                continue
            if not hasattr(part, "state") or not hasattr(part.state, "output"):
                continue
            output = part.state.output
            if not output:
                continue

            size = count_tokens_fast(output)
            token_total += size

            tool_name = getattr(part, "tool", "")
            protected = (
                user_turns_seen < config.protect_turns
                or tool_name in config.protected_tools
            )
            if protected:
                token_protected += size

    # No outputs at all means nothing is protected.
    return token_protected / token_total if token_total else 0.0