Coverage for src/harnessutils/compaction/pruning.py: 88% (289 statements)
coverage.py v7.13.2, created at 2026-02-18 08:30 -0600
1"""Tier 2: Selective pruning of tool outputs.
3Removes old tool outputs while preserving conversation structure.
4Cost: Cheap (~50ms), Latency: ~50ms.
5"""
7import hashlib
8import math
9import time
10from dataclasses import dataclass, field
11from typing import Any, Literal
13from harnessutils.config import PruningConfig
14from harnessutils.models.message import Message
15from harnessutils.models.parts import ToolPart
16from harnessutils.tokens.exact import count_tokens_fast


@dataclass
class PruningDecision:
    """Individual pruning decision for a tool output.

    Tracks why a specific output was kept or pruned, enabling full
    audit trail and debugging of pruning behavior.
    """

    message_id: str  # Message containing this output
    part_id: str  # Tool part ID
    tool: str  # Tool name
    decision: Literal[
        "kept",  # Output kept in context
        "pruned_duplicate",  # Removed as duplicate
        "pruned_low_importance",  # Removed due to low importance score
        "pruned_fifo",  # Removed by FIFO (oldest first)
        "protected_recent",  # Protected by recency (protect_turns)
        "protected_tool",  # Protected by tool type (protected_tools)
        "protected_summary",  # Protected (after summary boundary)
    ]
    importance_score: float | None = None  # Importance score (if scored)
    duplicate_of: str | None = None  # Part ID of duplicate (if duplicate)
    tokens: int = 0  # Token count of this output
    timestamp: int = 0  # When decision was made (unix ms)
    metadata: dict[str, Any] = field(default_factory=dict)  # Additional context

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "message_id": self.message_id,
            "part_id": self.part_id,
            "tool": self.tool,
            "decision": self.decision,
            "importance_score": self.importance_score,
            "duplicate_of": self.duplicate_of,
            "tokens": self.tokens,
            "timestamp": self.timestamp,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "PruningDecision":
        """Create from dictionary.

        Args:
            data: Dictionary representation

        Returns:
            PruningDecision instance
        """
        return cls(
            message_id=data["message_id"],
            part_id=data["part_id"],
            tool=data["tool"],
            decision=data["decision"],
            importance_score=data.get("importance_score"),
            duplicate_of=data.get("duplicate_of"),
            tokens=data.get("tokens", 0),
            timestamp=data.get("timestamp", 0),
            metadata=data.get("metadata", {}),
        )

    @property
    def was_pruned(self) -> bool:
        """Check if this output was pruned."""
        return self.decision.startswith("pruned_")

    @property
    def was_protected(self) -> bool:
        """Check if this output was protected from pruning."""
        return self.decision.startswith("protected_")


@dataclass
class PruningResult:
    """Result of pruning operation with detailed token tracking."""

    pruned: int  # Total outputs pruned
    tokens_saved: int  # Total tokens saved
    tokens_before: int = 0  # Token count before pruning
    tokens_after: int = 0  # Token count after pruning
    duplicates_pruned: int = 0  # Outputs pruned due to duplication
    importance_pruned: int = 0  # Outputs pruned due to low importance
    duplicate_tokens_saved: int = 0  # Tokens saved from deduplication
    importance_tokens_saved: int = 0  # Tokens saved from importance pruning
    decisions: list[PruningDecision] = field(default_factory=list)  # Full decision log

    def to_dict(self) -> dict[str, int | float]:
        """Convert to dictionary for reporting.

        Returns:
            Dictionary with all pruning metrics
        """
        return {
            "pruned": self.pruned,
            "tokens_saved": self.tokens_saved,
            "tokens_before": self.tokens_before,
            "tokens_after": self.tokens_after,
            "duplicates_pruned": self.duplicates_pruned,
            "importance_pruned": self.importance_pruned,
            "duplicate_tokens_saved": self.duplicate_tokens_saved,
            "importance_tokens_saved": self.importance_tokens_saved,
            "reduction_percent": round(
                (self.tokens_saved / self.tokens_before * 100) if self.tokens_before > 0 else 0,
                1,
            ),
        }

    def __str__(self) -> str:
        """Human-readable summary of pruning results."""
        if self.pruned == 0:
            return "No pruning needed"

        lines = [
            f"Pruned {self.pruned} outputs, saved {self.tokens_saved:,} tokens",
            f"  Before: {self.tokens_before:,} tokens",
            f"  After: {self.tokens_after:,} tokens",
            f"  Reduction: {self.to_dict()['reduction_percent']}%",
        ]

        if self.duplicates_pruned > 0:
            lines.append(
                f"  Duplicates: {self.duplicates_pruned} removed "
                f"({self.duplicate_tokens_saved:,} tokens)"
            )

        if self.importance_pruned > 0:
            lines.append(
                f"  Low importance: {self.importance_pruned} removed "
                f"({self.importance_tokens_saved:,} tokens)"
            )

        return "\n".join(lines)

    def get_pruned_decisions(self) -> list[PruningDecision]:
        """Get all decisions that resulted in pruning.

        Returns:
            List of decisions where output was pruned
        """
        return [d for d in self.decisions if d.was_pruned]

    def get_protected_decisions(self) -> list[PruningDecision]:
        """Get all decisions that resulted in protection.

        Returns:
            List of decisions where output was protected
        """
        return [d for d in self.decisions if d.was_protected]

    def get_kept_decisions(self) -> list[PruningDecision]:
        """Get all decisions where output was kept.

        Returns:
            List of decisions where output was kept (not pruned or protected)
        """
        return [d for d in self.decisions if d.decision == "kept"]

    def get_decision_by_part(self, part_id: str) -> PruningDecision | None:
        """Find decision for specific tool part.

        Args:
            part_id: Tool part ID to find

        Returns:
            Decision for that part, or None if not found
        """
        for decision in self.decisions:
            if decision.part_id == part_id:
                return decision
        return None
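

# Illustrative sketch of how a result renders; the numbers below are made-up
# example values, not measurements:
#
#     r = PruningResult(
#         pruned=3, tokens_saved=4000,
#         tokens_before=10000, tokens_after=6000,
#         duplicates_pruned=1, duplicate_tokens_saved=1500,
#     )
#     r.to_dict()["reduction_percent"]   # -> 40.0
#     print(r)
#     # Pruned 3 outputs, saved 4,000 tokens
#     #   Before: 10,000 tokens
#     #   After: 6,000 tokens
#     #   Reduction: 40.0%
#     #   Duplicates: 1 removed (1,500 tokens)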


@dataclass
class OutputImportance:
    """Importance score components for a tool output."""

    recency_score: float  # Exponential decay based on age
    size_penalty: float  # Penalty for large outputs
    semantic_score: float  # Content-based importance (errors, warnings)
    tool_priority: float  # Tool type importance
    token_count: int  # Actual token count

    @property
    def total_score(self) -> float:
        """Calculate weighted total importance score.

        Higher score = more important = keep longer.
        Lower score = less important = prune first.
        """
        return (
            self.recency_score
            + self.size_penalty
            + self.semantic_score
            + self.tool_priority
        )


def generate_shingles(text: str, n: int = 5) -> set[str]:
    """Generate word-level n-grams (shingles) for similarity detection.

    Args:
        text: Text to generate shingles from
        n: Size of n-grams (default: 5 words)

    Returns:
        Set of n-gram strings
    """
    # Normalize text
    words = text.lower().split()

    if len(words) < n:
        # If text too short, use the whole thing
        return {" ".join(words)} if words else set()

    # Generate n-grams
    shingles = set()
    for i in range(len(words) - n + 1):
        shingle = " ".join(words[i : i + n])
        shingles.add(shingle)

    return shingles


def jaccard_similarity(set1: set[str], set2: set[str]) -> float:
    """Calculate Jaccard similarity between two sets.

    Jaccard similarity = |intersection| / |union|
    Returns value between 0.0 (no overlap) and 1.0 (identical).

    Args:
        set1: First set
        set2: Second set

    Returns:
        Similarity score [0.0, 1.0]
    """
    if not set1 or not set2:
        return 0.0

    intersection = len(set1 & set2)
    union = len(set1 | set2)

    if union == 0:
        return 0.0

    return intersection / union
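

# Illustrative sketch of how the two helpers above combine for near-duplicate
# detection. The sample strings are invented: they differ only in the final
# word, so 9 of the 11 distinct 5-word shingles are shared and the Jaccard
# similarity is roughly 0.82 (above the 0.8 default threshold used below).
#
#     a = generate_shingles(
#         "error connecting to database mydb timeout after 30 seconds will retry automatically attempt 1"
#     )
#     b = generate_shingles(
#         "error connecting to database mydb timeout after 30 seconds will retry automatically attempt 2"
#     )
#     jaccard_similarity(a, b)                                    # ~0.82 (9 shared / 11 total)
#     jaccard_similarity(a, generate_shingles("unrelated text"))  # 0.0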


def compute_content_hash(text: str) -> str:
    """Compute fast hash of text content for exact duplicate detection.

    Args:
        text: Text to hash

    Returns:
        MD5 hash hex string
    """
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def find_duplicate_output(
    part: ToolPart,
    recent_parts: list[tuple[ToolPart, set[str]]],
    similarity_threshold: float = 0.8,
) -> ToolPart | None:
    """Check whether an output is a duplicate of a recent output.

    Args:
        part: Tool part to check for duplication
        recent_parts: List of (part, shingles) tuples to compare against
        similarity_threshold: Minimum similarity to consider duplicate (default: 0.8)

    Returns:
        The duplicate part if found, None otherwise
    """
    # Only compare against the same tool type. Different tools are unlikely to
    # produce identical outputs, and this prevents false positives across tool types.
    same_tool_parts = [
        (p, shingles) for p, shingles in recent_parts if p.tool == part.tool
    ]

    if not same_tool_parts:
        return None

    # Fast path: exact duplicate check (within same tool type)
    current_hash = compute_content_hash(part.state.output)

    for recent_part, _ in same_tool_parts:
        if compute_content_hash(recent_part.state.output) == current_hash:
            return recent_part  # Exact duplicate

    # Generate shingles for current part
    current_shingles = generate_shingles(part.state.output)

    # Check similarity against recent parts of same tool
    for recent_part, recent_shingles in same_tool_parts:
        similarity = jaccard_similarity(current_shingles, recent_shingles)

        if similarity >= similarity_threshold:
            return recent_part  # Found similar output

    return None


def calculate_context_tokens(messages: list[Message]) -> int:
    """Calculate total token count for all tool outputs in conversation.

    Args:
        messages: All conversation messages

    Returns:
        Total token count
    """
    total = 0
    for msg in messages:
        for part in msg.parts:
            if isinstance(part, ToolPart) and part.state.status == "completed":
                if part.state.output:  # Only count non-empty outputs
                    total += count_tokens_fast(part.state.output)
    return total


def calculate_turn_age(messages: list[Message], target_msg: Message) -> int:
    """Calculate how many turns ago a message was created.

    Args:
        messages: All conversation messages
        target_msg: Message to calculate age for

    Returns:
        Number of user turns since this message (0 = current turn)
    """
    turn_count = 0
    for msg in reversed(messages):
        if msg.id == target_msg.id:
            return turn_count
        if msg.role == "user":
            turn_count += 1
    return turn_count
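

# Illustrative sketch of turn-age counting with a hypothetical oldest-first
# history; user_1 ... assistant_2 stand in for real Message objects:
#
#     history = [user_1, assistant_1, user_2, assistant_2]
#     calculate_turn_age(history, assistant_2)  # -> 0 (current turn)
#     calculate_turn_age(history, assistant_1)  # -> 1 (one user turn back)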


def score_tool_output(
    part: ToolPart,
    message: Message,
    messages: list[Message],
    config: PruningConfig,
) -> OutputImportance:
    """Calculate importance score for a tool output.

    Args:
        part: Tool part to score
        message: Message containing this part
        messages: All conversation messages
        config: Pruning configuration with scoring weights

    Returns:
        OutputImportance with all score components
    """
    token_count = count_tokens_fast(part.state.output)

    # 1. Recency score (exponential decay)
    age = calculate_turn_age(messages, message)
    recency = math.exp(-age * config.recency_decay) * 100 * config.recency_weight

    # 2. Size penalty (prefer removing large outputs)
    size_penalty = math.log(token_count + 1) * 10 * config.size_weight

    # 3. Semantic importance
    semantic = 0.0
    output_lower = part.state.output.lower()

    # Check for errors
    if any(
        keyword in output_lower
        for keyword in ["error", "exception", "traceback", "failed"]
    ):
        semantic += config.error_boost

    # Check for warnings
    if "warning" in output_lower or "warn" in output_lower:
        semantic += config.warning_boost

    # Check if user explicitly requested this
    if part.state.metadata and part.state.metadata.get("user_requested"):
        semantic += config.user_requested_boost

    semantic *= config.semantic_weight

    # 4. Tool type priority
    tool_priority = config.tool_importance.get(part.tool, 50.0) * config.tool_priority_weight

    return OutputImportance(
        recency_score=recency,
        size_penalty=size_penalty,
        semantic_score=semantic,
        tool_priority=tool_priority,
        token_count=token_count,
    )
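

# Illustrative sketch of the two numeric components above. PruningConfig
# defaults are not visible in this module, so the weights here are assumed
# example values, not the real configuration:
#
#     # recency: recency_decay=0.3, recency_weight=1.0, output from 3 user turns ago
#     #   exp(-3 * 0.3) * 100 * 1.0  ->  ~40.7
#     # size:    size_weight=1.0, 2,000-token output
#     #   log(2000 + 1) * 10 * 1.0   ->  ~76.0   (natural log)
#
# Because total_score sums the components, the relative size of these weights
# determines whether recency, size, semantics, or tool priority dominates.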


def prune_tool_outputs(
    messages: list[Message],
    config: PruningConfig,
) -> PruningResult:
    """Prune tool outputs from conversation history.

    Selectively removes old tool outputs while preserving:
    - Tool call metadata (name, input, title, timing)
    - Recent outputs (within protection window)
    - Protected tool outputs
    - Last N turns

    Uses exact token counting via tiktoken for accurate pruning decisions.

    First runs duplicate detection (if enabled), then applies either
    importance-based or FIFO pruning strategy.

    Args:
        messages: Conversation messages in chronological order (oldest first)
        config: Pruning configuration

    Returns:
        PruningResult with detailed token tracking and breakdown
    """
    # Calculate token usage before pruning
    tokens_before = calculate_context_tokens(messages)

    # Phase 1: Duplicate detection (runs regardless of scoring strategy)
    duplicate_result = PruningResult(pruned=0, tokens_saved=0)
    if config.detect_duplicates:
        duplicate_result = _detect_and_prune_duplicates(messages, config)

    # Phase 2: Importance-based or FIFO pruning
    if config.use_importance_scoring:
        pruning_result = _prune_with_importance_scoring_only(messages, config)
    else:
        pruning_result = _prune_simple(messages, config)

    # Calculate token usage after pruning
    tokens_after = calculate_context_tokens(messages)

    # Combine results with detailed tracking
    all_decisions = duplicate_result.decisions + pruning_result.decisions

    return PruningResult(
        pruned=duplicate_result.pruned + pruning_result.pruned,
        tokens_saved=duplicate_result.tokens_saved + pruning_result.tokens_saved,
        tokens_before=tokens_before,
        tokens_after=tokens_after,
        duplicates_pruned=duplicate_result.pruned,
        importance_pruned=pruning_result.pruned,
        duplicate_tokens_saved=duplicate_result.tokens_saved,
        importance_tokens_saved=pruning_result.tokens_saved,
        decisions=all_decisions,
    )
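

# Illustrative usage sketch. `session_messages` is a hypothetical
# list[Message], and constructing PruningConfig with no arguments assumes
# it provides usable defaults:
#
#     config = PruningConfig()
#     result = prune_tool_outputs(session_messages, config)  # mutates tool parts in place
#     print(result)                                  # human-readable summary via __str__
#     print(result.to_dict()["reduction_percent"])
#     for d in result.get_pruned_decisions():
#         print(d.tool, d.decision, d.tokens, d.duplicate_of)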


def _prune_simple(
    messages: list[Message],
    config: PruningConfig,
) -> PruningResult:
    """Simple FIFO pruning (original algorithm).

    Prunes oldest outputs first when token budget exceeded.
    """
    total_tokens = 0
    prunable_tokens = 0
    to_prune: list[tuple[Message, ToolPart, int]] = []
    decisions: list[PruningDecision] = []
    turns_skipped = 0
    now_ms = int(time.time() * 1000)

    for msg in reversed(messages):
        if msg.role == "user":
            turns_skipped += 1

        # Check if past summary boundary
        past_summary = msg.summary

        for part in msg.parts:
            if not isinstance(part, ToolPart):
                continue

            if part.state.status != "completed":
                continue

            token_count = count_tokens_fast(part.state.output) if part.state.output else 0

            # Track protection reasons
            if turns_skipped < config.protect_turns:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_recent",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"turns_back": turns_skipped},
                    )
                )
                continue

            if past_summary:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_summary",
                        tokens=token_count,
                        timestamp=now_ms,
                    )
                )
                break

            if part.tool in config.protected_tools:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_tool",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"protected_tools": config.protected_tools},
                    )
                )
                continue

            if part.state.time and part.state.time.compacted:
                # Already compacted, skip (don't track as decision)
                continue

            total_tokens += token_count

            if total_tokens > config.prune_protect:
                prunable_tokens += token_count
                to_prune.append((msg, part, token_count))
            else:
                # Kept within budget
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="kept",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"total_tokens_so_far": total_tokens},
                    )
                )

    if prunable_tokens > config.prune_minimum:
        # Actually prune
        for msg, part, tokens in to_prune:
            part.state.output = ""
            part.state.attachments = []
            if part.state.time:
                part.state.time.compacted = int(time.time() * 1000)

            # Record pruning decision
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="pruned_fifo",
                    tokens=tokens,
                    timestamp=now_ms,
                    metadata={
                        "prunable_tokens": prunable_tokens,
                        "prune_minimum": config.prune_minimum,
                    },
                )
            )

        return PruningResult(
            pruned=len(to_prune), tokens_saved=prunable_tokens, decisions=decisions
        )

    # Not enough to prune - mark all candidates as kept
    for msg, part, tokens in to_prune:
        decisions.append(
            PruningDecision(
                message_id=msg.id,
                part_id=part.call_id,
                tool=part.tool,
                decision="kept",
                tokens=tokens,
                timestamp=now_ms,
                metadata={
                    "reason": "below_minimum_threshold",
                    "prunable_tokens": prunable_tokens,
                    "prune_minimum": config.prune_minimum,
                },
            )
        )

    return PruningResult(pruned=0, tokens_saved=0, decisions=decisions)


def _prune_with_importance_scoring_only(
    messages: list[Message],
    config: PruningConfig,
) -> PruningResult:
    """Smart pruning using importance scoring.

    Scores all outputs by importance and prunes lowest-value first.
    Preserves critical outputs (errors, warnings, user-requested).

    Note: Duplicate detection happens separately before this function is called.
    """
    # Collect all prunable outputs with scores
    scored_outputs: list[tuple[Message, ToolPart, OutputImportance]] = []
    decisions: list[PruningDecision] = []
    turns_skipped = 0
    now_ms = int(time.time() * 1000)

    for msg in reversed(messages):
        if msg.role == "user":
            turns_skipped += 1

        past_summary = msg.summary

        for part in msg.parts:
            if not isinstance(part, ToolPart):
                continue

            if part.state.status != "completed":
                continue

            token_count = count_tokens_fast(part.state.output) if part.state.output else 0

            # Track protection reasons (same as FIFO)
            if turns_skipped < config.protect_turns:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_recent",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"turns_back": turns_skipped},
                    )
                )
                continue

            if past_summary:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_summary",
                        tokens=token_count,
                        timestamp=now_ms,
                    )
                )
                break

            if part.tool in config.protected_tools:
                decisions.append(
                    PruningDecision(
                        message_id=msg.id,
                        part_id=part.call_id,
                        tool=part.tool,
                        decision="protected_tool",
                        tokens=token_count,
                        timestamp=now_ms,
                        metadata={"protected_tools": config.protected_tools},
                    )
                )
                continue

            if part.state.time and part.state.time.compacted:
                # Already compacted, skip
                continue

            # Score this output
            importance = score_tool_output(part, msg, messages, config)
            scored_outputs.append((msg, part, importance))

    # Calculate total tokens
    total_tokens = sum(imp.token_count for _, _, imp in scored_outputs)

    # If under budget, mark all as kept
    if total_tokens <= config.prune_protect:
        for msg, part, importance in scored_outputs:
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="kept",
                    importance_score=importance.total_score,
                    tokens=importance.token_count,
                    timestamp=now_ms,
                    metadata={"total_tokens": total_tokens, "prune_protect": config.prune_protect},
                )
            )
        return PruningResult(pruned=0, tokens_saved=0, decisions=decisions)

    # Sort by importance (lowest score first = prune first)
    scored_outputs.sort(key=lambda x: x[2].total_score)

    # Prune until we're under budget
    to_prune: list[tuple[Message, ToolPart, OutputImportance]] = []
    current_tokens = total_tokens

    for msg, part, importance in scored_outputs:
        if current_tokens <= config.prune_protect:
            # Keep this one
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="kept",
                    importance_score=importance.total_score,
                    tokens=importance.token_count,
                    timestamp=now_ms,
                    metadata={
                        "current_tokens": current_tokens,
                        "prune_protect": config.prune_protect,
                    },
                )
            )
        else:
            # Mark for pruning
            to_prune.append((msg, part, importance))
            current_tokens -= importance.token_count

    # Only prune if savings meet minimum threshold
    tokens_saved = sum(imp.token_count for _, _, imp in to_prune)

    if tokens_saved >= config.prune_minimum:
        # Actually prune
        for msg, part, importance in to_prune:
            part.state.output = ""
            part.state.attachments = []
            if part.state.time:
                part.state.time.compacted = int(time.time() * 1000)

            # Record pruning decision with importance score
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="pruned_low_importance",
                    importance_score=importance.total_score,
                    tokens=importance.token_count,
                    timestamp=now_ms,
                    metadata={
                        "recency_score": importance.recency_score,
                        "size_penalty": importance.size_penalty,
                        "semantic_score": importance.semantic_score,
                        "tool_priority": importance.tool_priority,
                        "tokens_saved": tokens_saved,
                    },
                )
            )

        return PruningResult(pruned=len(to_prune), tokens_saved=tokens_saved, decisions=decisions)

    # Not enough savings - keep all
    for msg, part, importance in to_prune:
        decisions.append(
            PruningDecision(
                message_id=msg.id,
                part_id=part.call_id,
                tool=part.tool,
                decision="kept",
                importance_score=importance.total_score,
                tokens=importance.token_count,
                timestamp=now_ms,
                metadata={
                    "reason": "below_minimum_threshold",
                    "tokens_saved": tokens_saved,
                    "prune_minimum": config.prune_minimum,
                },
            )
        )

    return PruningResult(pruned=0, tokens_saved=0, decisions=decisions)


def _detect_and_prune_duplicates(
    messages: list[Message],
    config: PruningConfig,
) -> PruningResult:
    """Detect and prune duplicate tool outputs.

    Uses similarity hashing (shingles + Jaccard) to find near-duplicates.
    Aggressively prunes older duplicates while keeping the most recent.

    Args:
        messages: Conversation messages
        config: Pruning configuration

    Returns:
        PruningResult with duplicates pruned
    """
    # Build index of recent outputs with their shingles
    recent_outputs: list[tuple[ToolPart, set[str], Message]] = []
    # (msg, part, tokens, duplicate_of_id)
    duplicates_to_prune: list[tuple[Message, ToolPart, int, str]] = []
    decisions: list[PruningDecision] = []
    turns_skipped = 0
    now_ms = int(time.time() * 1000)

    for msg in reversed(messages):  # Newest first
        if msg.role == "user":
            turns_skipped += 1

        if turns_skipped < config.protect_turns:
            continue

        if msg.summary:
            break

        for part in msg.parts:
            if not isinstance(part, ToolPart):
                continue

            if part.state.status != "completed":
                continue

            if part.tool in config.protected_tools:
                continue

            if part.state.time and part.state.time.compacted:
                continue

            # Check if this is a duplicate
            if len(recent_outputs) > 0:
                # Only check against recent outputs (limited lookback)
                lookback = recent_outputs[-config.duplicate_lookback :]
                duplicate_of = find_duplicate_output(
                    part,
                    [(p, shingles) for p, shingles, _ in lookback],
                    config.similarity_threshold,
                )

                if duplicate_of:
                    # This is a duplicate - mark for pruning
                    token_count = count_tokens_fast(part.state.output)
                    duplicates_to_prune.append((msg, part, token_count, duplicate_of.call_id))
                    continue  # Don't add to recent_outputs

            # Not a duplicate - add to index and record decision
            shingles = generate_shingles(part.state.output)
            recent_outputs.append((part, shingles, msg))

            # Record as kept (unique)
            token_count = count_tokens_fast(part.state.output)
            decisions.append(
                PruningDecision(
                    message_id=msg.id,
                    part_id=part.call_id,
                    tool=part.tool,
                    decision="kept",
                    tokens=token_count,
                    timestamp=now_ms,
                    metadata={
                        "reason": "unique_output",
                        "similarity_threshold": config.similarity_threshold,
                    },
                )
            )

    # Prune duplicates
    tokens_saved = 0
    for msg, part, token_count, duplicate_of_id in duplicates_to_prune:
        part.state.output = ""
        part.state.attachments = []
        if part.state.time:
            part.state.time.compacted = int(time.time() * 1000)

        tokens_saved += token_count

        # Record pruning decision
        decisions.append(
            PruningDecision(
                message_id=msg.id,
                part_id=part.call_id,
                tool=part.tool,
                decision="pruned_duplicate",
                duplicate_of=duplicate_of_id,
                tokens=token_count,
                timestamp=now_ms,
                metadata={
                    "similarity_threshold": config.similarity_threshold,
                    "lookback": config.duplicate_lookback,
                },
            )
        )

    return PruningResult(
        pruned=len(duplicates_to_prune),
        tokens_saved=tokens_saved,
        decisions=decisions,
    )