Coverage for src / harnessutils / quality.py: 91%

187 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 08:30 -0600

1"""Phase 3B: Quality metrics for context health tracking. 

2 

3Calculates and tracks quality metrics over time to assess context health 

4and provide actionable recommendations. 

5""" 

6 

import math
import time
from dataclasses import dataclass, field
from typing import Any, cast

from harnessutils.compaction.pruning import (
    PruningDecision,
    compute_content_hash,
    generate_shingles,
)
from harnessutils.config import PruningConfig
from harnessutils.models.message import Message
from harnessutils.models.parts import ToolPart
from harnessutils.tokens.exact import count_tokens_fast

19 

20 

@dataclass
class QualitySnapshot:
    """Single quality measurement at a point in time."""

    timestamp: int  # Unix ms
    information_density: float  # 0.0-1.0, unique info / total tokens
    redundancy_ratio: float  # 0.0-1.0, duplicate tokens / total tokens
    staleness_score: float  # 0.0-1.0, avg age-weighted decay
    error_preservation_rate: float  # 0.0-1.0, errors kept / total errors
    protected_ratio: float  # 0.0-1.0, protected tokens / total tokens
    health: str  # "good" | "degraded" | "poor"
    recommendations: list[str]  # Actionable suggestions

    def to_dict(self) -> dict[str, Any]:
        """Serialize the snapshot to a plain dictionary.

        Returns:
            Dictionary representation suitable for storage
        """
        names = (
            "timestamp",
            "information_density",
            "redundancy_ratio",
            "staleness_score",
            "error_preservation_rate",
            "protected_ratio",
            "health",
            "recommendations",
        )
        return {name: getattr(self, name) for name in names}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "QualitySnapshot":
        """Rebuild a snapshot from its dictionary form.

        Args:
            data: Dictionary representation (as produced by to_dict)

        Returns:
            QualitySnapshot instance
        """
        names = (
            "timestamp",
            "information_density",
            "redundancy_ratio",
            "staleness_score",
            "error_preservation_rate",
            "protected_ratio",
            "health",
            "recommendations",
        )
        # Missing keys raise KeyError, matching direct data[...] access.
        return cls(**{name: data[name] for name in names})

71 

72 

@dataclass
class QualityHistory:
    """Time-series tracking of quality metrics."""

    snapshots: list[QualitySnapshot] = field(default_factory=list)
    max_snapshots: int = 50  # Keep last 50 measurements

    def add_snapshot(self, snapshot: QualitySnapshot) -> None:
        """Record a new quality snapshot.

        The newest snapshot goes to the front; anything beyond
        max_snapshots is dropped from the tail.

        Args:
            snapshot: Quality snapshot to add
        """
        self.snapshots.insert(0, snapshot)
        overflow = len(self.snapshots) - self.max_snapshots
        if overflow > 0:
            self.snapshots = self.snapshots[: self.max_snapshots]

    def get_trend(self, metric_name: str, window: int = 20) -> list[tuple[int, float]]:
        """Extract trend data for a specific metric.

        Args:
            metric_name: Name of metric to extract
            window: Number of most recent snapshots to return

        Returns:
            List of (timestamp, value) tuples, most recent first
        """
        # Snapshots lacking the attribute (or holding None) are skipped.
        return [
            (snap.timestamp, value)
            for snap in self.snapshots[:window]
            if (value := getattr(snap, metric_name, None)) is not None
        ]

    def to_dict(self) -> dict[str, Any]:
        """Serialize the history to a plain dictionary.

        Returns:
            Dictionary representation suitable for storage
        """
        serialized = [snap.to_dict() for snap in self.snapshots]
        return {"snapshots": serialized, "max_snapshots": self.max_snapshots}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "QualityHistory":
        """Rebuild a history from its dictionary form.

        Args:
            data: Dictionary representation (as produced by to_dict)

        Returns:
            QualityHistory instance
        """
        restored = [QualitySnapshot.from_dict(raw) for raw in data.get("snapshots", [])]
        return cls(
            snapshots=restored,
            max_snapshots=data.get("max_snapshots", 50),
        )

139 

140 

def calculate_information_density(messages: list[Message]) -> float:
    """Calculate unique information ratio using shingling.

    Every tool output is broken into 5-word shingles; density is the
    fraction of all shingles (pooled across outputs) that are unique.
    1.0 means all content is unique, 0.0 means it is all duplicated.

    Args:
        messages: Conversation messages

    Returns:
        Information density ratio (0.0-1.0)
    """
    seen_shingles: set = set()
    shingle_count = 0

    for message in messages:
        for part in message.parts:
            if part.type != "tool":
                continue

            output = getattr(cast(ToolPart, part).state, "output", "")
            if not output:
                continue

            # 5-word shingles; duplicates across outputs collapse in the set
            # but still inflate the running total.
            shingles = generate_shingles(output, n=5)
            seen_shingles.update(shingles)
            shingle_count += len(shingles)

    # No tool outputs at all counts as perfectly dense.
    if shingle_count == 0:
        return 1.0

    return len(seen_shingles) / shingle_count

182 

183 

def calculate_redundancy_ratio(
    messages: list[Message], config: PruningConfig
) -> float:
    """Calculate duplicate content ratio.

    Strategy:
    1. Hash every tool output
    2. Count tokens in outputs that repeat a prior output within the
       configured lookback window
    3. Return duplicate_tokens / total_tokens

    Args:
        messages: Conversation messages
        config: Pruning config for duplicate detection

    Returns:
        Redundancy ratio (0.0-1.0)
    """
    duplicate_tokens = 0
    total_tokens = 0

    # Collect every non-empty tool output with its token count.
    outputs: list[tuple[str, int]] = []  # (output, tokens)
    for msg in messages:
        for part in msg.parts:
            if part.type == "tool":
                tool_part = cast(ToolPart, part)
                output = getattr(tool_part.state, "output", "")
                if output:
                    tokens = count_tokens_fast(output)
                    outputs.append((output, tokens))
                    total_tokens += tokens

    # An output is a duplicate iff it repeats some PRIOR output within the
    # lookback window. Track the most recent occurrence of each hash:
    # tracking only the first occurrence (the previous behavior) missed
    # duplicates whose nearest prior copy was recent but whose first copy
    # was outside the window.
    last_seen: dict[str, int] = {}  # hash -> most recent occurrence index

    for i, (output, tokens) in enumerate(outputs):
        output_hash = compute_content_hash(output)

        prev_idx = last_seen.get(output_hash)
        if prev_idx is not None and i - prev_idx <= config.duplicate_lookback:
            # Within lookback window - count as duplicate
            duplicate_tokens += tokens

        # Always record the latest position so later repeats are compared
        # against the nearest prior copy.
        last_seen[output_hash] = i

    if total_tokens == 0:
        return 0.0  # No outputs = no redundancy

    return duplicate_tokens / total_tokens

238 

239 

def calculate_staleness_score(messages: list[Message]) -> float:
    """Calculate age-weighted staleness.

    Each message's staleness is 1 - exp(-age * 0.1), where age is its
    distance in turns from the most recent message. The score is the
    token-weighted average across all messages.

    Args:
        messages: Conversation messages

    Returns:
        Staleness score (0.0-1.0, lower is fresher)
    """
    if not messages:
        return 0.0

    total_tokens = 0
    weighted_staleness = 0.0
    total_turns = len(messages)

    for turn_idx, msg in enumerate(messages):
        msg_tokens = msg.tokens.total if msg.tokens else 0

        # Messages without token info cannot contribute weight.
        if msg_tokens == 0:
            continue

        # Age in turns (0 for most recent)
        age = total_turns - turn_idx - 1

        # Freshness decays exponentially with age; staleness is its
        # complement: 1 - exp(-age * 0.1).
        staleness = 1.0 - math.exp(-age * 0.1)

        weighted_staleness += staleness * msg_tokens
        total_tokens += msg_tokens

    # Every message had zero/unknown token count.
    if total_tokens == 0:
        return 0.0

    return weighted_staleness / total_tokens

285 

286 

def calculate_error_preservation_rate(
    messages: list[Message],
    decisions: list[PruningDecision] | None = None,
) -> float:
    """Calculate ratio of errors still in context.

    A tool part counts as an error when its status is "error" or its
    output contains an error keyword. With pruning decisions available,
    an error is preserved when its call_id has a "kept" decision (parts
    without a call_id are assumed kept); without decisions, every error
    counts as preserved.

    Args:
        messages: Conversation messages
        decisions: Optional pruning decisions

    Returns:
        Error preservation rate (0.0-1.0); 1.0 when no errors exist
    """
    error_keywords = ("error", "exception", "traceback", "failed")

    total_errors = 0
    kept_errors = 0

    for msg in messages:
        for part in msg.parts:
            if part.type != "tool":
                continue

            tool_part = cast(ToolPart, part)
            status = getattr(tool_part.state, "status", "")
            output = getattr(tool_part.state, "output", "")

            # Explicit error status, or any error keyword in the output.
            lowered = output.lower()
            is_error = status == "error" or any(
                keyword in lowered for keyword in error_keywords
            )
            if not is_error:
                continue

            total_errors += 1

            if not decisions:
                # No decisions = all errors still present
                kept_errors += 1
                continue

            call_id = getattr(tool_part, "call_id", None)
            if not call_id:
                # No decision info, assume kept if still in messages
                kept_errors += 1
            elif any(
                d.part_id == call_id and d.decision == "kept" for d in decisions
            ):
                kept_errors += 1

    if total_errors == 0:
        return 1.0  # No errors = perfect preservation

    return kept_errors / total_errors

355 

356 

def calculate_protected_ratio(messages: list[Message], config: PruningConfig) -> float:
    """Calculate ratio of protected tokens.

    Walks the conversation newest-to-oldest, counting user turns passed;
    a tool output is protected when it sits within the configured number
    of recent turns or its tool is in the protected set.

    Args:
        messages: Conversation messages
        config: Pruning config

    Returns:
        Protected ratio (0.0-1.0)
    """
    total_tokens = 0
    protected_tokens = 0
    user_turns_passed = 0

    for msg in reversed(messages):
        # Count the user turn before scanning its parts, matching how
        # turn distance is measured for protection.
        if msg.role == "user":
            user_turns_passed += 1

        for part in msg.parts:
            if part.type != "tool":
                continue

            tool_part = cast(ToolPart, part)
            output = getattr(tool_part.state, "output", "")
            if not output:
                continue

            tokens = count_tokens_fast(output)
            total_tokens += tokens

            # Recency and explicit tool protection both qualify.
            if (
                user_turns_passed < config.protect_turns
                or tool_part.tool in config.protected_tools
            ):
                protected_tokens += tokens

    if total_tokens == 0:
        return 0.0

    return protected_tokens / total_tokens

405 

406 

def calculate_health(snapshot: QualitySnapshot) -> str:
    """Classify health based on metric thresholds.

    Thresholds:
    - "good": density > 0.7, redundancy < 0.2, staleness < 0.3, error_rate > 0.9
    - "poor": density < 0.5, redundancy > 0.4, staleness > 0.5, error_rate < 0.7
    - "degraded": everything else

    Args:
        snapshot: Quality snapshot to classify

    Returns:
        Health classification
    """
    # "good" requires every criterion to hold simultaneously.
    if (
        snapshot.information_density > 0.7
        and snapshot.redundancy_ratio < 0.2
        and snapshot.staleness_score < 0.3
        and snapshot.error_preservation_rate > 0.9
    ):
        return "good"

    # A single failing criterion is enough for "poor".
    if (
        snapshot.information_density < 0.5
        or snapshot.redundancy_ratio > 0.4
        or snapshot.staleness_score > 0.5
        or snapshot.error_preservation_rate < 0.7
    ):
        return "poor"

    return "degraded"

444 

445 

def generate_recommendations(snapshot: QualitySnapshot) -> list[str]:
    """Generate actionable recommendations.

    Rules:
    - redundancy > 0.2: "Consider deduplication - {X}% redundancy"
    - protected_ratio > 0.4: "Protected ratio high - may limit pruning"
    - staleness > 0.4: "Context aging - consider summarization"
    - error_rate < 0.8: "Error messages being lost - check pruning config"
    - density < 0.6: "Low information density - run compaction"

    Args:
        snapshot: Quality snapshot

    Returns:
        List of recommendations
    """
    redundancy_pct = int(snapshot.redundancy_ratio * 100)

    # (triggered?, message) pairs, evaluated in fixed order so the output
    # ordering is stable.
    checks = (
        (
            snapshot.redundancy_ratio > 0.2,
            f"Consider deduplication - {redundancy_pct}% redundancy",
        ),
        (
            snapshot.protected_ratio > 0.4,
            "Protected ratio high - may limit pruning",
        ),
        (
            snapshot.staleness_score > 0.4,
            "Context aging - consider summarization",
        ),
        (
            snapshot.error_preservation_rate < 0.8,
            "Error messages being lost - check pruning config",
        ),
        (
            snapshot.information_density < 0.6,
            "Low information density - run compaction",
        ),
    )

    return [message for triggered, message in checks if triggered]

481 

482 

def assess_quality(
    messages: list[Message],
    config: PruningConfig,
    decisions: list[PruningDecision] | None = None,
) -> QualitySnapshot:
    """Calculate all quality metrics and generate assessment.

    Args:
        messages: Conversation messages
        config: Pruning config for thresholds
        decisions: Optional pruning decisions for error tracking

    Returns:
        Complete quality snapshot
    """
    now_ms = int(time.time() * 1000)

    # Compute the raw metrics first; health and recommendations are
    # derived from the populated snapshot afterwards.
    snapshot = QualitySnapshot(
        timestamp=now_ms,
        information_density=calculate_information_density(messages),
        redundancy_ratio=calculate_redundancy_ratio(messages, config),
        staleness_score=calculate_staleness_score(messages),
        error_preservation_rate=calculate_error_preservation_rate(messages, decisions),
        protected_ratio=calculate_protected_ratio(messages, config),
        health="",  # filled in below
        recommendations=[],  # filled in below
    )

    snapshot.health = calculate_health(snapshot)
    snapshot.recommendations = generate_recommendations(snapshot)

    return snapshot