Coverage for src / harness_utils / manager.py: 71%
69 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-31 14:07 -0600
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-31 14:07 -0600
1"""Main ConversationManager API for harness-utils."""
3import time
4from typing import Any
6from harness_utils.compaction.pruning import prune_tool_outputs
7from harness_utils.compaction.summarization import is_overflow, summarize_conversation
8from harness_utils.compaction.truncation import truncate_output
9from harness_utils.config import HarnessConfig
10from harness_utils.conversion.to_model import to_model_messages
11from harness_utils.models.conversation import Conversation
12from harness_utils.models.message import Message
13from harness_utils.models.usage import Usage
14from harness_utils.storage.filesystem import FilesystemStorage
15from harness_utils.storage.memory import MemoryStorage
16from harness_utils.types import LLMClient, StorageBackend
17from harness_utils.utils.ids import generate_id
class ConversationManager:
    """Main interface for managing conversations with context window management.

    Provides high-level API for:
    - Creating and managing conversations
    - Adding messages
    - Automatic context compaction (truncation, pruning, summarization)
    - Message storage and retrieval
    """

    def __init__(
        self,
        storage: StorageBackend | None = None,
        config: HarnessConfig | None = None,
    ):
        """Initialize conversation manager.

        Args:
            storage: Storage backend (uses in-memory if None)
            config: Configuration (uses defaults if None)
        """
        self.config = config or HarnessConfig()
        self.storage = storage or MemoryStorage()
        # Write-through cache of Message objects keyed by conversation ID;
        # kept in sync with storage by add_message/get_messages.
        self._message_cache: dict[str, list[Message]] = {}

    def create_conversation(
        self,
        conversation_id: str | None = None,
        project_id: str | None = None,
    ) -> Conversation:
        """Create a new conversation and persist it.

        Args:
            conversation_id: Optional conversation ID (generated if None)
            project_id: Optional project ID for grouping

        Returns:
            New conversation object
        """
        if conversation_id is None:
            conversation_id = generate_id("conv")

        # Timestamps are milliseconds since the epoch.
        now = int(time.time() * 1000)
        conversation = Conversation(
            id=conversation_id,
            project_id=project_id,
            created=now,
            updated=now,
        )

        self.storage.save_conversation(conversation_id, conversation.to_dict())
        self._message_cache[conversation_id] = []

        return conversation

    def add_message(self, conversation_id: str, message: Message) -> None:
        """Add a message to a conversation and bump its `updated` timestamp.

        Args:
            conversation_id: Conversation to add message to
            message: Message to add

        Note:
            Assumes the conversation already exists in storage; behavior for
            an unknown conversation_id depends on the storage backend's
            load_conversation — TODO confirm.
        """
        self.storage.save_message(conversation_id, message.id, message.to_dict())

        # Keep the in-memory cache consistent with what was just persisted.
        if conversation_id not in self._message_cache:
            self._message_cache[conversation_id] = []
        self._message_cache[conversation_id].append(message)

        # Touch the conversation's `updated` timestamp (milliseconds).
        conv = self.storage.load_conversation(conversation_id)
        conv["updated"] = int(time.time() * 1000)
        self.storage.save_conversation(conversation_id, conv)

    def get_messages(self, conversation_id: str) -> list[Message]:
        """Get all messages for a conversation.

        The first call loads from storage and populates the cache;
        subsequent calls return the cached list directly.

        Args:
            conversation_id: Conversation ID

        Returns:
            List of messages in chronological order
        """
        if conversation_id in self._message_cache:
            return self._message_cache[conversation_id]

        message_ids = self.storage.list_messages(conversation_id)
        messages = [
            Message.from_dict(self.storage.load_message(conversation_id, msg_id))
            for msg_id in message_ids
        ]

        self._message_cache[conversation_id] = messages
        return messages

    def prune_before_turn(
        self,
        conversation_id: str,
    ) -> dict[str, Any]:
        """Proactively prune old tool outputs before processing a turn.

        This is Tier 2 compaction - removes old tool outputs while
        preserving conversation structure. Pruning appears to mutate the
        cached Message objects in place (all messages are re-saved after
        the call) — TODO confirm against prune_tool_outputs.

        Args:
            conversation_id: Conversation to prune

        Returns:
            Pruning result with count and tokens saved
        """
        # No-op when pruning is disabled in config.
        if not self.config.compaction.prune:
            return {"pruned": 0, "tokens_saved": 0}

        messages = self.get_messages(conversation_id)
        result = prune_tool_outputs(
            messages,
            self.config.pruning,
            self.config.tokens.chars_per_token,
        )

        # Persist the (possibly modified) messages back to storage.
        for msg in messages:
            self.storage.save_message(conversation_id, msg.id, msg.to_dict())

        return {"pruned": result.pruned, "tokens_saved": result.tokens_saved}

    def needs_compaction(
        self,
        conversation_id: str,
        usage: Usage,
    ) -> bool:
        """Check if conversation needs summarization (Tier 3).

        Args:
            conversation_id: Conversation to check
            usage: Token usage from last turn

        Returns:
            True if summarization needed
        """
        # Overflow is decided purely from usage vs. the configured model
        # limits; conversation_id is currently unused by the check.
        return is_overflow(
            usage,
            self.config.model_limits.default_context_limit,
            self.config.model_limits.default_output_limit,
        )

    def compact(
        self,
        conversation_id: str,
        llm_client: LLMClient,
        parent_message_id: str,
        model: str | None = None,
        auto_mode: bool = False,
    ) -> dict[str, Any]:
        """Compact conversation using LLM summarization (Tier 3).

        Args:
            conversation_id: Conversation to compact
            llm_client: LLM client for summarization
            parent_message_id: Message that triggered compaction
            model: Optional model to use for summarization
            auto_mode: Whether this was auto-triggered

        Returns:
            Compaction result with summary message and metrics
        """
        # Only auto-triggered compaction is gated by the config flag; an
        # explicit (manual) call always proceeds.
        # Fix: the original guard (`not auto and not auto_mode`) was
        # inverted — it blocked manual calls when auto-compaction was
        # disabled, while letting auto-triggered calls through.
        if auto_mode and not self.config.compaction.auto:
            return {"summarized": False}

        messages = self.get_messages(conversation_id)
        summary_id = generate_id("msg")

        result = summarize_conversation(
            messages=messages,
            llm_client=llm_client,
            parent_message_id=parent_message_id,
            message_id=summary_id,
            model=model,
            auto_mode=auto_mode,
        )

        # The summary becomes a regular message in the conversation.
        self.add_message(conversation_id, result.summary_message)

        return {
            "summarized": True,
            "summary_message_id": summary_id,
            "tokens_used": result.tokens_used.total,
            "cost": result.cost,
        }

    def to_model_format(self, conversation_id: str) -> list[dict[str, Any]]:
        """Convert conversation messages to model format for LLM requests.

        Args:
            conversation_id: Conversation to convert

        Returns:
            List of messages in model format
        """
        messages = self.get_messages(conversation_id)
        return to_model_messages(messages)

    def truncate_tool_output(
        self,
        output: str,
        tool_name: str,
    ) -> str:
        """Truncate tool output if it exceeds limits (Tier 1).

        Args:
            output: Tool output to truncate
            tool_name: Name of the tool

        Returns:
            Potentially truncated output
        """
        # A fresh ID is generated per call; it is only used when the result
        # is actually truncated and spilled to storage.
        output_id = generate_id(f"output_{tool_name}")

        result = truncate_output(
            output=output,
            config=self.config.truncation,
            output_id=output_id,
        )

        # Preserve the full original output so it can be retrieved later.
        if result.truncated and result.output_path:
            self.storage.save_truncated_output(result.output_path, output)

        return result.content