Coverage for src / harnessutils / quality.py: 91%
187 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 08:30 -0600
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 08:30 -0600
1"""Phase 3B: Quality metrics for context health tracking.
3Calculates and tracks quality metrics over time to assess context health
4and provide actionable recommendations.
5"""
7import time
8from dataclasses import dataclass, field
9from typing import Any, cast
11from harnessutils.compaction.pruning import (
12 PruningDecision,
13 generate_shingles,
14)
15from harnessutils.config import PruningConfig
16from harnessutils.models.message import Message
17from harnessutils.models.parts import ToolPart
18from harnessutils.tokens.exact import count_tokens_fast
21@dataclass
22class QualitySnapshot:
23 """Single quality measurement at a point in time."""
25 timestamp: int # Unix ms
26 information_density: float # 0.0-1.0, unique info / total tokens
27 redundancy_ratio: float # 0.0-1.0, duplicate tokens / total tokens
28 staleness_score: float # 0.0-1.0, avg age-weighted decay
29 error_preservation_rate: float # 0.0-1.0, errors kept / total errors
30 protected_ratio: float # 0.0-1.0, protected tokens / total tokens
31 health: str # "good" | "degraded" | "poor"
32 recommendations: list[str] # Actionable suggestions
34 def to_dict(self) -> dict[str, Any]:
35 """Convert to dictionary for storage.
37 Returns:
38 Dictionary representation
39 """
40 return {
41 "timestamp": self.timestamp,
42 "information_density": self.information_density,
43 "redundancy_ratio": self.redundancy_ratio,
44 "staleness_score": self.staleness_score,
45 "error_preservation_rate": self.error_preservation_rate,
46 "protected_ratio": self.protected_ratio,
47 "health": self.health,
48 "recommendations": self.recommendations,
49 }
51 @classmethod
52 def from_dict(cls, data: dict[str, Any]) -> "QualitySnapshot":
53 """Create from dictionary.
55 Args:
56 data: Dictionary representation
58 Returns:
59 QualitySnapshot instance
60 """
61 return cls(
62 timestamp=data["timestamp"],
63 information_density=data["information_density"],
64 redundancy_ratio=data["redundancy_ratio"],
65 staleness_score=data["staleness_score"],
66 error_preservation_rate=data["error_preservation_rate"],
67 protected_ratio=data["protected_ratio"],
68 health=data["health"],
69 recommendations=data["recommendations"],
70 )
73@dataclass
74class QualityHistory:
75 """Time-series tracking of quality metrics."""
77 snapshots: list[QualitySnapshot] = field(default_factory=list)
78 max_snapshots: int = 50 # Keep last 50 measurements
80 def add_snapshot(self, snapshot: QualitySnapshot) -> None:
81 """Add quality snapshot to history.
83 Prepends new snapshot and trims to max_snapshots.
85 Args:
86 snapshot: Quality snapshot to add
87 """
88 self.snapshots.insert(0, snapshot)
90 # Trim to max_snapshots
91 if len(self.snapshots) > self.max_snapshots:
92 self.snapshots = self.snapshots[: self.max_snapshots]
94 def get_trend(self, metric_name: str, window: int = 20) -> list[tuple[int, float]]:
95 """Extract trend data for a specific metric.
97 Args:
98 metric_name: Name of metric to extract
99 window: Number of most recent snapshots to return
101 Returns:
102 List of (timestamp, value) tuples, most recent first
103 """
104 trend_data = []
106 for snapshot in self.snapshots[:window]:
107 value = getattr(snapshot, metric_name, None)
108 if value is not None:
109 trend_data.append((snapshot.timestamp, value))
111 return trend_data
113 def to_dict(self) -> dict[str, Any]:
114 """Convert to dictionary for storage.
116 Returns:
117 Dictionary representation
118 """
119 return {
120 "snapshots": [s.to_dict() for s in self.snapshots],
121 "max_snapshots": self.max_snapshots,
122 }
124 @classmethod
125 def from_dict(cls, data: dict[str, Any]) -> "QualityHistory":
126 """Create from dictionary.
128 Args:
129 data: Dictionary representation
131 Returns:
132 QualityHistory instance
133 """
134 snapshots = [QualitySnapshot.from_dict(s) for s in data.get("snapshots", [])]
135 return cls(
136 snapshots=snapshots,
137 max_snapshots=data.get("max_snapshots", 50),
138 )
141def calculate_information_density(messages: list[Message]) -> float:
142 """Calculate unique information ratio using shingling.
144 Strategy:
145 1. Extract all tool outputs
146 2. Generate 5-word shingles for each output
147 3. Calculate total unique shingles / total shingles
148 4. Return ratio (1.0 = all unique, 0.0 = all duplicate)
150 Args:
151 messages: Conversation messages
153 Returns:
154 Information density ratio (0.0-1.0)
155 """
156 unique_shingles = set()
157 total_shingles = 0
159 for msg in messages:
160 for part in msg.parts:
161 if part.type != "tool":
162 continue
164 tool_part = cast(ToolPart, part)
165 output = getattr(tool_part.state, "output", "")
166 if not output:
167 continue
169 # Generate 5-word shingles
170 shingles = generate_shingles(output, n=5)
172 # Count unique shingles
173 unique_shingles.update(shingles)
175 # Count total shingles (including duplicates across outputs)
176 total_shingles += len(shingles)
178 if total_shingles == 0:
179 return 1.0 # No outputs = perfect density
181 return len(unique_shingles) / total_shingles
184def calculate_redundancy_ratio(
185 messages: list[Message], config: PruningConfig
186) -> float:
187 """Calculate duplicate content ratio.
189 Strategy:
190 1. Run find_duplicate_output() for each output
191 2. Count tokens in duplicates
192 3. Return duplicate_tokens / total_tokens
194 Args:
195 messages: Conversation messages
196 config: Pruning config for duplicate detection
198 Returns:
199 Redundancy ratio (0.0-1.0)
200 """
201 from harnessutils.compaction.pruning import compute_content_hash
203 duplicate_tokens = 0
204 total_tokens = 0
206 # Build list of all tool parts with outputs
207 tool_parts: list[tuple[ToolPart, str, int]] = [] # (part, output, tokens)
208 for msg in messages:
209 for part in msg.parts:
210 if part.type == "tool":
211 tool_part = cast(ToolPart, part)
212 output = getattr(tool_part.state, "output", "")
213 if output:
214 tokens = count_tokens_fast(output)
215 tool_parts.append((tool_part, output, tokens))
216 total_tokens += tokens
218 # Check each output for duplicates using content hash
219 seen_hashes: dict[str, int] = {} # hash -> first occurrence index
221 for i, (part, output, tokens) in enumerate(tool_parts):
222 output_hash = compute_content_hash(output)
224 # Check if we've seen this exact output before (within lookback)
225 if output_hash in seen_hashes:
226 first_idx = seen_hashes[output_hash]
227 if i - first_idx <= config.duplicate_lookback:
228 # Within lookback window - count as duplicate
229 duplicate_tokens += tokens
230 else:
231 # First time seeing this output
232 seen_hashes[output_hash] = i
234 if total_tokens == 0:
235 return 0.0 # No outputs = no redundancy
237 return duplicate_tokens / total_tokens
240def calculate_staleness_score(messages: list[Message]) -> float:
241 """Calculate age-weighted staleness.
243 Strategy:
244 1. For each message, calculate age in turns
245 2. Apply exponential decay: exp(-age * 0.1)
246 3. Weight by token count
247 4. Return weighted average (0.0 = fresh, 1.0 = stale)
249 Args:
250 messages: Conversation messages
252 Returns:
253 Staleness score (0.0-1.0, lower is fresher)
254 """
255 import math
257 if not messages:
258 return 0.0
260 total_tokens = 0
261 weighted_staleness = 0.0
262 total_turns = len(messages)
264 for turn_idx, msg in enumerate(messages):
265 msg_tokens = msg.tokens.total if msg.tokens else 0
267 if msg_tokens == 0:
268 continue
270 # Age in turns (0 for most recent)
271 age = total_turns - turn_idx - 1
273 # Exponential decay: exp(-age * 0.1)
274 # Inverted to staleness: 1 - exp(-age * 0.1)
275 freshness = math.exp(-age * 0.1)
276 staleness = 1.0 - freshness
278 weighted_staleness += staleness * msg_tokens
279 total_tokens += msg_tokens
281 if total_tokens == 0:
282 return 0.0
284 return weighted_staleness / total_tokens
287def calculate_error_preservation_rate(
288 messages: list[Message],
289 decisions: list[PruningDecision] | None = None,
290) -> float:
291 """Calculate ratio of errors still in context.
293 Strategy:
294 1. Count total error outputs (status="error" or output contains "error"/"exception")
295 2. Count error outputs with decision="kept"
296 3. Return kept_errors / total_errors (1.0 if no errors exist)
298 Args:
299 messages: Conversation messages
300 decisions: Optional pruning decisions
302 Returns:
303 Error preservation rate (0.0-1.0)
304 """
305 total_errors = 0
306 kept_errors = 0
308 # Build error detection
309 error_keywords = ["error", "exception", "traceback", "failed"]
311 for msg in messages:
312 for part in msg.parts:
313 if part.type != "tool":
314 continue
316 tool_part = cast(ToolPart, part)
317 # Check if error
318 is_error = False
319 status = getattr(tool_part.state, "status", "")
320 output = getattr(tool_part.state, "output", "")
322 if status == "error":
323 is_error = True
324 else:
325 # Check output for error keywords
326 output_lower = output.lower()
327 if any(keyword in output_lower for keyword in error_keywords):
328 is_error = True
330 if not is_error:
331 continue
333 total_errors += 1
335 # If decisions available, check if kept
336 if decisions:
337 # Find decision for this output
338 call_id = getattr(tool_part, "call_id", None)
339 if call_id:
340 for decision in decisions:
341 if decision.part_id == call_id and decision.decision == "kept":
342 kept_errors += 1
343 break
344 else:
345 # No decision info, assume kept if still in messages
346 kept_errors += 1
347 else:
348 # No decisions = all errors still present
349 kept_errors += 1
351 if total_errors == 0:
352 return 1.0 # No errors = perfect preservation
354 return kept_errors / total_errors
357def calculate_protected_ratio(messages: list[Message], config: PruningConfig) -> float:
358 """Calculate ratio of protected tokens.
360 Strategy:
361 1. Count tool output tokens and determine protection status
362 2. Return protected_tokens / total_tokens
364 Args:
365 messages: Conversation messages
366 config: Pruning config
368 Returns:
369 Protected ratio (0.0-1.0)
370 """
371 total_tokens = 0
372 protected_tokens = 0
373 turns_skipped = 0
375 for msg in reversed(messages):
376 if msg.role == "user":
377 turns_skipped += 1
379 for part in msg.parts:
380 if part.type != "tool":
381 continue
383 tool_part = cast(ToolPart, part)
384 output = getattr(tool_part.state, "output", "")
385 if not output:
386 continue
388 tokens = count_tokens_fast(output)
389 total_tokens += tokens
391 # Check protection status
392 tool = tool_part.tool
393 is_protected = (
394 turns_skipped < config.protect_turns
395 or tool in config.protected_tools
396 )
398 if is_protected:
399 protected_tokens += tokens
401 if total_tokens == 0:
402 return 0.0
404 return protected_tokens / total_tokens
407def calculate_health(snapshot: QualitySnapshot) -> str:
408 """Classify health based on metric thresholds.
410 Thresholds:
411 - "good": density > 0.7, redundancy < 0.2, staleness < 0.3, error_rate > 0.9
412 - "poor": density < 0.5, redundancy > 0.4, staleness > 0.5, error_rate < 0.7
413 - "degraded": everything else
415 Args:
416 snapshot: Quality snapshot to classify
418 Returns:
419 Health classification
420 """
421 # Good thresholds
422 is_good = (
423 snapshot.information_density > 0.7
424 and snapshot.redundancy_ratio < 0.2
425 and snapshot.staleness_score < 0.3
426 and snapshot.error_preservation_rate > 0.9
427 )
429 if is_good:
430 return "good"
432 # Poor thresholds
433 is_poor = (
434 snapshot.information_density < 0.5
435 or snapshot.redundancy_ratio > 0.4
436 or snapshot.staleness_score > 0.5
437 or snapshot.error_preservation_rate < 0.7
438 )
440 if is_poor:
441 return "poor"
443 return "degraded"
446def generate_recommendations(snapshot: QualitySnapshot) -> list[str]:
447 """Generate actionable recommendations.
449 Rules:
450 - redundancy > 0.2: "Consider deduplication - {X}% redundancy"
451 - protected_ratio > 0.4: "Protected ratio high - may limit pruning"
452 - staleness > 0.4: "Context aging - consider summarization"
453 - error_rate < 0.8: "Error messages being lost - check pruning config"
454 - density < 0.6: "Low information density - run compaction"
456 Args:
457 snapshot: Quality snapshot
459 Returns:
460 List of recommendations
461 """
462 recommendations = []
464 if snapshot.redundancy_ratio > 0.2:
465 pct = int(snapshot.redundancy_ratio * 100)
466 recommendations.append(f"Consider deduplication - {pct}% redundancy")
468 if snapshot.protected_ratio > 0.4:
469 recommendations.append("Protected ratio high - may limit pruning")
471 if snapshot.staleness_score > 0.4:
472 recommendations.append("Context aging - consider summarization")
474 if snapshot.error_preservation_rate < 0.8:
475 recommendations.append("Error messages being lost - check pruning config")
477 if snapshot.information_density < 0.6:
478 recommendations.append("Low information density - run compaction")
480 return recommendations
483def assess_quality(
484 messages: list[Message],
485 config: PruningConfig,
486 decisions: list[PruningDecision] | None = None,
487) -> QualitySnapshot:
488 """Calculate all quality metrics and generate assessment.
490 Args:
491 messages: Conversation messages
492 config: Pruning config for thresholds
493 decisions: Optional pruning decisions for error tracking
495 Returns:
496 Complete quality snapshot
497 """
498 snapshot = QualitySnapshot(
499 timestamp=int(time.time() * 1000),
500 information_density=calculate_information_density(messages),
501 redundancy_ratio=calculate_redundancy_ratio(messages, config),
502 staleness_score=calculate_staleness_score(messages),
503 error_preservation_rate=calculate_error_preservation_rate(
504 messages, decisions
505 ),
506 protected_ratio=calculate_protected_ratio(messages, config),
507 health="", # Set below
508 recommendations=[], # Set below
509 )
511 snapshot.health = calculate_health(snapshot)
512 snapshot.recommendations = generate_recommendations(snapshot)
514 return snapshot