Coverage for src / harnessutils / maintenance.py: 91%
196 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 09:07 -0600
1"""Phase 3D: Cleanup and maintenance operations.
3Provides background cleanup, drift detection, and quality issue identification.
4"""
6from __future__ import annotations
8import time
9from dataclasses import dataclass, field
10from typing import Any, Literal
12from harnessutils.compaction.pruning import (
13 compute_content_hash,
14)
15from harnessutils.config import PruningConfig
16from harnessutils.models.conversation import Conversation
17from harnessutils.models.message import Message
18from harnessutils.tokens.exact import count_tokens_fast
@dataclass
class CleanupResult:
    """Statistics gathered by a cleanup operation."""

    # Counters default to zero so a fresh result means "nothing done yet".
    messages_archived: int = 0
    tokens_freed: int = 0
    duplicates_removed: int = 0
    stale_outputs_pruned: int = 0
    # Human-readable log of the operations performed.
    operations: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of this result.

        Returns:
            Dictionary representation
        """
        keys = (
            "messages_archived",
            "tokens_freed",
            "duplicates_removed",
            "stale_outputs_pruned",
            "operations",
        )
        return {key: getattr(self, key) for key in keys}
@dataclass
class ContextIssue:
    """A single quality/drift problem detected in conversation context."""

    # Category of the detected problem.
    issue_type: Literal[
        "high_redundancy",
        "staleness_accumulation",
        "low_information_density",
        "error_loss",
        "excessive_protection",
    ]
    # How serious the problem is.
    severity: Literal["info", "warning", "error"]
    # Human-readable explanation of the issue.
    description: str
    # Number of messages/outputs involved, when known.
    affected_count: int = 0
    # Recommended remediation, if any.
    suggested_fix: str = ""
    # Extra machine-readable details (ratios, counts, ...).
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of this issue.

        Returns:
            Dictionary representation
        """
        keys = (
            "issue_type",
            "severity",
            "description",
            "affected_count",
            "suggested_fix",
            "metadata",
        )
        return {key: getattr(self, key) for key in keys}
def cleanup_stale_data(
    messages: list[Message],
    config: PruningConfig,
    max_age_hours: float = 24,
    keep_errors: bool = True,
    execute: bool = False,
) -> CleanupResult:
    """Clean up stale conversation data.

    Removes or archives old messages and outputs based on age. In dry-run
    mode (``execute=False``) nothing is mutated; the result only reports
    what *would* be cleared. This function never touches ``config``.

    Args:
        messages: Conversation messages
        config: Pruning configuration (unused by this function)
        max_age_hours: Maximum age in hours before considering stale
        keep_errors: Whether to preserve error messages
        execute: When True, actually clear stale outputs (not just count them)

    Returns:
        CleanupResult with statistics
    """
    result = CleanupResult()
    # Ages are compared in epoch milliseconds.
    current_time = int(time.time() * 1000)
    max_age_ms = int(max_age_hours * 3600 * 1000)

    # Dry-run accumulators; only copied into `result` when execute is False.
    stale_outputs = 0
    tokens_freed = 0

    for msg in messages:
        # A message without a timestamp defaults to "now" (age 0), so it is
        # never considered stale.
        msg_timestamp = msg.metadata.get("timestamp", current_time)
        age_ms = current_time - msg_timestamp

        # Skip if not stale
        if age_ms < max_age_ms:
            continue

        # Skip errors if keep_errors is True
        if keep_errors and msg.error:
            continue

        # Count stale tool outputs
        for part in msg.parts:
            if part.type == "tool":
                if hasattr(part, "state") and hasattr(part.state, "output"):
                    output = part.state.output
                    if output:
                        # Don't remove if it's an error and keep_errors is True
                        # (covers both an error status and "error" text in the
                        # output, case-insensitively).
                        if keep_errors and (
                            part.state.status == "error"
                            or "error" in output.lower()
                        ):
                            continue

                        # Measure before clearing so freed tokens can still be
                        # reported after the output is emptied.
                        tokens = count_tokens_fast(output)
                        if execute:
                            part.state.output = ""
                            # Record when the output was compacted, but only if
                            # the part tracks timing info.
                            if part.state.time:
                                part.state.time.compacted = int(time.time() * 1000)
                            result.stale_outputs_pruned += 1
                            result.tokens_freed += tokens
                        else:
                            tokens_freed += tokens
                            stale_outputs += 1

    if not execute:
        # Dry run: publish the tallies without having mutated anything.
        result.stale_outputs_pruned = stale_outputs
        result.tokens_freed = tokens_freed

    verb = "Cleared" if execute else "Identified"
    result.operations.append(
        f"{verb} {result.stale_outputs_pruned} stale outputs (>{max_age_hours}h old)"
    )

    return result
def scan_and_deduplicate(
    messages: list[Message],
    config: PruningConfig,
) -> CleanupResult:
    """Scan tool outputs for duplicated content and tally the findings.

    The first occurrence of each distinct output is remembered; every later
    identical output counts as a duplicate. Note that this scan does not
    mutate the messages — it only reports what was found.

    Args:
        messages: Conversation messages
        config: Pruning configuration

    Returns:
        CleanupResult with deduplication statistics
    """
    result = CleanupResult()

    first_seen: dict[str, str] = {}  # content hash -> id of first message seen
    dupes = 0
    dupe_tokens = 0

    for message in messages:
        for part in message.parts:
            # Only tool parts that actually carry an output are considered.
            if part.type != "tool":
                continue
            if not hasattr(part, "state") or not hasattr(part.state, "output"):
                continue
            output = part.state.output
            if not output:
                continue

            digest = compute_content_hash(output)
            if digest in first_seen:
                dupes += 1
                dupe_tokens += count_tokens_fast(output)
            else:
                first_seen[digest] = message.id

    result.duplicates_removed = dupes
    result.tokens_freed = dupe_tokens
    result.operations.append(f"Found {dupes} duplicate outputs")

    return result
def detect_context_issues(
    messages: list[Message],
    conversation: Conversation,
    config: PruningConfig,
) -> list[ContextIssue]:
    """Detect quality and drift issues in conversation context.

    Runs five independent checks, in order:
    redundancy, staleness, information density, error presence, and
    over-protection. Each failing check contributes one issue.

    Args:
        messages: Conversation messages
        conversation: Conversation object
        config: Pruning configuration

    Returns:
        List of detected issues
    """
    found: list[ContextIssue] = []

    # 1) Redundancy: flagged above 25% duplicated content; escalates from
    #    "info" to "warning" above 40%.
    dup_ratio = _calculate_redundancy(messages)
    if dup_ratio > 0.25:
        level = "warning" if dup_ratio > 0.4 else "info"
        found.append(
            ContextIssue(
                issue_type="high_redundancy",
                severity=level,
                description=f"High redundancy detected: {int(dup_ratio * 100)}% duplicate content",
                affected_count=_count_duplicates(messages),
                suggested_fix="Run scan_and_deduplicate() to remove duplicates",
                metadata={"redundancy_ratio": dup_ratio},
            )
        )

    # 2) Staleness: average message age beyond 15 turns is informational.
    mean_age = _calculate_average_age(messages)
    if mean_age > 15:
        found.append(
            ContextIssue(
                issue_type="staleness_accumulation",
                severity="info",
                description=f"Context aging: average message age is {mean_age:.1f} turns",
                suggested_fix="Consider running summarization to compress old context",
                metadata={"avg_age_turns": mean_age},
            )
        )

    # 3) Information density: below 50% unique content is a warning.
    uniq_ratio = _calculate_information_density(messages)
    if uniq_ratio < 0.5:
        found.append(
            ContextIssue(
                issue_type="low_information_density",
                severity="warning",
                description=f"Low information density: {int(uniq_ratio * 100)}% unique content",
                suggested_fix="Run deduplication and compaction",
                metadata={"density": uniq_ratio},
            )
        )

    # 4) Error presence: informational note whenever errors exist in context.
    n_errors = _count_errors(messages)
    if n_errors > 0:
        found.append(
            ContextIssue(
                issue_type="error_loss",
                severity="info",
                description=f"{n_errors} error messages in context",
                affected_count=n_errors,
                suggested_fix="Ensure preserve_errors config is enabled",
                metadata={"error_count": n_errors},
            )
        )

    # 5) Over-protection: more than half the tokens shielded limits pruning.
    shielded = _calculate_protected_ratio(messages, config)
    if shielded > 0.5:
        found.append(
            ContextIssue(
                issue_type="excessive_protection",
                severity="warning",
                description=f"Excessive protection: {int(shielded * 100)}% tokens protected",
                suggested_fix="Review protected_tools config - may limit pruning effectiveness",
                metadata={"protected_ratio": shielded},
            )
        )

    return found
def _calculate_redundancy(messages: list[Message]) -> float:
    """Return the fraction of tool outputs that duplicate an earlier one.

    Args:
        messages: Conversation messages

    Returns:
        Redundancy ratio (0.0-1.0; 0.0 when there are no outputs)
    """
    hashes_seen: set[str] = set()
    outputs = 0
    repeats = 0

    for message in messages:
        for part in message.parts:
            # Only tool parts with a non-empty output participate.
            if part.type != "tool":
                continue
            if not hasattr(part, "state") or not hasattr(part.state, "output"):
                continue
            text = part.state.output
            if not text:
                continue

            outputs += 1
            digest = compute_content_hash(text)
            if digest in hashes_seen:
                repeats += 1
            else:
                hashes_seen.add(digest)

    return repeats / outputs if outputs else 0.0
def _count_duplicates(messages: list[Message]) -> int:
    """Count tool outputs whose content matches an earlier output's hash.

    Args:
        messages: Conversation messages

    Returns:
        Number of duplicates
    """
    hashes_seen: set[str] = set()
    repeats = 0

    for message in messages:
        for part in message.parts:
            # Only tool parts with a non-empty output participate.
            if part.type != "tool":
                continue
            if not hasattr(part, "state") or not hasattr(part.state, "output"):
                continue
            text = part.state.output
            if not text:
                continue

            digest = compute_content_hash(text)
            if digest in hashes_seen:
                repeats += 1
            else:
                hashes_seen.add(digest)

    return repeats
370def _calculate_average_age(messages: list[Message]) -> float:
371 """Calculate average message age in turns.
373 Args:
374 messages: Conversation messages
376 Returns:
377 Average age in turns
378 """
379 if not messages:
380 return 0.0
382 total_turns = len(messages)
383 total_age = 0.0
385 for idx, msg in enumerate(messages):
386 age = total_turns - idx - 1
387 total_age += age
389 return total_age / total_turns
def _calculate_information_density(messages: list[Message]) -> float:
    """Unique-content ratio, approximated as the complement of redundancy.

    Args:
        messages: Conversation messages

    Returns:
        Density ratio (0.0-1.0)
    """
    return 1.0 - _calculate_redundancy(messages)
406def _count_errors(messages: list[Message]) -> int:
407 """Count error messages.
409 Args:
410 messages: Conversation messages
412 Returns:
413 Number of error messages
414 """
415 error_count = 0
417 for msg in messages:
418 if msg.error:
419 error_count += 1
420 continue
422 # Check tool outputs for errors
423 for part in msg.parts:
424 if part.type != "tool":
425 continue
427 if hasattr(part, "state"):
428 if part.state.status == "error":
429 error_count += 1
430 break
432 output = getattr(part.state, "output", "")
433 if output and "error" in output.lower():
434 error_count += 1
435 break
437 return error_count
def _calculate_protected_ratio(
    messages: list[Message], config: PruningConfig
) -> float:
    """Fraction of tool-output tokens that are shielded from pruning.

    Walks the conversation newest-first, counting each user message as one
    turn. An output is protected when it falls inside the most recent
    ``config.protect_turns`` turns, or when its tool appears in
    ``config.protected_tools``.

    Args:
        messages: Conversation messages
        config: Pruning configuration

    Returns:
        Protected ratio (0.0-1.0; 0.0 when there are no outputs)
    """
    token_total = 0
    token_protected = 0
    user_turns_seen = 0

    for message in reversed(messages):
        # The turn counter advances *before* this message's parts are
        # examined, so a user message's own outputs fall inside its turn.
        if message.role == "user":
            user_turns_seen += 1

        for part in message.parts:
            if part.type != "tool":
                continue
            if not hasattr(part, "state") or not hasattr(part.state, "output"):
                continue
            text = part.state.output
            if not text:
                continue

            size = count_tokens_fast(text)
            token_total += size

            # Protected by recency or by tool name.
            tool_name = getattr(part, "tool", "")
            if (
                user_turns_seen < config.protect_turns
                or tool_name in config.protected_tools
            ):
                token_protected += size

    return token_protected / token_total if token_total else 0.0