Coverage for src / harnessutils / manager.py: 96%
262 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 09:29 -0600
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 09:29 -0600
1"""Main ConversationManager API for harness-utils."""
3import time
4from typing import Any
6from harnessutils.compaction.pruning import PruningDecision, prune_tool_outputs
7from harnessutils.compaction.summarization import is_overflow, summarize_conversation
8from harnessutils.compaction.truncation import truncate_output
9from harnessutils.config import HarnessConfig
10from harnessutils.conversion.to_model import to_model_messages
11from harnessutils.inspection import ContextInspector
12from harnessutils.models.conversation import Conversation
13from harnessutils.models.message import Message
14from harnessutils.models.usage import Usage
15from harnessutils.snapshots import Snapshot, SnapshotDiff, SnapshotManager
16from harnessutils.storage.memory import MemoryStorage
17from harnessutils.types import LLMClient, StorageBackend
18from harnessutils.utils.ids import generate_id
class ConversationManager:
    """Main interface for managing conversations with context window management.

    Provides high-level API for:
    - Creating and managing conversations
    - Adding messages
    - Automatic context compaction (truncation, pruning, summarization)
    - Message storage and retrieval
    """

    def __init__(
        self,
        storage: StorageBackend | None = None,
        config: HarnessConfig | None = None,
    ):
        """Initialize conversation manager.

        Args:
            storage: Storage backend (uses in-memory if None)
            config: Configuration (uses defaults if None)
        """
        self.config = config or HarnessConfig()
        self.storage = storage or MemoryStorage()
        # Per-conversation cache of deserialized messages; avoids re-reading
        # storage on every call to get_messages().
        self._message_cache: dict[str, list[Message]] = {}
        # BUG FIX: previously this passed the raw `storage` argument, which is
        # None when the caller relies on the MemoryStorage() default above.
        # The snapshot manager must share the same resolved backend as the
        # rest of the manager.
        self.snapshot_manager = SnapshotManager(self.storage)
47 def create_conversation(
48 self,
49 conversation_id: str | None = None,
50 project_id: str | None = None,
51 ) -> Conversation:
52 """Create a new conversation.
54 Args:
55 conversation_id: Optional conversation ID (generated if None)
56 project_id: Optional project ID for grouping
58 Returns:
59 New conversation object
60 """
61 if conversation_id is None:
62 conversation_id = generate_id("conv")
64 now = int(time.time() * 1000)
65 conversation = Conversation(
66 id=conversation_id,
67 project_id=project_id,
68 created=now,
69 updated=now,
70 )
72 self.storage.save_conversation(conversation_id, conversation.to_dict())
73 self._message_cache[conversation_id] = []
75 return conversation
77 def add_message(self, conversation_id: str, message: Message) -> None:
78 """Add a message to a conversation.
80 Args:
81 conversation_id: Conversation to add message to
82 message: Message to add
83 """
84 # Inject timestamp if not set — required for cleanup_stale_data to work
85 if "timestamp" not in message.metadata:
86 message.metadata["timestamp"] = int(time.time() * 1000)
88 self.storage.save_message(conversation_id, message.id, message.to_dict())
90 if conversation_id not in self._message_cache:
91 self._message_cache[conversation_id] = []
92 self._message_cache[conversation_id].append(message)
94 # Load conversation and update
95 conv_data = self.storage.load_conversation(conversation_id)
96 conv = Conversation.from_dict(conv_data)
97 conv.updated = int(time.time() * 1000)
99 # Track velocity if message has token count
100 if message.tokens:
101 tokens_added = message.tokens.total
102 conv.update_velocity(tokens_added)
104 self.storage.save_conversation(conversation_id, conv.to_dict())
106 def get_messages(self, conversation_id: str) -> list[Message]:
107 """Get all messages for a conversation.
109 Args:
110 conversation_id: Conversation ID
112 Returns:
113 List of messages in chronological order
114 """
115 if conversation_id in self._message_cache:
116 return self._message_cache[conversation_id]
118 message_ids = self.storage.list_messages(conversation_id)
119 messages = [
120 Message.from_dict(self.storage.load_message(conversation_id, msg_id))
121 for msg_id in message_ids
122 ]
124 self._message_cache[conversation_id] = messages
125 return messages
127 def query_messages(
128 self,
129 conversation_id: str,
130 limit: int | None = None,
131 offset: int = 0,
132 order: str = "asc",
133 after: int | None = None,
134 before: int | None = None,
135 filter: dict[str, Any] | None = None,
136 ) -> list[Message]:
137 """Query messages with filtering and pagination.
139 Args:
140 conversation_id: Conversation to query
141 limit: Maximum messages to return
142 offset: Skip first N messages
143 order: "asc" (oldest first) or "desc" (newest first)
144 after: Unix ms timestamp - messages after this time
145 before: Unix ms timestamp - messages before this time
146 filter: Filter criteria dict:
147 - has_errors (bool): Only messages with errors
148 - has_warnings (bool): Only messages with warnings
149 - min_importance (float): Minimum importance score
150 - tools (list[str]): Specific tool types
151 - roles (list[str]): Message roles (user/assistant)
152 - has_tool_outputs (bool): Only messages with tool outputs
153 - is_summary (bool): Only summary messages
155 Returns:
156 Filtered and paginated messages
158 Examples:
159 # Get last 10 messages
160 messages = manager.query_messages(conv_id, limit=10, order="desc")
162 # Get messages with errors
163 messages = manager.query_messages(
164 conv_id,
165 filter={"has_errors": True}
166 )
168 # Get recent high-importance messages
169 messages = manager.query_messages(
170 conv_id,
171 filter={"min_importance": 50.0},
172 after=timestamp_24h_ago,
173 limit=20
174 )
175 """
176 from harnessutils.query import MessageFilter, QueryOptions, query_messages
178 # Build filter
179 msg_filter = None
180 if filter is not None:
181 msg_filter = MessageFilter(
182 has_errors=filter.get("has_errors"),
183 has_warnings=filter.get("has_warnings"),
184 min_importance=filter.get("min_importance"),
185 tools=filter.get("tools"),
186 roles=filter.get("roles"),
187 has_tool_outputs=filter.get("has_tool_outputs"),
188 is_summary=filter.get("is_summary"),
189 )
191 # Build query options
192 options = QueryOptions(
193 limit=limit,
194 offset=offset,
195 order=order, # type: ignore
196 after=after,
197 before=before,
198 filter=msg_filter,
199 )
201 # Get all messages and query
202 messages = self.get_messages(conversation_id)
203 return query_messages(messages, options)
205 def get_context_summary(
206 self, conversation_id: str, recent_limit: int = 5
207 ) -> dict[str, Any]:
208 """Get lightweight context summary without loading full content.
210 Provides overview of conversation state including:
211 - Message count and token usage
212 - Summary messages
213 - Recent activity (last N messages)
214 - Key messages (high importance)
215 - Error messages
217 Args:
218 conversation_id: Conversation to summarize
219 recent_limit: Number of recent messages to include (default: 5)
221 Returns:
222 Dictionary with context summary:
223 - conversation_id (str)
224 - message_count (int)
225 - total_tokens (int)
226 - summaries (list): Summary messages
227 - recent_activity (list): Last N message summaries
228 - key_messages (list): High-importance messages
229 - errors (list): Error messages
231 Examples:
232 # Get quick overview
233 summary = manager.get_context_summary(conv_id)
234 print(f"Messages: {summary['message_count']}")
235 print(f"Tokens: {summary['total_tokens']}")
236 print(f"Errors: {len(summary['errors'])}")
237 """
238 from harnessutils.query import build_context_summary
240 messages = self.get_messages(conversation_id)
241 summary = build_context_summary(conversation_id, messages, recent_limit)
242 return summary.to_dict()
244 def inspect_context(self, conversation_id: str) -> ContextInspector:
245 """Create inspector for querying context state.
247 Provides observability into:
248 - What's currently in context
249 - Token counts and breakdowns
250 - Pruning predictions
251 - Audit trail of decisions
253 Args:
254 conversation_id: Conversation to inspect
256 Returns:
257 ContextInspector instance with full query capabilities
259 Example:
260 >>> inspector = manager.inspect_context(conv_id)
261 >>> summary = inspector.summary()
262 >>> print(f"Total tokens: {summary['total_tokens']}")
263 >>> impact = inspector.predict_impact(5000)
264 >>> if impact['would_trigger_pruning']:
265 ... print(f"Would prune ~{impact['estimated_pruned_count']} outputs")
266 """
267 messages = self.get_messages(conversation_id)
269 # Load conversation metadata
270 conv_data = self.storage.load_conversation(conversation_id)
271 conversation = Conversation.from_dict(conv_data)
273 # Load persisted pruning decisions for audit trail
274 decisions_data = conversation.metadata.get("latest_pruning_decisions", [])
275 decisions = [PruningDecision.from_dict(d) for d in decisions_data]
277 return ContextInspector(messages, self.config, conversation, decisions)
279 def prune_before_turn(
280 self,
281 conversation_id: str,
282 auto_mode: bool = False,
283 ) -> dict[str, Any]:
284 """Proactively prune old tool outputs before processing a turn.
286 This is Tier 2 compaction - removes old tool outputs while
287 preserving conversation structure.
289 Args:
290 conversation_id: Conversation to prune
291 auto_mode: Whether this was auto-triggered
293 Returns:
294 Detailed pruning result with token tracking and breakdown:
295 - pruned: Total outputs removed
296 - tokens_saved: Total tokens saved
297 - tokens_before: Token count before pruning
298 - tokens_after: Token count after pruning
299 - duplicates_pruned: Outputs removed due to duplication
300 - importance_pruned: Outputs removed due to low importance
301 - duplicate_tokens_saved: Tokens saved from deduplication
302 - importance_tokens_saved: Tokens saved from importance pruning
303 - reduction_percent: Percentage reduction in token usage
304 """
305 if not self.config.compaction.prune and auto_mode:
306 return {
307 "pruned": 0,
308 "tokens_saved": 0,
309 "tokens_before": 0,
310 "tokens_after": 0,
311 "duplicates_pruned": 0,
312 "importance_pruned": 0,
313 "duplicate_tokens_saved": 0,
314 "importance_tokens_saved": 0,
315 "reduction_percent": 0,
316 }
318 messages = self.get_messages(conversation_id)
319 result = prune_tool_outputs(
320 messages,
321 self.config.pruning,
322 )
324 for msg in messages:
325 self.storage.save_message(conversation_id, msg.id, msg.to_dict())
327 # Invalidate cache — pruning mutated messages in-place
328 if conversation_id in self._message_cache:
329 del self._message_cache[conversation_id]
331 # Persist decisions so audit trail and quality metrics work
332 conv_data = self.storage.load_conversation(conversation_id)
333 conv = Conversation.from_dict(conv_data)
334 conv.metadata["latest_pruning_decisions"] = [d.to_dict() for d in result.decisions]
335 self.storage.save_conversation(conversation_id, conv.to_dict())
337 return result.to_dict()
339 def predict_overflow(
340 self,
341 conversation_id: str,
342 current_usage: Usage,
343 ) -> bool:
344 """Predict if conversation will overflow in next N turns.
346 Args:
347 conversation_id: Conversation to check
348 current_usage: Current token usage
350 Returns:
351 True if overflow predicted within lookahead window
352 """
353 if not self.config.compaction.use_predictive:
354 return False
356 # Load conversation and get velocity
357 conv_data = self.storage.load_conversation(conversation_id)
358 conv = Conversation.from_dict(conv_data)
359 velocity = conv.get_velocity()
361 if velocity is None or not velocity.turn_deltas:
362 return False # No velocity data yet
364 # Project tokens ahead
365 lookahead = self.config.compaction.predictive_lookahead
366 predicted_growth = velocity.predict_tokens_ahead(lookahead)
368 # Calculate current total and projected total
369 current_total = current_usage.input + current_usage.cache.read
370 projected_total = current_total + predicted_growth
372 # Check against safety margin
373 usable_space = (
374 self.config.model_limits.default_context_limit
375 - self.config.model_limits.default_output_limit
376 )
377 safety_threshold = usable_space * self.config.compaction.predictive_safety_margin
379 return projected_total > safety_threshold
381 def needs_compaction(
382 self,
383 conversation_id: str,
384 usage: Usage,
385 ) -> bool:
386 """Check if conversation needs summarization (Tier 3).
388 Uses both reactive (overflow) and predictive checks.
390 Args:
391 conversation_id: Conversation to check
392 usage: Token usage from last turn
394 Returns:
395 True if summarization needed
396 """
397 # Reactive check: already overflowed
398 if is_overflow(
399 usage,
400 self.config.model_limits.default_context_limit,
401 self.config.model_limits.default_output_limit,
402 ):
403 return True
405 # Predictive check: will overflow soon
406 return self.predict_overflow(conversation_id, usage)
408 def compact(
409 self,
410 conversation_id: str,
411 llm_client: LLMClient,
412 parent_message_id: str,
413 model: str | None = None,
414 auto_mode: bool = False,
415 ) -> dict[str, Any]:
416 """Compact conversation using LLM summarization (Tier 3).
418 Args:
419 conversation_id: Conversation to compact
420 llm_client: LLM client for summarization
421 parent_message_id: Message that triggered compaction
422 model: Optional model to use for summarization
423 auto_mode: Whether this was auto-triggered
425 Returns:
426 Compaction result with summary message and metrics
427 """
428 if not self.config.compaction.auto and auto_mode:
429 return {"summarized": False}
431 messages = self.get_messages(conversation_id)
432 summary_id = generate_id("msg")
434 result = summarize_conversation(
435 messages=messages,
436 llm_client=llm_client,
437 parent_message_id=parent_message_id,
438 message_id=summary_id,
439 model=model,
440 auto_mode=auto_mode,
441 config=self.config.summarization,
442 )
444 self.add_message(conversation_id, result.summary_message)
446 return {
447 "summarized": True,
448 "summary_message_id": summary_id,
449 "tokens_used": result.tokens_used.total,
450 "cost": result.cost,
451 }
453 def to_model_format(self, conversation_id: str) -> list[dict[str, Any]]:
454 """Convert conversation messages to model format for LLM requests.
456 Args:
457 conversation_id: Conversation to convert
459 Returns:
460 List of messages in model format
461 """
462 messages = self.get_messages(conversation_id)
463 return to_model_messages(messages)
465 def calculate_context_usage(
466 self,
467 conversation_id: str,
468 model: str | None = None,
469 ) -> int:
470 """Calculate exact token count for conversation using tiktoken.
472 This counts ALL tokens in the conversation (user messages, assistant
473 responses, tool outputs, etc.) that will be sent to the model.
475 Uses cl100k_base tokenizer (GPT-4/Claude) which works for most modern LLMs.
477 Args:
478 conversation_id: Conversation to calculate usage for
479 model: Optional model name (currently unused, defaults to cl100k_base)
481 Returns:
482 Exact token count that will be used in context window
483 """
484 from harnessutils.tokens.exact import count_tokens_exact
486 messages = self.to_model_format(conversation_id)
487 return count_tokens_exact(messages, model)
489 def get_tool_output_tokens(self, conversation_id: str) -> dict[str, Any]:
490 """Get detailed breakdown of token usage for tool outputs.
492 Args:
493 conversation_id: Conversation to analyze
495 Returns:
496 Dictionary with token breakdown:
497 - total: Total tokens in tool outputs
498 - by_tool: Token count per tool type
499 - prunable: Tokens that could be pruned
500 - protected: Tokens in protected outputs
501 """
502 from harnessutils.compaction.pruning import calculate_context_tokens
504 messages = self.get_messages(conversation_id)
506 total = calculate_context_tokens(messages)
507 by_tool: dict[str, int] = {}
508 prunable = 0
509 protected = 0
510 turns_skipped = 0
512 for msg in reversed(messages):
513 if msg.role == "user":
514 turns_skipped += 1
516 for part in msg.parts:
517 from harnessutils.models.parts import ToolPart
518 from harnessutils.tokens.exact import count_tokens_fast
520 if not isinstance(part, ToolPart):
521 continue
523 if part.state.status != "completed":
524 continue
526 if not part.state.output:
527 continue
529 tokens = count_tokens_fast(part.state.output)
531 # Track by tool type
532 by_tool[part.tool] = by_tool.get(part.tool, 0) + tokens
534 # Determine if prunable
535 is_protected = (
536 turns_skipped < self.config.pruning.protect_turns
537 or part.tool in self.config.pruning.protected_tools
538 or (part.state.time and part.state.time.compacted)
539 )
541 if is_protected:
542 protected += tokens
543 else:
544 prunable += tokens
546 return {
547 "total": total,
548 "by_tool": by_tool,
549 "prunable": prunable,
550 "protected": protected,
551 "prunability_percent": round((prunable / total * 100) if total > 0 else 0, 1),
552 }
554 def get_context_quality(self, conversation_id: str) -> dict[str, Any]:
555 """Get current quality metrics for conversation.
557 Calculates all quality metrics, updates history, and returns assessment
558 with health status and actionable recommendations.
560 Args:
561 conversation_id: Conversation to analyze
563 Returns:
564 Dictionary with all metrics, health status, and recommendations.
565 Keys: information_density, redundancy_ratio, staleness_score,
566 error_preservation_rate, protected_ratio, health, recommendations
567 """
568 from harnessutils.quality import assess_quality
570 conv_data = self.storage.load_conversation(conversation_id)
571 conv = Conversation.from_dict(conv_data)
572 messages = self.get_messages(conversation_id)
574 # Get pruning decisions from metadata if available
575 decisions = conv.metadata.get("latest_pruning_decisions")
577 snapshot = assess_quality(
578 messages=messages,
579 config=self.config.pruning,
580 decisions=decisions,
581 )
583 # Update history
584 conv.update_quality_history(snapshot)
585 self.storage.save_conversation(conversation_id, conv.to_dict())
587 return snapshot.to_dict()
589 def track_quality_metric(
590 self,
591 conversation_id: str,
592 metric_name: str,
593 value: float,
594 timestamp: int | None = None,
595 ) -> None:
596 """Track a single quality metric value.
598 Note: Prefer get_context_quality() which calculates all metrics.
599 This is for custom/external metrics.
601 Args:
602 conversation_id: Conversation to track
603 metric_name: Name of metric (e.g., "information_density")
604 value: Metric value
605 timestamp: Unix ms timestamp (defaults to now)
606 """
607 from harnessutils.quality import QualitySnapshot
609 if timestamp is None:
610 timestamp = int(time.time() * 1000)
612 conv_data = self.storage.load_conversation(conversation_id)
613 conv = Conversation.from_dict(conv_data)
615 # Create minimal snapshot with just this metric
616 # (Other metrics set to 0.0, empty recommendations)
617 snapshot = QualitySnapshot(
618 timestamp=timestamp,
619 information_density=value if metric_name == "information_density" else 0.0,
620 redundancy_ratio=value if metric_name == "redundancy_ratio" else 0.0,
621 staleness_score=value if metric_name == "staleness_score" else 0.0,
622 error_preservation_rate=value
623 if metric_name == "error_preservation_rate"
624 else 0.0,
625 protected_ratio=value if metric_name == "protected_ratio" else 0.0,
626 health="unknown",
627 recommendations=[],
628 )
630 conv.update_quality_history(snapshot)
631 self.storage.save_conversation(conversation_id, conv.to_dict())
633 def get_quality_trend(
634 self,
635 conversation_id: str,
636 metric: str,
637 window: int = 20,
638 ) -> list[tuple[int, float]]:
639 """Get trend data for a specific metric.
641 Args:
642 conversation_id: Conversation to query
643 metric: Metric name (e.g., "information_density")
644 window: Number of most recent snapshots to return
646 Returns:
647 List of (timestamp, value) tuples, most recent first
648 """
649 conv_data = self.storage.load_conversation(conversation_id)
650 conv = Conversation.from_dict(conv_data)
652 history = conv.get_quality_history()
653 if history is None:
654 return []
656 return history.get_trend(metric, window)
658 def truncate_tool_output(
659 self,
660 output: str,
661 tool_name: str,
662 ) -> str:
663 """Truncate tool output if it exceeds limits (Tier 1).
665 Args:
666 output: Tool output to truncate
667 tool_name: Name of the tool
669 Returns:
670 Potentially truncated output
671 """
672 output_id = generate_id(f"output_{tool_name}")
674 result = truncate_output(
675 output=output,
676 config=self.config.truncation,
677 output_id=output_id,
678 )
680 if result.truncated and result.output_path:
681 self.storage.save_truncated_output(result.output_path, output)
683 return result.content
685 def create_snapshot(
686 self,
687 conversation_id: str,
688 snapshot_id: str | None = None,
689 metadata: dict[str, Any] | None = None,
690 ) -> Snapshot:
691 """Create snapshot of conversation state for reproducibility.
693 Captures full conversation state including:
694 - All messages with content
695 - Conversation metadata (velocity, etc.)
696 - Current configuration
698 Useful for:
699 - Debugging (save state before/after changes)
700 - A/B testing (compare different strategies)
701 - Reproducibility (restore exact state)
703 Args:
704 conversation_id: Conversation to snapshot
705 snapshot_id: Optional snapshot ID (auto-generated if None)
706 metadata: Optional metadata (e.g., {"reason": "before_refactor"})
708 Returns:
709 Created snapshot
711 Example:
712 >>> snap = manager.create_snapshot(conv_id, metadata={"test": "baseline"})
713 >>> # Make changes...
714 >>> snap2 = manager.create_snapshot(conv_id, metadata={"test": "optimized"})
715 >>> diff = manager.compare_snapshots(snap.snapshot_id, snap2.snapshot_id)
716 """
717 messages = self.get_messages(conversation_id)
719 conv_data = self.storage.load_conversation(conversation_id)
720 conversation = Conversation.from_dict(conv_data)
722 # Serialize config to dict recursively (convert dataclasses, Paths, etc.)
723 from dataclasses import asdict, is_dataclass
724 from pathlib import Path
726 def _serialize_config(obj: Any) -> Any:
727 """Recursively convert config to JSON-safe dict."""
728 if isinstance(obj, Path):
729 return str(obj)
730 elif is_dataclass(obj) and not isinstance(obj, type):
731 # Convert dataclass instance to dict, then recursively serialize values
732 obj_dict = asdict(obj)
733 return {k: _serialize_config(v) for k, v in obj_dict.items()}
734 elif isinstance(obj, dict):
735 return {k: _serialize_config(v) for k, v in obj.items()}
736 elif isinstance(obj, (list, tuple)):
737 return [_serialize_config(item) for item in obj]
738 else:
739 return obj
741 config_dict = _serialize_config(self.config)
743 return self.snapshot_manager.create_snapshot(
744 conversation_id=conversation_id,
745 messages=messages,
746 conversation=conversation,
747 config=config_dict,
748 snapshot_id=snapshot_id,
749 metadata=metadata,
750 )
752 def restore_snapshot(self, snapshot_id: str) -> str:
753 """Restore conversation from snapshot.
755 WARNING: This replaces current conversation state with snapshot state.
756 Consider creating a snapshot of current state first.
758 Args:
759 snapshot_id: Snapshot to restore
761 Returns:
762 Conversation ID of restored conversation
764 Raises:
765 FileNotFoundError: If snapshot not found
766 """
767 snapshot = self.snapshot_manager.get_snapshot(snapshot_id)
768 if not snapshot:
769 raise FileNotFoundError(f"Snapshot {snapshot_id} not found")
771 messages, conversation = self.snapshot_manager.restore_snapshot(snapshot)
773 # Clear existing messages (replace entire conversation state)
774 # For memory storage, clear the messages dict for this conversation
775 if hasattr(self.storage, "messages"):
776 if snapshot.conversation_id in self.storage.messages:
777 self.storage.messages[snapshot.conversation_id] = {}
779 # Save conversation
780 self.storage.save_conversation(snapshot.conversation_id, conversation.to_dict())
782 # Save messages
783 for msg in messages:
784 self.storage.save_message(snapshot.conversation_id, msg.id, msg.to_dict())
786 # Clear cache
787 if snapshot.conversation_id in self._message_cache:
788 del self._message_cache[snapshot.conversation_id]
790 return snapshot.conversation_id
792 def compare_snapshots(
793 self, snapshot1_id: str, snapshot2_id: str
794 ) -> SnapshotDiff | None:
795 """Compare two snapshots to see what changed.
797 Args:
798 snapshot1_id: First snapshot ID (earlier)
799 snapshot2_id: Second snapshot ID (later)
801 Returns:
802 SnapshotDiff describing changes:
803 - messages_added: Number of messages added
804 - messages_removed: Number of messages removed
805 - tokens_delta: Change in token count
806 - message_changes: List of specific changes
807 - config_changes: Configuration modifications
808 - metadata_changes: Additional metadata
810 None if snapshots not found
811 """
812 return self.snapshot_manager.compare_snapshots(snapshot1_id, snapshot2_id)
814 def export_snapshot(self, snapshot_id: str, file_path: str) -> None:
815 """Export snapshot to JSON file for version control.
817 Args:
818 snapshot_id: Snapshot to export
819 file_path: Path to write JSON file
821 Raises:
822 FileNotFoundError: If snapshot not found
824 Example:
825 >>> snap = manager.create_snapshot(conv_id)
826 >>> manager.export_snapshot(snap.snapshot_id, "snapshots/baseline.json")
827 >>> # Commit to git for reproducibility
828 """
829 self.snapshot_manager.export_snapshot(snapshot_id, file_path)
831 def import_snapshot(self, file_path: str) -> Snapshot:
832 """Import snapshot from JSON file.
834 Args:
835 file_path: Path to JSON file
837 Returns:
838 Imported snapshot
839 """
840 return self.snapshot_manager.import_snapshot(file_path)
842 def cleanup_stale_data(
843 self,
844 conversation_id: str,
845 max_age_hours: float = 24,
846 keep_errors: bool = True,
847 execute: bool = False,
848 ) -> dict[str, Any]:
849 """Clean up stale conversation data.
851 By default, identifies old messages and tool outputs for cleanup based on age
852 without making any changes. Set execute=True to actually clear stale outputs.
854 Args:
855 conversation_id: Conversation to clean up
856 max_age_hours: Maximum age in hours before considering stale (default: 24)
857 keep_errors: Whether to preserve error messages (default: True)
858 execute: When True, actually clears stale outputs (default: False)
860 Returns:
861 Dictionary with cleanup statistics:
862 - messages_archived (int): Messages eligible for archival
863 - tokens_freed (int): Tokens that would be freed
864 - duplicates_removed (int): Duplicate outputs found
865 - stale_outputs_pruned (int): Stale outputs identified/cleared
866 - operations (list[str]): Description of operations
868 Example:
869 >>> result = manager.cleanup_stale_data(conv_id, max_age_hours=48)
870 >>> print(f"Can free {result['tokens_freed']} tokens")
871 >>> result = manager.cleanup_stale_data(conv_id, max_age_hours=48, execute=True)
872 >>> print(f"Freed {result['tokens_freed']} tokens")
873 """
874 from harnessutils.maintenance import cleanup_stale_data
876 messages = self.get_messages(conversation_id)
877 result = cleanup_stale_data(
878 messages=messages,
879 config=self.config.pruning,
880 max_age_hours=max_age_hours,
881 keep_errors=keep_errors,
882 execute=execute,
883 )
885 if execute:
886 for msg in messages:
887 self.storage.save_message(conversation_id, msg.id, msg.to_dict())
888 # Invalidate cache — outputs were cleared in-place
889 if conversation_id in self._message_cache:
890 del self._message_cache[conversation_id]
892 return result.to_dict()
894 def scan_and_deduplicate(self, conversation_id: str) -> dict[str, Any]:
895 """Scan for duplicate outputs and return statistics.
897 Identifies duplicate tool outputs without removing them.
898 Use prune_before_turn() to actually remove duplicates.
900 Args:
901 conversation_id: Conversation to scan
903 Returns:
904 Dictionary with deduplication statistics:
905 - duplicates_removed (int): Number of duplicates found
906 - tokens_freed (int): Tokens that would be freed
907 - operations (list[str]): Description of findings
909 Example:
910 >>> result = manager.scan_and_deduplicate(conv_id)
911 >>> if result['duplicates_removed'] > 0:
912 ... print(f"Found {result['duplicates_removed']} duplicates")
913 ... # Run compaction to actually remove them
914 ... manager.prune_before_turn(conv_id)
915 """
916 from harnessutils.maintenance import scan_and_deduplicate
918 messages = self.get_messages(conversation_id)
919 result = scan_and_deduplicate(
920 messages=messages,
921 config=self.config.pruning,
922 )
924 return result.to_dict()
926 def get_memory(self, project_id: str, key: str, default: Any = None) -> Any:
927 """Get a value from project-scoped memory.
929 Args:
930 project_id: Project to retrieve memory from
931 key: Memory key
932 default: Default value if key not found
934 Returns:
935 Stored value or default
936 """
937 data = self._load_project_memory(project_id)
938 return data.get(key, default)
940 def set_memory(self, project_id: str, key: str, value: Any) -> None:
941 """Set a value in project-scoped memory.
943 Args:
944 project_id: Project to store memory in
945 key: Memory key
946 value: Value to store
947 """
948 data = self._load_project_memory(project_id)
949 data[key] = value
950 try:
951 self.storage.save_project_memory(project_id, data)
952 except AttributeError:
953 pass # Backend doesn't support project memory
955 def delete_memory(self, project_id: str, key: str) -> None:
956 """Delete a key from project-scoped memory.
958 Args:
959 project_id: Project to delete memory from
960 key: Memory key to delete
961 """
962 data = self._load_project_memory(project_id)
963 data.pop(key, None)
964 try:
965 self.storage.save_project_memory(project_id, data)
966 except AttributeError:
967 pass # Backend doesn't support project memory
969 def list_memory(self, project_id: str) -> dict[str, Any]:
970 """List all memory keys and values for a project.
972 Args:
973 project_id: Project to list memory for
975 Returns:
976 Dictionary of all memory key-value pairs
977 """
978 return self._load_project_memory(project_id)
980 def _load_project_memory(self, project_id: str) -> dict[str, Any]:
981 """Load project memory, returning empty dict on failure.
983 Args:
984 project_id: Project to load memory for
986 Returns:
987 Memory dictionary (empty if not found or backend unsupported)
988 """
989 try:
990 return self.storage.load_project_memory(project_id)
991 except (FileNotFoundError, AttributeError):
992 return {}
994 def detect_context_issues(self, conversation_id: str) -> list[dict[str, Any]]:
995 """Detect quality and drift issues in conversation context.
997 Analyzes conversation for common problems:
998 - High redundancy (duplicate content)
999 - Staleness accumulation (old messages)
1000 - Low information density
1001 - Error preservation status
1002 - Excessive protection (limiting pruning)
1004 Args:
1005 conversation_id: Conversation to analyze
1007 Returns:
1008 List of issue dictionaries, each containing:
1009 - issue_type (str): Type of issue
1010 - severity (str): "info", "warning", or "error"
1011 - description (str): Human-readable description
1012 - affected_count (int): Number of items affected
1013 - suggested_fix (str): Recommended action
1014 - metadata (dict): Additional details
1016 Example:
1017 >>> issues = manager.detect_context_issues(conv_id)
1018 >>> for issue in issues:
1019 ... if issue['severity'] == 'warning':
1020 ... print(f"⚠️ {issue['description']}")
1021 ... print(f" Fix: {issue['suggested_fix']}")
1022 """
1023 from harnessutils.maintenance import detect_context_issues
1025 conv_data = self.storage.load_conversation(conversation_id)
1026 conv = Conversation.from_dict(conv_data)
1027 messages = self.get_messages(conversation_id)
1029 issues = detect_context_issues(
1030 messages=messages,
1031 conversation=conv,
1032 config=self.config.pruning,
1033 )
1035 return [issue.to_dict() for issue in issues]