Coverage for src/harness_utils/manager.py: 71%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-31 14:07 -0600

1"""Main ConversationManager API for harness-utils.""" 

2 

3import time 

4from typing import Any 

5 

6from harness_utils.compaction.pruning import prune_tool_outputs 

7from harness_utils.compaction.summarization import is_overflow, summarize_conversation 

8from harness_utils.compaction.truncation import truncate_output 

9from harness_utils.config import HarnessConfig 

10from harness_utils.conversion.to_model import to_model_messages 

11from harness_utils.models.conversation import Conversation 

12from harness_utils.models.message import Message 

13from harness_utils.models.usage import Usage 

14from harness_utils.storage.filesystem import FilesystemStorage 

15from harness_utils.storage.memory import MemoryStorage 

16from harness_utils.types import LLMClient, StorageBackend 

17from harness_utils.utils.ids import generate_id 

18 

19 

class ConversationManager:
    """High-level entry point for conversations with context-window management.

    Wires a storage backend and a :class:`HarnessConfig` together and exposes:

    - creating conversations and appending messages,
    - the three compaction tiers (truncation, pruning, summarization),
    - converting stored messages into the model request format.
    """

    def __init__(
        self,
        storage: StorageBackend | None = None,
        config: HarnessConfig | None = None,
    ):
        """Build a manager around *storage* and *config*.

        Args:
            storage: Backend used to persist conversations and messages.
                Falls back to an in-memory backend when ``None``.
            config: Harness configuration. Falls back to defaults when ``None``.
        """
        self.config = config or HarnessConfig()
        self.storage = storage or MemoryStorage()
        # Per-conversation message lists, keyed by conversation id.
        self._message_cache: dict[str, list[Message]] = {}

    def create_conversation(
        self,
        conversation_id: str | None = None,
        project_id: str | None = None,
    ) -> Conversation:
        """Create, persist, and return a fresh conversation.

        Args:
            conversation_id: Explicit id; a ``conv``-prefixed id is generated
                when ``None``.
            project_id: Optional project id used for grouping.

        Returns:
            The newly created conversation object.
        """
        cid = generate_id("conv") if conversation_id is None else conversation_id

        timestamp = int(time.time() * 1000)  # epoch milliseconds
        conversation = Conversation(
            id=cid,
            project_id=project_id,
            created=timestamp,
            updated=timestamp,
        )

        self.storage.save_conversation(cid, conversation.to_dict())
        self._message_cache[cid] = []

        return conversation

    def add_message(self, conversation_id: str, message: Message) -> None:
        """Append *message* to the conversation and bump its updated timestamp.

        Args:
            conversation_id: Target conversation.
            message: Message to persist and cache.
        """
        self.storage.save_message(conversation_id, message.id, message.to_dict())
        self._message_cache.setdefault(conversation_id, []).append(message)

        # Touch the conversation record so "updated" reflects this write.
        record = self.storage.load_conversation(conversation_id)
        record["updated"] = int(time.time() * 1000)
        self.storage.save_conversation(conversation_id, record)

    def get_messages(self, conversation_id: str) -> list[Message]:
        """Return the conversation's messages in chronological order.

        A cached list is returned as-is; on a miss the messages are loaded
        from storage and the cache is populated.

        Args:
            conversation_id: Conversation to read.

        Returns:
            Messages in chronological order.
        """
        try:
            return self._message_cache[conversation_id]
        except KeyError:
            pass

        loaded = [
            Message.from_dict(self.storage.load_message(conversation_id, mid))
            for mid in self.storage.list_messages(conversation_id)
        ]
        self._message_cache[conversation_id] = loaded
        return loaded

    def prune_before_turn(
        self,
        conversation_id: str,
    ) -> dict[str, Any]:
        """Drop old tool outputs ahead of a turn (Tier 2 compaction).

        Keeps the conversation structure intact while removing stale tool
        output payloads. No-op when pruning is disabled in config.

        Args:
            conversation_id: Conversation to prune.

        Returns:
            ``{"pruned": <count>, "tokens_saved": <tokens>}``.
        """
        if not self.config.compaction.prune:
            return {"pruned": 0, "tokens_saved": 0}

        history = self.get_messages(conversation_id)
        outcome = prune_tool_outputs(
            history,
            self.config.pruning,
            self.config.tokens.chars_per_token,
        )

        # Persist every message back (prune_tool_outputs presumably edits
        # them in place — confirm against its implementation).
        for entry in history:
            self.storage.save_message(conversation_id, entry.id, entry.to_dict())

        return {"pruned": outcome.pruned, "tokens_saved": outcome.tokens_saved}

    def needs_compaction(
        self,
        conversation_id: str,
        usage: Usage,
    ) -> bool:
        """Report whether the last turn overflowed the context (Tier 3 check).

        Args:
            conversation_id: Conversation being checked (not consulted by the
                overflow test itself).
            usage: Token usage from the last turn.

        Returns:
            ``True`` when summarization is needed.
        """
        limits = self.config.model_limits
        return is_overflow(
            usage,
            limits.default_context_limit,
            limits.default_output_limit,
        )

    def compact(
        self,
        conversation_id: str,
        llm_client: LLMClient,
        parent_message_id: str,
        model: str | None = None,
        auto_mode: bool = False,
    ) -> dict[str, Any]:
        """Summarize the conversation via the LLM (Tier 3 compaction).

        Skipped when auto-compaction is disabled in config and the call was
        not flagged as auto-triggered.

        Args:
            conversation_id: Conversation to compact.
            llm_client: Client used to produce the summary.
            parent_message_id: Message that triggered compaction.
            model: Optional override model for summarization.
            auto_mode: Whether this call was auto-triggered.

        Returns:
            ``{"summarized": False}`` when skipped; otherwise a dict with the
            summary message id, total tokens used, and cost.
        """
        if not (self.config.compaction.auto or auto_mode):
            return {"summarized": False}

        history = self.get_messages(conversation_id)
        summary_id = generate_id("msg")

        outcome = summarize_conversation(
            messages=history,
            llm_client=llm_client,
            parent_message_id=parent_message_id,
            message_id=summary_id,
            model=model,
            auto_mode=auto_mode,
        )

        self.add_message(conversation_id, outcome.summary_message)

        return {
            "summarized": True,
            "summary_message_id": summary_id,
            "tokens_used": outcome.tokens_used.total,
            "cost": outcome.cost,
        }

    def to_model_format(self, conversation_id: str) -> list[dict[str, Any]]:
        """Render the conversation as model-format messages for an LLM request.

        Args:
            conversation_id: Conversation to convert.

        Returns:
            Messages in model format.
        """
        return to_model_messages(self.get_messages(conversation_id))

    def truncate_tool_output(
        self,
        output: str,
        tool_name: str,
    ) -> str:
        """Clip an oversized tool output (Tier 1 compaction).

        When truncation occurs and an output path is reported, the full
        original output is persisted to storage for later retrieval.

        Args:
            output: Raw tool output.
            tool_name: Name of the producing tool (embedded in the output id).

        Returns:
            The (possibly truncated) output content.
        """
        outcome = truncate_output(
            output=output,
            config=self.config.truncation,
            output_id=generate_id(f"output_{tool_name}"),
        )

        if outcome.truncated and outcome.output_path:
            self.storage.save_truncated_output(outcome.output_path, output)

        return outcome.content