Coverage for src / harnessutils / compaction / pruning.py: 88%

289 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 08:30 -0600

1"""Tier 2: Selective pruning of tool outputs. 

2 

3Removes old tool outputs while preserving conversation structure. 

4Cost: Cheap (~50ms), Latency: ~50ms. 

5""" 

6 

7import hashlib 

8import math 

9import time 

10from dataclasses import dataclass, field 

11from typing import Any, Literal 

12 

13from harnessutils.config import PruningConfig 

14from harnessutils.models.message import Message 

15from harnessutils.models.parts import ToolPart 

16from harnessutils.tokens.exact import count_tokens_fast 

17 

18 

19@dataclass 

20class PruningDecision: 

21 """Individual pruning decision for a tool output. 

22 

23 Tracks why a specific output was kept or pruned, enabling full 

24 audit trail and debugging of pruning behavior. 

25 """ 

26 

27 message_id: str # Message containing this output 

28 part_id: str # Tool part ID 

29 tool: str # Tool name 

30 decision: Literal[ 

31 "kept", # Output kept in context 

32 "pruned_duplicate", # Removed as duplicate 

33 "pruned_low_importance", # Removed due to low importance score 

34 "pruned_fifo", # Removed by FIFO (oldest first) 

35 "protected_recent", # Protected by recency (protect_turns) 

36 "protected_tool", # Protected by tool type (protected_tools) 

37 "protected_summary", # Protected (after summary boundary) 

38 ] 

39 importance_score: float | None = None # Importance score (if scored) 

40 duplicate_of: str | None = None # Part ID of duplicate (if duplicate) 

41 tokens: int = 0 # Token count of this output 

42 timestamp: int = 0 # When decision was made (unix ms) 

43 metadata: dict[str, Any] = field(default_factory=dict) # Additional context 

44 

45 def to_dict(self) -> dict[str, Any]: 

46 """Convert to dictionary for serialization.""" 

47 return { 

48 "message_id": self.message_id, 

49 "part_id": self.part_id, 

50 "tool": self.tool, 

51 "decision": self.decision, 

52 "importance_score": self.importance_score, 

53 "duplicate_of": self.duplicate_of, 

54 "tokens": self.tokens, 

55 "timestamp": self.timestamp, 

56 "metadata": self.metadata, 

57 } 

58 

59 @classmethod 

60 def from_dict(cls, data: dict[str, Any]) -> "PruningDecision": 

61 """Create from dictionary. 

62 

63 Args: 

64 data: Dictionary representation 

65 

66 Returns: 

67 PruningDecision instance 

68 """ 

69 return cls( 

70 message_id=data["message_id"], 

71 part_id=data["part_id"], 

72 tool=data["tool"], 

73 decision=data["decision"], 

74 importance_score=data.get("importance_score"), 

75 duplicate_of=data.get("duplicate_of"), 

76 tokens=data.get("tokens", 0), 

77 timestamp=data.get("timestamp", 0), 

78 metadata=data.get("metadata", {}), 

79 ) 

80 

81 @property 

82 def was_pruned(self) -> bool: 

83 """Check if this output was pruned.""" 

84 return self.decision.startswith("pruned_") 

85 

86 @property 

87 def was_protected(self) -> bool: 

88 """Check if this output was protected from pruning.""" 

89 return self.decision.startswith("protected_") 

90 

91 

@dataclass
class PruningResult:
    """Aggregate outcome of one pruning pass, with per-output decisions."""

    pruned: int  # Number of outputs removed
    tokens_saved: int  # Tokens reclaimed in total
    tokens_before: int = 0  # Tool-output tokens before the pass
    tokens_after: int = 0  # Tool-output tokens after the pass
    duplicates_pruned: int = 0  # Outputs removed by duplicate detection
    importance_pruned: int = 0  # Outputs removed by low-importance pruning
    duplicate_tokens_saved: int = 0  # Tokens reclaimed by duplicate detection
    importance_tokens_saved: int = 0  # Tokens reclaimed by importance pruning
    decisions: list[PruningDecision] = field(default_factory=list)  # Per-output audit log

    def to_dict(self) -> dict[str, int | float]:
        """Serialize all pruning metrics for reporting.

        Returns:
            Dictionary with every counter plus a derived reduction percent.
        """
        if self.tokens_before > 0:
            reduction = round(self.tokens_saved / self.tokens_before * 100, 1)
        else:
            # Avoid division by zero when nothing was counted beforehand.
            reduction = 0
        return {
            "pruned": self.pruned,
            "tokens_saved": self.tokens_saved,
            "tokens_before": self.tokens_before,
            "tokens_after": self.tokens_after,
            "duplicates_pruned": self.duplicates_pruned,
            "importance_pruned": self.importance_pruned,
            "duplicate_tokens_saved": self.duplicate_tokens_saved,
            "importance_tokens_saved": self.importance_tokens_saved,
            "reduction_percent": reduction,
        }

    def __str__(self) -> str:
        """Render a multi-line, human-readable summary."""
        if self.pruned == 0:
            return "No pruning needed"

        summary = [
            f"Pruned {self.pruned} outputs, saved {self.tokens_saved:,} tokens",
            f" Before: {self.tokens_before:,} tokens",
            f" After: {self.tokens_after:,} tokens",
            f" Reduction: {self.to_dict()['reduction_percent']}%",
        ]

        if self.duplicates_pruned > 0:
            summary.append(
                f" Duplicates: {self.duplicates_pruned} removed "
                f"({self.duplicate_tokens_saved:,} tokens)"
            )

        if self.importance_pruned > 0:
            summary.append(
                f" Low importance: {self.importance_pruned} removed "
                f"({self.importance_tokens_saved:,} tokens)"
            )

        return "\n".join(summary)

    def get_pruned_decisions(self) -> list[PruningDecision]:
        """Return every decision that removed an output."""
        return [entry for entry in self.decisions if entry.was_pruned]

    def get_protected_decisions(self) -> list[PruningDecision]:
        """Return every decision where a protection rule applied."""
        return [entry for entry in self.decisions if entry.was_protected]

    def get_kept_decisions(self) -> list[PruningDecision]:
        """Return every decision that kept an output (neither pruned nor protected)."""
        return [entry for entry in self.decisions if entry.decision == "kept"]

    def get_decision_by_part(self, part_id: str) -> PruningDecision | None:
        """Look up the decision recorded for a specific tool part.

        Args:
            part_id: Tool part ID to search for.

        Returns:
            The first matching decision, or None when no decision references it.
        """
        return next(
            (entry for entry in self.decisions if entry.part_id == part_id),
            None,
        )

190 

191 

@dataclass
class OutputImportance:
    """Score breakdown describing how valuable a tool output is to keep."""

    recency_score: float  # Decays exponentially with turn age
    size_penalty: float  # Size-based component (weighted by config)
    semantic_score: float  # Boosts for errors, warnings, user-requested content
    tool_priority: float  # Weight derived from the tool type
    token_count: int  # Exact token count of the output

    @property
    def total_score(self) -> float:
        """Weighted sum of all score components.

        Higher total means more important: keep longer. Lower total means
        prune sooner.
        """
        # NOTE(review): size_penalty is ADDED here, so a larger output raises
        # the total unless config.size_weight is negative — confirm the sign
        # convention against PruningConfig defaults.
        return self.recency_score + self.size_penalty + self.semantic_score + self.tool_priority

215 

216 

def generate_shingles(text: str, n: int = 5) -> set[str]:
    """Build word-level n-grams (shingles) for similarity comparison.

    Args:
        text: Text to shingle; lowercased and whitespace-split first.
        n: Words per shingle (default: 5).

    Returns:
        Set of space-joined n-gram strings. Empty input yields an empty
        set; input shorter than *n* words yields a single whole-text
        shingle.
    """
    tokens = text.lower().split()

    # Guard clauses for degenerate inputs.
    if not tokens:
        return set()
    if len(tokens) < n:
        return {" ".join(tokens)}

    # One shingle per window position over the token sequence.
    return {" ".join(tokens[i : i + n]) for i in range(len(tokens) - n + 1)}

241 

242 

def jaccard_similarity(set1: set[str], set2: set[str]) -> float:
    """Calculate Jaccard similarity between two sets.

    Jaccard similarity = |intersection| / |union|
    Returns value between 0.0 (no overlap) and 1.0 (identical).

    Args:
        set1: First set
        set2: Second set

    Returns:
        Similarity score [0.0, 1.0]
    """
    if not set1 or not set2:
        return 0.0

    # Both sets are non-empty past the guard, so the union is never zero;
    # the previous explicit `union == 0` branch was unreachable and removed.
    return len(set1 & set2) / len(set1 | set2)

266 

267 

def compute_content_hash(text: str) -> str:
    """Compute fast hash of text content for exact duplicate detection.

    MD5 is used purely as a fast content fingerprint, never for security.
    `usedforsecurity=False` documents that intent and keeps this working
    on FIPS-restricted OpenSSL builds, where plain md5() can raise.

    Args:
        text: Text to hash

    Returns:
        MD5 hash hex string
    """
    return hashlib.md5(text.encode("utf-8"), usedforsecurity=False).hexdigest()

278 

279 

def find_duplicate_output(
    part: ToolPart,
    recent_parts: list[tuple[ToolPart, set[str]]],
    similarity_threshold: float = 0.8,
) -> ToolPart | None:
    """Return a recent output that duplicates *part*, if one exists.

    Comparison is restricted to outputs from the same tool. An exact
    content-hash match is tried first (cheap), then shingle-based
    Jaccard similarity against the precomputed shingle sets.

    Args:
        part: Tool part whose output is being checked.
        recent_parts: (part, shingles) pairs to compare against.
        similarity_threshold: Minimum Jaccard similarity that counts as a
            duplicate (default: 0.8).

    Returns:
        The matching earlier part, or None when no duplicate is found.
    """
    # Restrict comparisons to the same tool; cross-tool matches would be
    # coincidental and risk false positives.
    candidates = [
        (candidate, shingles)
        for candidate, shingles in recent_parts
        if candidate.tool == part.tool
    ]
    if not candidates:
        return None

    # Fast path: byte-for-byte identical content.
    target_hash = compute_content_hash(part.state.output)
    for candidate, _ in candidates:
        if compute_content_hash(candidate.state.output) == target_hash:
            return candidate

    # Slow path: near-duplicate detection via Jaccard over word shingles.
    part_shingles = generate_shingles(part.state.output)
    for candidate, candidate_shingles in candidates:
        if jaccard_similarity(part_shingles, candidate_shingles) >= similarity_threshold:
            return candidate

    return None

323 

324 

def calculate_context_tokens(messages: list[Message]) -> int:
    """Sum the token counts of every completed, non-empty tool output.

    Args:
        messages: All conversation messages.

    Returns:
        Total token count across all counted outputs.
    """
    # Only completed tool parts with a non-empty output contribute;
    # everything else (text parts, pending/failed tools) is skipped.
    return sum(
        count_tokens_fast(part.state.output)
        for msg in messages
        for part in msg.parts
        if isinstance(part, ToolPart)
        and part.state.status == "completed"
        and part.state.output
    )

341 

342 

def calculate_turn_age(messages: list[Message], target_msg: Message) -> int:
    """Count how many user turns have elapsed since a given message.

    Walks the conversation from newest to oldest, counting user messages
    seen before reaching the target.

    Args:
        messages: All conversation messages.
        target_msg: Message to calculate age for.

    Returns:
        Number of user turns since the message (0 = current turn). If the
        message is not found, returns the total user-turn count.
    """
    age = 0
    for candidate in reversed(messages):
        if candidate.id == target_msg.id:
            break
        if candidate.role == "user":
            age += 1
    return age

360 

361 

def score_tool_output(
    part: ToolPart,
    message: Message,
    messages: list[Message],
    config: PruningConfig,
) -> OutputImportance:
    """Score one tool output's importance for pruning decisions.

    Combines four weighted components: recency (exponential decay over
    turn age), output size, semantic content (errors, warnings, explicit
    user requests), and per-tool priority.

    Args:
        part: Tool part to score.
        message: Message that contains this part.
        messages: Full conversation, used to compute turn age.
        config: Weights and boosts for each component.

    Returns:
        OutputImportance holding every component plus the token count.
    """
    token_count = count_tokens_fast(part.state.output)

    # Recency: newest outputs score near 100 * recency_weight and decay
    # exponentially per user turn.
    age = calculate_turn_age(messages, message)
    recency = math.exp(-age * config.recency_decay) * 100 * config.recency_weight

    # Size component, scaled logarithmically by token count.
    size_penalty = math.log(token_count + 1) * 10 * config.size_weight

    # Semantic boosts for content likely to still matter.
    semantic = 0.0
    text = part.state.output.lower()

    if any(marker in text for marker in ("error", "exception", "traceback", "failed")):
        semantic += config.error_boost

    # "warn" is a substring of "warning", so one test covers both terms.
    if "warn" in text:
        semantic += config.warning_boost

    if part.state.metadata and part.state.metadata.get("user_requested"):
        semantic += config.user_requested_boost

    semantic *= config.semantic_weight

    # Per-tool priority; tools absent from the map get a neutral 50.0.
    tool_priority = config.tool_importance.get(part.tool, 50.0) * config.tool_priority_weight

    return OutputImportance(
        recency_score=recency,
        size_penalty=size_penalty,
        semantic_score=semantic,
        tool_priority=tool_priority,
        token_count=token_count,
    )

419 

420 

def prune_tool_outputs(
    messages: list[Message],
    config: PruningConfig,
) -> PruningResult:
    """Prune tool outputs from conversation history.

    Clears old tool outputs in place while preserving tool call metadata
    (name, input, title, timing), recent outputs within the protection
    window, protected tool types, and everything after a summary boundary.

    Runs up to two phases: duplicate detection (when enabled), followed
    by either importance-based or FIFO pruning depending on configuration.
    Token accounting uses exact counting via count_tokens_fast.

    Args:
        messages: Conversation messages (newest first recommended).
        config: Pruning configuration.

    Returns:
        PruningResult combining both phases, with before/after token
        counts, per-phase breakdowns, and the full decision log.
    """
    tokens_before = calculate_context_tokens(messages)

    # Phase 1: deduplicate near-identical outputs (independent of strategy).
    if config.detect_duplicates:
        dedup = _detect_and_prune_duplicates(messages, config)
    else:
        dedup = PruningResult(pruned=0, tokens_saved=0)

    # Phase 2: budget-driven pruning — importance-scored or plain FIFO.
    if config.use_importance_scoring:
        budget = _prune_with_importance_scoring_only(messages, config)
    else:
        budget = _prune_simple(messages, config)

    tokens_after = calculate_context_tokens(messages)

    # Merge both phases into a single result with full attribution.
    return PruningResult(
        pruned=dedup.pruned + budget.pruned,
        tokens_saved=dedup.tokens_saved + budget.tokens_saved,
        tokens_before=tokens_before,
        tokens_after=tokens_after,
        duplicates_pruned=dedup.pruned,
        importance_pruned=budget.pruned,
        duplicate_tokens_saved=dedup.tokens_saved,
        importance_tokens_saved=budget.tokens_saved,
        decisions=dedup.decisions + budget.decisions,
    )

476 

477 

def _prune_simple(
    messages: list[Message],
    config: PruningConfig,
) -> PruningResult:
    """Simple FIFO pruning (original algorithm).

    Walks messages newest-first, accumulating completed tool-output token
    counts. Outputs encountered after the cumulative total exceeds
    ``config.prune_protect`` are candidates for pruning — since the walk
    is newest-first, those candidates are the OLDEST outputs. Candidates
    are only actually cleared when their combined size exceeds
    ``config.prune_minimum``; otherwise nothing is mutated.

    Args:
        messages: Conversation messages; mutated in place when pruning fires.
        config: Pruning configuration (protect_turns, protected_tools,
            prune_protect, prune_minimum).

    Returns:
        PruningResult with counts, tokens saved, and the decision log.
    """
    total_tokens = 0  # Running total of counted (non-protected) output tokens
    prunable_tokens = 0  # Tokens held by prune candidates
    to_prune: list[tuple[Message, ToolPart, int]] = []  # Candidates: (msg, part, tokens)
    decisions: list[PruningDecision] = []
    turns_skipped = 0  # User messages seen so far in the newest-first walk
    now_ms = int(time.time() * 1000)

    for msg in reversed(messages):
        # Count user turns as we walk back in time; outputs within the
        # first protect_turns user turns are protected by recency.
        if msg.role == "user":
            turns_skipped += 1

        # Check if past summary boundary
        past_summary = msg.summary

        for part in msg.parts:
            if not isinstance(part, ToolPart):
                continue

            if part.state.status != "completed":
                continue

            token_count = count_tokens_fast(part.state.output) if part.state.output else 0

            # Track protection reasons
            if turns_skipped < config.protect_turns:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_recent",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"turns_back": turns_skipped},
                    )
                )
                continue

            if past_summary:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_summary",
                        tokens=token_count,
                        timestamp=now_ms,
                    )
                )
                # NOTE: break records a decision only for THIS part and
                # skips the remaining parts of this summary message.
                break

            if part.tool in config.protected_tools:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_tool",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"protected_tools": config.protected_tools},
                    )
                )
                continue

            if part.state.time and part.state.time.compacted:
                # Already compacted, skip (don't track as decision)
                continue

            total_tokens += token_count

            # Newest outputs fill the protected budget first; once it is
            # exceeded, everything older becomes a prune candidate (FIFO).
            if total_tokens > config.prune_protect:
                prunable_tokens += token_count
                to_prune.append((msg, part, token_count))
            else:
                # Kept within budget
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="kept",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"total_tokens_so_far": total_tokens},
                    )
                )

    # NOTE(review): this uses a strict `>` while the importance-scoring
    # path uses `>=` for the same prune_minimum threshold — confirm
    # whether the boundary case is intentional.
    if prunable_tokens > config.prune_minimum:
        # Actually prune: clear outputs in place but keep call metadata,
        # and stamp the compaction time so re-runs skip these parts.
        for msg, part, tokens in to_prune:
            part.state.output = ""
            part.state.attachments = []
            if part.state.time:
                part.state.time.compacted = int(time.time() * 1000)

            # Record pruning decision
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="pruned_fifo",
                    tokens=tokens,
                    timestamp=now_ms,
                    metadata={
                        "prunable_tokens": prunable_tokens,
                        "prune_minimum": config.prune_minimum,
                    },
                )
            )

        return PruningResult(
            pruned=len(to_prune), tokens_saved=prunable_tokens, decisions=decisions
        )

    # Not enough to prune - mark all candidates as kept
    for msg, part, tokens in to_prune:
        decisions.append(
            PruningDecision(
                message_id=msg.id,
                part_id=part.call_id,
                tool=part.tool,
                decision="kept",
                tokens=tokens,
                timestamp=now_ms,
                metadata={
                    "reason": "below_minimum_threshold",
                    "prunable_tokens": prunable_tokens,
                    "prune_minimum": config.prune_minimum,
                },
            )
        )

    return PruningResult(pruned=0, tokens_saved=0, decisions=decisions)

621 

622 

def _prune_with_importance_scoring_only(
    messages: list[Message],
    config: PruningConfig,
) -> PruningResult:
    """Smart pruning using importance scoring.

    Scores all unprotected completed outputs, then — only when the total
    exceeds ``config.prune_protect`` — prunes the lowest-scoring outputs
    until the remainder fits the budget. Errors, warnings, and
    user-requested outputs survive longer via their semantic boosts.

    Note: Duplicate detection happens separately before this function is called.

    Args:
        messages: Conversation messages; mutated in place when pruning fires.
        config: Pruning configuration (weights, protect_turns,
            protected_tools, prune_protect, prune_minimum).

    Returns:
        PruningResult with counts, tokens saved, and the decision log.
    """
    # Collect all prunable outputs with scores
    scored_outputs: list[tuple[Message, ToolPart, OutputImportance]] = []
    decisions: list[PruningDecision] = []
    turns_skipped = 0  # User messages seen so far in the newest-first walk
    now_ms = int(time.time() * 1000)

    for msg in reversed(messages):
        if msg.role == "user":
            turns_skipped += 1

        past_summary = msg.summary

        for part in msg.parts:
            if not isinstance(part, ToolPart):
                continue

            if part.state.status != "completed":
                continue

            token_count = count_tokens_fast(part.state.output) if part.state.output else 0

            # Track protection reasons (same as FIFO)
            if turns_skipped < config.protect_turns:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_recent",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"turns_back": turns_skipped},
                    )
                )
                continue

            if past_summary:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_summary",
                        tokens=token_count,
                        timestamp=now_ms,
                    )
                )
                # Skip remaining parts of this summary-boundary message.
                break

            if part.tool in config.protected_tools:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_tool",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"protected_tools": config.protected_tools},
                    )
                )
                continue

            if part.state.time and part.state.time.compacted:
                # Already compacted, skip
                continue

            # Score this output
            importance = score_tool_output(part, msg, messages, config)
            scored_outputs.append((msg, part, importance))

    # Calculate total tokens
    total_tokens = sum(imp.token_count for _, _, imp in scored_outputs)

    # If under budget, mark all as kept
    if total_tokens <= config.prune_protect:
        for msg, part, importance in scored_outputs:
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="kept",
                    importance_score=importance.total_score,
                    tokens=importance.token_count,
                    timestamp=now_ms,
                    metadata={"total_tokens": total_tokens, "prune_protect": config.prune_protect},
                )
            )
        return PruningResult(pruned=0, tokens_saved=0, decisions=decisions)

    # Sort by importance (lowest score first = prune first)
    scored_outputs.sort(key=lambda x: x[2].total_score)

    # Prune until we're under budget
    to_prune: list[tuple[Message, ToolPart, OutputImportance]] = []
    current_tokens = total_tokens

    # Walk candidates lowest-score first: keep once the running total
    # drops to the protected budget, otherwise mark for pruning and
    # subtract the candidate's tokens.
    for msg, part, importance in scored_outputs:
        if current_tokens <= config.prune_protect:
            # Keep this one
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="kept",
                    importance_score=importance.total_score,
                    tokens=importance.token_count,
                    timestamp=now_ms,
                    metadata={
                        "current_tokens": current_tokens,
                        "prune_protect": config.prune_protect,
                    },
                )
            )
        else:
            # Mark for pruning
            to_prune.append((msg, part, importance))
            current_tokens -= importance.token_count

    # Only prune if savings meet minimum threshold
    tokens_saved = sum(imp.token_count for _, _, imp in to_prune)

    if tokens_saved >= config.prune_minimum:
        # Actually prune: clear output/attachments in place but keep call
        # metadata, and stamp compaction time so re-runs skip these parts.
        for msg, part, importance in to_prune:
            part.state.output = ""
            part.state.attachments = []
            if part.state.time:
                part.state.time.compacted = int(time.time() * 1000)

            # Record pruning decision with importance score
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="pruned_low_importance",
                    importance_score=importance.total_score,
                    tokens=importance.token_count,
                    timestamp=now_ms,
                    metadata={
                        "recency_score": importance.recency_score,
                        "size_penalty": importance.size_penalty,
                        "semantic_score": importance.semantic_score,
                        "tool_priority": importance.tool_priority,
                        "tokens_saved": tokens_saved,
                    },
                )
            )

        return PruningResult(pruned=len(to_prune), tokens_saved=tokens_saved, decisions=decisions)

    # Not enough savings - keep all
    for msg, part, importance in to_prune:
        decisions.append(
            PruningDecision(
                message_id=msg.id,
                part_id=part.call_id,
                tool=part.tool,
                decision="kept",
                importance_score=importance.total_score,
                tokens=importance.token_count,
                timestamp=now_ms,
                metadata={
                    "reason": "below_minimum_threshold",
                    "tokens_saved": tokens_saved,
                    "prune_minimum": config.prune_minimum,
                },
            )
        )

    return PruningResult(pruned=0, tokens_saved=0, decisions=decisions)

808 

809 

def _detect_and_prune_duplicates(
    messages: list[Message],
    config: PruningConfig,
) -> PruningResult:
    """Detect and prune duplicate tool outputs.

    Uses similarity hashing (shingles + Jaccard) to find near-duplicates.
    Walks newest-first, so the first occurrence seen (the most recent) is
    kept and older near-copies are cleared in place.

    Args:
        messages: Conversation messages; mutated in place when duplicates
            are found.
        config: Pruning configuration (protect_turns, protected_tools,
            similarity_threshold, duplicate_lookback).

    Returns:
        PruningResult with duplicates pruned
    """
    # Build index of recent outputs with their shingles
    recent_outputs: list[tuple[ToolPart, set[str], Message]] = []
    # (msg, part, tokens, duplicate_of_id)
    duplicates_to_prune: list[tuple[Message, ToolPart, int, str]] = []
    decisions: list[PruningDecision] = []
    turns_skipped = 0  # User messages seen so far in the newest-first walk
    now_ms = int(time.time() * 1000)

    for msg in reversed(messages):  # Newest first
        if msg.role == "user":
            turns_skipped += 1

        # NOTE(review): messages inside the protection window are skipped
        # entirely, without recording protected_* decisions the way the
        # budget-pruning phases do — confirm this asymmetry is intended.
        if turns_skipped < config.protect_turns:
            continue

        # Stop at a summary boundary; older messages are already covered.
        if msg.summary:
            break

        for part in msg.parts:
            if not isinstance(part, ToolPart):
                continue

            if part.state.status != "completed":
                continue

            if part.tool in config.protected_tools:
                continue

            if part.state.time and part.state.time.compacted:
                continue

            # Check if this is a duplicate
            if len(recent_outputs) > 0:
                # Only check against recent outputs (limited lookback)
                lookback = recent_outputs[-config.duplicate_lookback :]
                duplicate_of = find_duplicate_output(
                    part,
                    [(p, shingles) for p, shingles, _ in lookback],
                    config.similarity_threshold,
                )

                if duplicate_of:
                    # This is a duplicate - mark for pruning
                    token_count = count_tokens_fast(part.state.output)
                    duplicates_to_prune.append((msg, part, token_count, duplicate_of.call_id))
                    continue  # Don't add to recent_outputs

            # Not a duplicate - add to index and record decision
            shingles = generate_shingles(part.state.output)
            recent_outputs.append((part, shingles, msg))

            # Record as kept (unique). NOTE(review): the budget phase may
            # later record a second decision for the same part.
            token_count = count_tokens_fast(part.state.output)
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="kept",
                    tokens=token_count,
                    timestamp=now_ms,
                    metadata={
                        "reason": "unique_output",
                        "similarity_threshold": config.similarity_threshold,
                    },
                )
            )

    # Prune duplicates: clear output/attachments in place but keep call
    # metadata, and stamp compaction time so later phases skip these parts.
    tokens_saved = 0
    for msg, part, token_count, duplicate_of_id in duplicates_to_prune:
        part.state.output = ""
        part.state.attachments = []
        if part.state.time:
            part.state.time.compacted = int(time.time() * 1000)

        tokens_saved += token_count

        # Record pruning decision
        decisions.append(
            PruningDecision(
                message_id=msg.id,
                part_id=part.call_id,
                tool=part.tool,
                decision="pruned_duplicate",
                duplicate_of=duplicate_of_id,
                tokens=token_count,
                timestamp=now_ms,
                metadata={
                    "similarity_threshold": config.similarity_threshold,
                    "lookback": config.duplicate_lookback,
                },
            )
        )

    return PruningResult(
        pruned=len(duplicates_to_prune),
        tokens_saved=tokens_saved,
        decisions=decisions,
    )