Coverage for src / harnessutils / manager.py: 96%

262 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 09:29 -0600

1"""Main ConversationManager API for harness-utils.""" 

2 

3import time 

4from typing import Any 

5 

6from harnessutils.compaction.pruning import PruningDecision, prune_tool_outputs 

7from harnessutils.compaction.summarization import is_overflow, summarize_conversation 

8from harnessutils.compaction.truncation import truncate_output 

9from harnessutils.config import HarnessConfig 

10from harnessutils.conversion.to_model import to_model_messages 

11from harnessutils.inspection import ContextInspector 

12from harnessutils.models.conversation import Conversation 

13from harnessutils.models.message import Message 

14from harnessutils.models.usage import Usage 

15from harnessutils.snapshots import Snapshot, SnapshotDiff, SnapshotManager 

16from harnessutils.storage.memory import MemoryStorage 

17from harnessutils.types import LLMClient, StorageBackend 

18from harnessutils.utils.ids import generate_id 

19 

20 

class ConversationManager:
    """Main interface for managing conversations with context window management.

    Provides high-level API for:
    - Creating and managing conversations
    - Adding messages
    - Automatic context compaction (truncation, pruning, summarization)
    - Message storage and retrieval
    """

    def __init__(
        self,
        storage: StorageBackend | None = None,
        config: HarnessConfig | None = None,
    ):
        """Initialize conversation manager.

        Args:
            storage: Storage backend (uses in-memory if None)
            config: Configuration (uses defaults if None)
        """
        self.config = config or HarnessConfig()
        self.storage = storage or MemoryStorage()
        # Cache of conversation_id -> loaded Message list; invalidated by
        # operations that mutate messages in-place (pruning, restore, cleanup).
        self._message_cache: dict[str, list[Message]] = {}
        # FIX: pass the resolved backend, not the raw argument. The original
        # passed `storage`, which is None when the caller relies on the
        # MemoryStorage fallback — leaving the snapshot manager on a different
        # (or missing) backend than the rest of the manager.
        self.snapshot_manager = SnapshotManager(self.storage)

46 

47 def create_conversation( 

48 self, 

49 conversation_id: str | None = None, 

50 project_id: str | None = None, 

51 ) -> Conversation: 

52 """Create a new conversation. 

53 

54 Args: 

55 conversation_id: Optional conversation ID (generated if None) 

56 project_id: Optional project ID for grouping 

57 

58 Returns: 

59 New conversation object 

60 """ 

61 if conversation_id is None: 

62 conversation_id = generate_id("conv") 

63 

64 now = int(time.time() * 1000) 

65 conversation = Conversation( 

66 id=conversation_id, 

67 project_id=project_id, 

68 created=now, 

69 updated=now, 

70 ) 

71 

72 self.storage.save_conversation(conversation_id, conversation.to_dict()) 

73 self._message_cache[conversation_id] = [] 

74 

75 return conversation 

76 

77 def add_message(self, conversation_id: str, message: Message) -> None: 

78 """Add a message to a conversation. 

79 

80 Args: 

81 conversation_id: Conversation to add message to 

82 message: Message to add 

83 """ 

84 # Inject timestamp if not set — required for cleanup_stale_data to work 

85 if "timestamp" not in message.metadata: 

86 message.metadata["timestamp"] = int(time.time() * 1000) 

87 

88 self.storage.save_message(conversation_id, message.id, message.to_dict()) 

89 

90 if conversation_id not in self._message_cache: 

91 self._message_cache[conversation_id] = [] 

92 self._message_cache[conversation_id].append(message) 

93 

94 # Load conversation and update 

95 conv_data = self.storage.load_conversation(conversation_id) 

96 conv = Conversation.from_dict(conv_data) 

97 conv.updated = int(time.time() * 1000) 

98 

99 # Track velocity if message has token count 

100 if message.tokens: 

101 tokens_added = message.tokens.total 

102 conv.update_velocity(tokens_added) 

103 

104 self.storage.save_conversation(conversation_id, conv.to_dict()) 

105 

106 def get_messages(self, conversation_id: str) -> list[Message]: 

107 """Get all messages for a conversation. 

108 

109 Args: 

110 conversation_id: Conversation ID 

111 

112 Returns: 

113 List of messages in chronological order 

114 """ 

115 if conversation_id in self._message_cache: 

116 return self._message_cache[conversation_id] 

117 

118 message_ids = self.storage.list_messages(conversation_id) 

119 messages = [ 

120 Message.from_dict(self.storage.load_message(conversation_id, msg_id)) 

121 for msg_id in message_ids 

122 ] 

123 

124 self._message_cache[conversation_id] = messages 

125 return messages 

126 

127 def query_messages( 

128 self, 

129 conversation_id: str, 

130 limit: int | None = None, 

131 offset: int = 0, 

132 order: str = "asc", 

133 after: int | None = None, 

134 before: int | None = None, 

135 filter: dict[str, Any] | None = None, 

136 ) -> list[Message]: 

137 """Query messages with filtering and pagination. 

138 

139 Args: 

140 conversation_id: Conversation to query 

141 limit: Maximum messages to return 

142 offset: Skip first N messages 

143 order: "asc" (oldest first) or "desc" (newest first) 

144 after: Unix ms timestamp - messages after this time 

145 before: Unix ms timestamp - messages before this time 

146 filter: Filter criteria dict: 

147 - has_errors (bool): Only messages with errors 

148 - has_warnings (bool): Only messages with warnings 

149 - min_importance (float): Minimum importance score 

150 - tools (list[str]): Specific tool types 

151 - roles (list[str]): Message roles (user/assistant) 

152 - has_tool_outputs (bool): Only messages with tool outputs 

153 - is_summary (bool): Only summary messages 

154 

155 Returns: 

156 Filtered and paginated messages 

157 

158 Examples: 

159 # Get last 10 messages 

160 messages = manager.query_messages(conv_id, limit=10, order="desc") 

161 

162 # Get messages with errors 

163 messages = manager.query_messages( 

164 conv_id, 

165 filter={"has_errors": True} 

166 ) 

167 

168 # Get recent high-importance messages 

169 messages = manager.query_messages( 

170 conv_id, 

171 filter={"min_importance": 50.0}, 

172 after=timestamp_24h_ago, 

173 limit=20 

174 ) 

175 """ 

176 from harnessutils.query import MessageFilter, QueryOptions, query_messages 

177 

178 # Build filter 

179 msg_filter = None 

180 if filter is not None: 

181 msg_filter = MessageFilter( 

182 has_errors=filter.get("has_errors"), 

183 has_warnings=filter.get("has_warnings"), 

184 min_importance=filter.get("min_importance"), 

185 tools=filter.get("tools"), 

186 roles=filter.get("roles"), 

187 has_tool_outputs=filter.get("has_tool_outputs"), 

188 is_summary=filter.get("is_summary"), 

189 ) 

190 

191 # Build query options 

192 options = QueryOptions( 

193 limit=limit, 

194 offset=offset, 

195 order=order, # type: ignore 

196 after=after, 

197 before=before, 

198 filter=msg_filter, 

199 ) 

200 

201 # Get all messages and query 

202 messages = self.get_messages(conversation_id) 

203 return query_messages(messages, options) 

204 

205 def get_context_summary( 

206 self, conversation_id: str, recent_limit: int = 5 

207 ) -> dict[str, Any]: 

208 """Get lightweight context summary without loading full content. 

209 

210 Provides overview of conversation state including: 

211 - Message count and token usage 

212 - Summary messages 

213 - Recent activity (last N messages) 

214 - Key messages (high importance) 

215 - Error messages 

216 

217 Args: 

218 conversation_id: Conversation to summarize 

219 recent_limit: Number of recent messages to include (default: 5) 

220 

221 Returns: 

222 Dictionary with context summary: 

223 - conversation_id (str) 

224 - message_count (int) 

225 - total_tokens (int) 

226 - summaries (list): Summary messages 

227 - recent_activity (list): Last N message summaries 

228 - key_messages (list): High-importance messages 

229 - errors (list): Error messages 

230 

231 Examples: 

232 # Get quick overview 

233 summary = manager.get_context_summary(conv_id) 

234 print(f"Messages: {summary['message_count']}") 

235 print(f"Tokens: {summary['total_tokens']}") 

236 print(f"Errors: {len(summary['errors'])}") 

237 """ 

238 from harnessutils.query import build_context_summary 

239 

240 messages = self.get_messages(conversation_id) 

241 summary = build_context_summary(conversation_id, messages, recent_limit) 

242 return summary.to_dict() 

243 

244 def inspect_context(self, conversation_id: str) -> ContextInspector: 

245 """Create inspector for querying context state. 

246 

247 Provides observability into: 

248 - What's currently in context 

249 - Token counts and breakdowns 

250 - Pruning predictions 

251 - Audit trail of decisions 

252 

253 Args: 

254 conversation_id: Conversation to inspect 

255 

256 Returns: 

257 ContextInspector instance with full query capabilities 

258 

259 Example: 

260 >>> inspector = manager.inspect_context(conv_id) 

261 >>> summary = inspector.summary() 

262 >>> print(f"Total tokens: {summary['total_tokens']}") 

263 >>> impact = inspector.predict_impact(5000) 

264 >>> if impact['would_trigger_pruning']: 

265 ... print(f"Would prune ~{impact['estimated_pruned_count']} outputs") 

266 """ 

267 messages = self.get_messages(conversation_id) 

268 

269 # Load conversation metadata 

270 conv_data = self.storage.load_conversation(conversation_id) 

271 conversation = Conversation.from_dict(conv_data) 

272 

273 # Load persisted pruning decisions for audit trail 

274 decisions_data = conversation.metadata.get("latest_pruning_decisions", []) 

275 decisions = [PruningDecision.from_dict(d) for d in decisions_data] 

276 

277 return ContextInspector(messages, self.config, conversation, decisions) 

278 

279 def prune_before_turn( 

280 self, 

281 conversation_id: str, 

282 auto_mode: bool = False, 

283 ) -> dict[str, Any]: 

284 """Proactively prune old tool outputs before processing a turn. 

285 

286 This is Tier 2 compaction - removes old tool outputs while 

287 preserving conversation structure. 

288 

289 Args: 

290 conversation_id: Conversation to prune 

291 auto_mode: Whether this was auto-triggered 

292 

293 Returns: 

294 Detailed pruning result with token tracking and breakdown: 

295 - pruned: Total outputs removed 

296 - tokens_saved: Total tokens saved 

297 - tokens_before: Token count before pruning 

298 - tokens_after: Token count after pruning 

299 - duplicates_pruned: Outputs removed due to duplication 

300 - importance_pruned: Outputs removed due to low importance 

301 - duplicate_tokens_saved: Tokens saved from deduplication 

302 - importance_tokens_saved: Tokens saved from importance pruning 

303 - reduction_percent: Percentage reduction in token usage 

304 """ 

305 if not self.config.compaction.prune and auto_mode: 

306 return { 

307 "pruned": 0, 

308 "tokens_saved": 0, 

309 "tokens_before": 0, 

310 "tokens_after": 0, 

311 "duplicates_pruned": 0, 

312 "importance_pruned": 0, 

313 "duplicate_tokens_saved": 0, 

314 "importance_tokens_saved": 0, 

315 "reduction_percent": 0, 

316 } 

317 

318 messages = self.get_messages(conversation_id) 

319 result = prune_tool_outputs( 

320 messages, 

321 self.config.pruning, 

322 ) 

323 

324 for msg in messages: 

325 self.storage.save_message(conversation_id, msg.id, msg.to_dict()) 

326 

327 # Invalidate cache — pruning mutated messages in-place 

328 if conversation_id in self._message_cache: 

329 del self._message_cache[conversation_id] 

330 

331 # Persist decisions so audit trail and quality metrics work 

332 conv_data = self.storage.load_conversation(conversation_id) 

333 conv = Conversation.from_dict(conv_data) 

334 conv.metadata["latest_pruning_decisions"] = [d.to_dict() for d in result.decisions] 

335 self.storage.save_conversation(conversation_id, conv.to_dict()) 

336 

337 return result.to_dict() 

338 

339 def predict_overflow( 

340 self, 

341 conversation_id: str, 

342 current_usage: Usage, 

343 ) -> bool: 

344 """Predict if conversation will overflow in next N turns. 

345 

346 Args: 

347 conversation_id: Conversation to check 

348 current_usage: Current token usage 

349 

350 Returns: 

351 True if overflow predicted within lookahead window 

352 """ 

353 if not self.config.compaction.use_predictive: 

354 return False 

355 

356 # Load conversation and get velocity 

357 conv_data = self.storage.load_conversation(conversation_id) 

358 conv = Conversation.from_dict(conv_data) 

359 velocity = conv.get_velocity() 

360 

361 if velocity is None or not velocity.turn_deltas: 

362 return False # No velocity data yet 

363 

364 # Project tokens ahead 

365 lookahead = self.config.compaction.predictive_lookahead 

366 predicted_growth = velocity.predict_tokens_ahead(lookahead) 

367 

368 # Calculate current total and projected total 

369 current_total = current_usage.input + current_usage.cache.read 

370 projected_total = current_total + predicted_growth 

371 

372 # Check against safety margin 

373 usable_space = ( 

374 self.config.model_limits.default_context_limit 

375 - self.config.model_limits.default_output_limit 

376 ) 

377 safety_threshold = usable_space * self.config.compaction.predictive_safety_margin 

378 

379 return projected_total > safety_threshold 

380 

381 def needs_compaction( 

382 self, 

383 conversation_id: str, 

384 usage: Usage, 

385 ) -> bool: 

386 """Check if conversation needs summarization (Tier 3). 

387 

388 Uses both reactive (overflow) and predictive checks. 

389 

390 Args: 

391 conversation_id: Conversation to check 

392 usage: Token usage from last turn 

393 

394 Returns: 

395 True if summarization needed 

396 """ 

397 # Reactive check: already overflowed 

398 if is_overflow( 

399 usage, 

400 self.config.model_limits.default_context_limit, 

401 self.config.model_limits.default_output_limit, 

402 ): 

403 return True 

404 

405 # Predictive check: will overflow soon 

406 return self.predict_overflow(conversation_id, usage) 

407 

408 def compact( 

409 self, 

410 conversation_id: str, 

411 llm_client: LLMClient, 

412 parent_message_id: str, 

413 model: str | None = None, 

414 auto_mode: bool = False, 

415 ) -> dict[str, Any]: 

416 """Compact conversation using LLM summarization (Tier 3). 

417 

418 Args: 

419 conversation_id: Conversation to compact 

420 llm_client: LLM client for summarization 

421 parent_message_id: Message that triggered compaction 

422 model: Optional model to use for summarization 

423 auto_mode: Whether this was auto-triggered 

424 

425 Returns: 

426 Compaction result with summary message and metrics 

427 """ 

428 if not self.config.compaction.auto and auto_mode: 

429 return {"summarized": False} 

430 

431 messages = self.get_messages(conversation_id) 

432 summary_id = generate_id("msg") 

433 

434 result = summarize_conversation( 

435 messages=messages, 

436 llm_client=llm_client, 

437 parent_message_id=parent_message_id, 

438 message_id=summary_id, 

439 model=model, 

440 auto_mode=auto_mode, 

441 config=self.config.summarization, 

442 ) 

443 

444 self.add_message(conversation_id, result.summary_message) 

445 

446 return { 

447 "summarized": True, 

448 "summary_message_id": summary_id, 

449 "tokens_used": result.tokens_used.total, 

450 "cost": result.cost, 

451 } 

452 

453 def to_model_format(self, conversation_id: str) -> list[dict[str, Any]]: 

454 """Convert conversation messages to model format for LLM requests. 

455 

456 Args: 

457 conversation_id: Conversation to convert 

458 

459 Returns: 

460 List of messages in model format 

461 """ 

462 messages = self.get_messages(conversation_id) 

463 return to_model_messages(messages) 

464 

465 def calculate_context_usage( 

466 self, 

467 conversation_id: str, 

468 model: str | None = None, 

469 ) -> int: 

470 """Calculate exact token count for conversation using tiktoken. 

471 

472 This counts ALL tokens in the conversation (user messages, assistant 

473 responses, tool outputs, etc.) that will be sent to the model. 

474 

475 Uses cl100k_base tokenizer (GPT-4/Claude) which works for most modern LLMs. 

476 

477 Args: 

478 conversation_id: Conversation to calculate usage for 

479 model: Optional model name (currently unused, defaults to cl100k_base) 

480 

481 Returns: 

482 Exact token count that will be used in context window 

483 """ 

484 from harnessutils.tokens.exact import count_tokens_exact 

485 

486 messages = self.to_model_format(conversation_id) 

487 return count_tokens_exact(messages, model) 

488 

489 def get_tool_output_tokens(self, conversation_id: str) -> dict[str, Any]: 

490 """Get detailed breakdown of token usage for tool outputs. 

491 

492 Args: 

493 conversation_id: Conversation to analyze 

494 

495 Returns: 

496 Dictionary with token breakdown: 

497 - total: Total tokens in tool outputs 

498 - by_tool: Token count per tool type 

499 - prunable: Tokens that could be pruned 

500 - protected: Tokens in protected outputs 

501 """ 

502 from harnessutils.compaction.pruning import calculate_context_tokens 

503 

504 messages = self.get_messages(conversation_id) 

505 

506 total = calculate_context_tokens(messages) 

507 by_tool: dict[str, int] = {} 

508 prunable = 0 

509 protected = 0 

510 turns_skipped = 0 

511 

512 for msg in reversed(messages): 

513 if msg.role == "user": 

514 turns_skipped += 1 

515 

516 for part in msg.parts: 

517 from harnessutils.models.parts import ToolPart 

518 from harnessutils.tokens.exact import count_tokens_fast 

519 

520 if not isinstance(part, ToolPart): 

521 continue 

522 

523 if part.state.status != "completed": 

524 continue 

525 

526 if not part.state.output: 

527 continue 

528 

529 tokens = count_tokens_fast(part.state.output) 

530 

531 # Track by tool type 

532 by_tool[part.tool] = by_tool.get(part.tool, 0) + tokens 

533 

534 # Determine if prunable 

535 is_protected = ( 

536 turns_skipped < self.config.pruning.protect_turns 

537 or part.tool in self.config.pruning.protected_tools 

538 or (part.state.time and part.state.time.compacted) 

539 ) 

540 

541 if is_protected: 

542 protected += tokens 

543 else: 

544 prunable += tokens 

545 

546 return { 

547 "total": total, 

548 "by_tool": by_tool, 

549 "prunable": prunable, 

550 "protected": protected, 

551 "prunability_percent": round((prunable / total * 100) if total > 0 else 0, 1), 

552 } 

553 

554 def get_context_quality(self, conversation_id: str) -> dict[str, Any]: 

555 """Get current quality metrics for conversation. 

556 

557 Calculates all quality metrics, updates history, and returns assessment 

558 with health status and actionable recommendations. 

559 

560 Args: 

561 conversation_id: Conversation to analyze 

562 

563 Returns: 

564 Dictionary with all metrics, health status, and recommendations. 

565 Keys: information_density, redundancy_ratio, staleness_score, 

566 error_preservation_rate, protected_ratio, health, recommendations 

567 """ 

568 from harnessutils.quality import assess_quality 

569 

570 conv_data = self.storage.load_conversation(conversation_id) 

571 conv = Conversation.from_dict(conv_data) 

572 messages = self.get_messages(conversation_id) 

573 

574 # Get pruning decisions from metadata if available 

575 decisions = conv.metadata.get("latest_pruning_decisions") 

576 

577 snapshot = assess_quality( 

578 messages=messages, 

579 config=self.config.pruning, 

580 decisions=decisions, 

581 ) 

582 

583 # Update history 

584 conv.update_quality_history(snapshot) 

585 self.storage.save_conversation(conversation_id, conv.to_dict()) 

586 

587 return snapshot.to_dict() 

588 

589 def track_quality_metric( 

590 self, 

591 conversation_id: str, 

592 metric_name: str, 

593 value: float, 

594 timestamp: int | None = None, 

595 ) -> None: 

596 """Track a single quality metric value. 

597 

598 Note: Prefer get_context_quality() which calculates all metrics. 

599 This is for custom/external metrics. 

600 

601 Args: 

602 conversation_id: Conversation to track 

603 metric_name: Name of metric (e.g., "information_density") 

604 value: Metric value 

605 timestamp: Unix ms timestamp (defaults to now) 

606 """ 

607 from harnessutils.quality import QualitySnapshot 

608 

609 if timestamp is None: 

610 timestamp = int(time.time() * 1000) 

611 

612 conv_data = self.storage.load_conversation(conversation_id) 

613 conv = Conversation.from_dict(conv_data) 

614 

615 # Create minimal snapshot with just this metric 

616 # (Other metrics set to 0.0, empty recommendations) 

617 snapshot = QualitySnapshot( 

618 timestamp=timestamp, 

619 information_density=value if metric_name == "information_density" else 0.0, 

620 redundancy_ratio=value if metric_name == "redundancy_ratio" else 0.0, 

621 staleness_score=value if metric_name == "staleness_score" else 0.0, 

622 error_preservation_rate=value 

623 if metric_name == "error_preservation_rate" 

624 else 0.0, 

625 protected_ratio=value if metric_name == "protected_ratio" else 0.0, 

626 health="unknown", 

627 recommendations=[], 

628 ) 

629 

630 conv.update_quality_history(snapshot) 

631 self.storage.save_conversation(conversation_id, conv.to_dict()) 

632 

633 def get_quality_trend( 

634 self, 

635 conversation_id: str, 

636 metric: str, 

637 window: int = 20, 

638 ) -> list[tuple[int, float]]: 

639 """Get trend data for a specific metric. 

640 

641 Args: 

642 conversation_id: Conversation to query 

643 metric: Metric name (e.g., "information_density") 

644 window: Number of most recent snapshots to return 

645 

646 Returns: 

647 List of (timestamp, value) tuples, most recent first 

648 """ 

649 conv_data = self.storage.load_conversation(conversation_id) 

650 conv = Conversation.from_dict(conv_data) 

651 

652 history = conv.get_quality_history() 

653 if history is None: 

654 return [] 

655 

656 return history.get_trend(metric, window) 

657 

658 def truncate_tool_output( 

659 self, 

660 output: str, 

661 tool_name: str, 

662 ) -> str: 

663 """Truncate tool output if it exceeds limits (Tier 1). 

664 

665 Args: 

666 output: Tool output to truncate 

667 tool_name: Name of the tool 

668 

669 Returns: 

670 Potentially truncated output 

671 """ 

672 output_id = generate_id(f"output_{tool_name}") 

673 

674 result = truncate_output( 

675 output=output, 

676 config=self.config.truncation, 

677 output_id=output_id, 

678 ) 

679 

680 if result.truncated and result.output_path: 

681 self.storage.save_truncated_output(result.output_path, output) 

682 

683 return result.content 

684 

685 def create_snapshot( 

686 self, 

687 conversation_id: str, 

688 snapshot_id: str | None = None, 

689 metadata: dict[str, Any] | None = None, 

690 ) -> Snapshot: 

691 """Create snapshot of conversation state for reproducibility. 

692 

693 Captures full conversation state including: 

694 - All messages with content 

695 - Conversation metadata (velocity, etc.) 

696 - Current configuration 

697 

698 Useful for: 

699 - Debugging (save state before/after changes) 

700 - A/B testing (compare different strategies) 

701 - Reproducibility (restore exact state) 

702 

703 Args: 

704 conversation_id: Conversation to snapshot 

705 snapshot_id: Optional snapshot ID (auto-generated if None) 

706 metadata: Optional metadata (e.g., {"reason": "before_refactor"}) 

707 

708 Returns: 

709 Created snapshot 

710 

711 Example: 

712 >>> snap = manager.create_snapshot(conv_id, metadata={"test": "baseline"}) 

713 >>> # Make changes... 

714 >>> snap2 = manager.create_snapshot(conv_id, metadata={"test": "optimized"}) 

715 >>> diff = manager.compare_snapshots(snap.snapshot_id, snap2.snapshot_id) 

716 """ 

717 messages = self.get_messages(conversation_id) 

718 

719 conv_data = self.storage.load_conversation(conversation_id) 

720 conversation = Conversation.from_dict(conv_data) 

721 

722 # Serialize config to dict recursively (convert dataclasses, Paths, etc.) 

723 from dataclasses import asdict, is_dataclass 

724 from pathlib import Path 

725 

726 def _serialize_config(obj: Any) -> Any: 

727 """Recursively convert config to JSON-safe dict.""" 

728 if isinstance(obj, Path): 

729 return str(obj) 

730 elif is_dataclass(obj) and not isinstance(obj, type): 

731 # Convert dataclass instance to dict, then recursively serialize values 

732 obj_dict = asdict(obj) 

733 return {k: _serialize_config(v) for k, v in obj_dict.items()} 

734 elif isinstance(obj, dict): 

735 return {k: _serialize_config(v) for k, v in obj.items()} 

736 elif isinstance(obj, (list, tuple)): 

737 return [_serialize_config(item) for item in obj] 

738 else: 

739 return obj 

740 

741 config_dict = _serialize_config(self.config) 

742 

743 return self.snapshot_manager.create_snapshot( 

744 conversation_id=conversation_id, 

745 messages=messages, 

746 conversation=conversation, 

747 config=config_dict, 

748 snapshot_id=snapshot_id, 

749 metadata=metadata, 

750 ) 

751 

752 def restore_snapshot(self, snapshot_id: str) -> str: 

753 """Restore conversation from snapshot. 

754 

755 WARNING: This replaces current conversation state with snapshot state. 

756 Consider creating a snapshot of current state first. 

757 

758 Args: 

759 snapshot_id: Snapshot to restore 

760 

761 Returns: 

762 Conversation ID of restored conversation 

763 

764 Raises: 

765 FileNotFoundError: If snapshot not found 

766 """ 

767 snapshot = self.snapshot_manager.get_snapshot(snapshot_id) 

768 if not snapshot: 

769 raise FileNotFoundError(f"Snapshot {snapshot_id} not found") 

770 

771 messages, conversation = self.snapshot_manager.restore_snapshot(snapshot) 

772 

773 # Clear existing messages (replace entire conversation state) 

774 # For memory storage, clear the messages dict for this conversation 

775 if hasattr(self.storage, "messages"): 

776 if snapshot.conversation_id in self.storage.messages: 

777 self.storage.messages[snapshot.conversation_id] = {} 

778 

779 # Save conversation 

780 self.storage.save_conversation(snapshot.conversation_id, conversation.to_dict()) 

781 

782 # Save messages 

783 for msg in messages: 

784 self.storage.save_message(snapshot.conversation_id, msg.id, msg.to_dict()) 

785 

786 # Clear cache 

787 if snapshot.conversation_id in self._message_cache: 

788 del self._message_cache[snapshot.conversation_id] 

789 

790 return snapshot.conversation_id 

791 

792 def compare_snapshots( 

793 self, snapshot1_id: str, snapshot2_id: str 

794 ) -> SnapshotDiff | None: 

795 """Compare two snapshots to see what changed. 

796 

797 Args: 

798 snapshot1_id: First snapshot ID (earlier) 

799 snapshot2_id: Second snapshot ID (later) 

800 

801 Returns: 

802 SnapshotDiff describing changes: 

803 - messages_added: Number of messages added 

804 - messages_removed: Number of messages removed 

805 - tokens_delta: Change in token count 

806 - message_changes: List of specific changes 

807 - config_changes: Configuration modifications 

808 - metadata_changes: Additional metadata 

809 

810 None if snapshots not found 

811 """ 

812 return self.snapshot_manager.compare_snapshots(snapshot1_id, snapshot2_id) 

813 

814 def export_snapshot(self, snapshot_id: str, file_path: str) -> None: 

815 """Export snapshot to JSON file for version control. 

816 

817 Args: 

818 snapshot_id: Snapshot to export 

819 file_path: Path to write JSON file 

820 

821 Raises: 

822 FileNotFoundError: If snapshot not found 

823 

824 Example: 

825 >>> snap = manager.create_snapshot(conv_id) 

826 >>> manager.export_snapshot(snap.snapshot_id, "snapshots/baseline.json") 

827 >>> # Commit to git for reproducibility 

828 """ 

829 self.snapshot_manager.export_snapshot(snapshot_id, file_path) 

830 

831 def import_snapshot(self, file_path: str) -> Snapshot: 

832 """Import snapshot from JSON file. 

833 

834 Args: 

835 file_path: Path to JSON file 

836 

837 Returns: 

838 Imported snapshot 

839 """ 

840 return self.snapshot_manager.import_snapshot(file_path) 

841 

842 def cleanup_stale_data( 

843 self, 

844 conversation_id: str, 

845 max_age_hours: float = 24, 

846 keep_errors: bool = True, 

847 execute: bool = False, 

848 ) -> dict[str, Any]: 

849 """Clean up stale conversation data. 

850 

851 By default, identifies old messages and tool outputs for cleanup based on age 

852 without making any changes. Set execute=True to actually clear stale outputs. 

853 

854 Args: 

855 conversation_id: Conversation to clean up 

856 max_age_hours: Maximum age in hours before considering stale (default: 24) 

857 keep_errors: Whether to preserve error messages (default: True) 

858 execute: When True, actually clears stale outputs (default: False) 

859 

860 Returns: 

861 Dictionary with cleanup statistics: 

862 - messages_archived (int): Messages eligible for archival 

863 - tokens_freed (int): Tokens that would be freed 

864 - duplicates_removed (int): Duplicate outputs found 

865 - stale_outputs_pruned (int): Stale outputs identified/cleared 

866 - operations (list[str]): Description of operations 

867 

868 Example: 

869 >>> result = manager.cleanup_stale_data(conv_id, max_age_hours=48) 

870 >>> print(f"Can free {result['tokens_freed']} tokens") 

871 >>> result = manager.cleanup_stale_data(conv_id, max_age_hours=48, execute=True) 

872 >>> print(f"Freed {result['tokens_freed']} tokens") 

873 """ 

874 from harnessutils.maintenance import cleanup_stale_data 

875 

876 messages = self.get_messages(conversation_id) 

877 result = cleanup_stale_data( 

878 messages=messages, 

879 config=self.config.pruning, 

880 max_age_hours=max_age_hours, 

881 keep_errors=keep_errors, 

882 execute=execute, 

883 ) 

884 

885 if execute: 

886 for msg in messages: 

887 self.storage.save_message(conversation_id, msg.id, msg.to_dict()) 

888 # Invalidate cache — outputs were cleared in-place 

889 if conversation_id in self._message_cache: 

890 del self._message_cache[conversation_id] 

891 

892 return result.to_dict() 

893 

894 def scan_and_deduplicate(self, conversation_id: str) -> dict[str, Any]: 

895 """Scan for duplicate outputs and return statistics. 

896 

897 Identifies duplicate tool outputs without removing them. 

898 Use prune_before_turn() to actually remove duplicates. 

899 

900 Args: 

901 conversation_id: Conversation to scan 

902 

903 Returns: 

904 Dictionary with deduplication statistics: 

905 - duplicates_removed (int): Number of duplicates found 

906 - tokens_freed (int): Tokens that would be freed 

907 - operations (list[str]): Description of findings 

908 

909 Example: 

910 >>> result = manager.scan_and_deduplicate(conv_id) 

911 >>> if result['duplicates_removed'] > 0: 

912 ... print(f"Found {result['duplicates_removed']} duplicates") 

913 ... # Run compaction to actually remove them 

914 ... manager.prune_before_turn(conv_id) 

915 """ 

916 from harnessutils.maintenance import scan_and_deduplicate 

917 

918 messages = self.get_messages(conversation_id) 

919 result = scan_and_deduplicate( 

920 messages=messages, 

921 config=self.config.pruning, 

922 ) 

923 

924 return result.to_dict() 

925 

926 def get_memory(self, project_id: str, key: str, default: Any = None) -> Any: 

927 """Get a value from project-scoped memory. 

928 

929 Args: 

930 project_id: Project to retrieve memory from 

931 key: Memory key 

932 default: Default value if key not found 

933 

934 Returns: 

935 Stored value or default 

936 """ 

937 data = self._load_project_memory(project_id) 

938 return data.get(key, default) 

939 

940 def set_memory(self, project_id: str, key: str, value: Any) -> None: 

941 """Set a value in project-scoped memory. 

942 

943 Args: 

944 project_id: Project to store memory in 

945 key: Memory key 

946 value: Value to store 

947 """ 

948 data = self._load_project_memory(project_id) 

949 data[key] = value 

950 try: 

951 self.storage.save_project_memory(project_id, data) 

952 except AttributeError: 

953 pass # Backend doesn't support project memory 

954 

955 def delete_memory(self, project_id: str, key: str) -> None: 

956 """Delete a key from project-scoped memory. 

957 

958 Args: 

959 project_id: Project to delete memory from 

960 key: Memory key to delete 

961 """ 

962 data = self._load_project_memory(project_id) 

963 data.pop(key, None) 

964 try: 

965 self.storage.save_project_memory(project_id, data) 

966 except AttributeError: 

967 pass # Backend doesn't support project memory 

968 

969 def list_memory(self, project_id: str) -> dict[str, Any]: 

970 """List all memory keys and values for a project. 

971 

972 Args: 

973 project_id: Project to list memory for 

974 

975 Returns: 

976 Dictionary of all memory key-value pairs 

977 """ 

978 return self._load_project_memory(project_id) 

979 

980 def _load_project_memory(self, project_id: str) -> dict[str, Any]: 

981 """Load project memory, returning empty dict on failure. 

982 

983 Args: 

984 project_id: Project to load memory for 

985 

986 Returns: 

987 Memory dictionary (empty if not found or backend unsupported) 

988 """ 

989 try: 

990 return self.storage.load_project_memory(project_id) 

991 except (FileNotFoundError, AttributeError): 

992 return {} 

993 

994 def detect_context_issues(self, conversation_id: str) -> list[dict[str, Any]]: 

995 """Detect quality and drift issues in conversation context. 

996 

997 Analyzes conversation for common problems: 

998 - High redundancy (duplicate content) 

999 - Staleness accumulation (old messages) 

1000 - Low information density 

1001 - Error preservation status 

1002 - Excessive protection (limiting pruning) 

1003 

1004 Args: 

1005 conversation_id: Conversation to analyze 

1006 

1007 Returns: 

1008 List of issue dictionaries, each containing: 

1009 - issue_type (str): Type of issue 

1010 - severity (str): "info", "warning", or "error" 

1011 - description (str): Human-readable description 

1012 - affected_count (int): Number of items affected 

1013 - suggested_fix (str): Recommended action 

1014 - metadata (dict): Additional details 

1015 

1016 Example: 

1017 >>> issues = manager.detect_context_issues(conv_id) 

1018 >>> for issue in issues: 

1019 ... if issue['severity'] == 'warning': 

1020 ... print(f"⚠️ {issue['description']}") 

1021 ... print(f" Fix: {issue['suggested_fix']}") 

1022 """ 

1023 from harnessutils.maintenance import detect_context_issues 

1024 

1025 conv_data = self.storage.load_conversation(conversation_id) 

1026 conv = Conversation.from_dict(conv_data) 

1027 messages = self.get_messages(conversation_id) 

1028 

1029 issues = detect_context_issues( 

1030 messages=messages, 

1031 conversation=conv, 

1032 config=self.config.pruning, 

1033 ) 

1034 

1035 return [issue.to_dict() for issue in issues]