Coverage for src / harness_utils / storage / memory.py: 78%
54 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-31 13:48 -0600
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-31 13:48 -0600
1"""In-memory storage implementation for testing."""
3import time
4from typing import Any
7class MemoryStorage:
8 """In-memory storage backend for testing.
10 All data is stored in memory and lost when the process exits.
11 """
13 def __init__(self) -> None:
14 """Initialize in-memory storage."""
15 self.conversations: dict[str, dict[str, Any]] = {}
16 self.messages: dict[str, dict[str, dict[str, Any]]] = {}
17 self.parts: dict[str, dict[str, dict[str, Any]]] = {}
18 self.truncated_outputs: dict[str, tuple[str, float]] = {}
20 def save_conversation(self, conversation_id: str, data: dict[str, Any]) -> None:
21 """Save conversation metadata."""
22 self.conversations[conversation_id] = data.copy()
24 def load_conversation(self, conversation_id: str) -> dict[str, Any]:
25 """Load conversation metadata."""
26 if conversation_id not in self.conversations:
27 raise FileNotFoundError(f"Conversation {conversation_id} not found")
28 return self.conversations[conversation_id].copy()
30 def save_message(
31 self,
32 conversation_id: str,
33 message_id: str,
34 data: dict[str, Any]
35 ) -> None:
36 """Save message metadata."""
37 if conversation_id not in self.messages:
38 self.messages[conversation_id] = {}
39 self.messages[conversation_id][message_id] = data.copy()
41 def load_message(
42 self,
43 conversation_id: str,
44 message_id: str
45 ) -> dict[str, Any]:
46 """Load message metadata."""
47 if conversation_id not in self.messages:
48 raise FileNotFoundError(
49 f"Message {message_id} in conversation {conversation_id} not found"
50 )
51 if message_id not in self.messages[conversation_id]:
52 raise FileNotFoundError(
53 f"Message {message_id} in conversation {conversation_id} not found"
54 )
55 return self.messages[conversation_id][message_id].copy()
57 def list_messages(self, conversation_id: str) -> list[str]:
58 """List all message IDs for a conversation in chronological order."""
59 if conversation_id not in self.messages:
60 return []
61 return sorted(self.messages[conversation_id].keys())
63 def save_part(
64 self,
65 message_id: str,
66 part_id: str,
67 data: dict[str, Any]
68 ) -> None:
69 """Save message part."""
70 if message_id not in self.parts:
71 self.parts[message_id] = {}
72 self.parts[message_id][part_id] = data.copy()
74 def load_part(self, message_id: str, part_id: str) -> dict[str, Any]:
75 """Load message part."""
76 if message_id not in self.parts:
77 raise FileNotFoundError(
78 f"Part {part_id} in message {message_id} not found"
79 )
80 if part_id not in self.parts[message_id]:
81 raise FileNotFoundError(
82 f"Part {part_id} in message {message_id} not found"
83 )
84 return self.parts[message_id][part_id].copy()
86 def list_parts(self, message_id: str) -> list[str]:
87 """List all part IDs for a message in order."""
88 if message_id not in self.parts:
89 return []
90 return sorted(self.parts[message_id].keys())
92 def save_truncated_output(self, output_id: str, content: str) -> None:
93 """Save full output that was truncated."""
94 self.truncated_outputs[output_id] = (content, time.time())
96 def load_truncated_output(self, output_id: str) -> str:
97 """Load full truncated output."""
98 if output_id not in self.truncated_outputs:
99 raise FileNotFoundError(f"Truncated output {output_id} not found")
100 return self.truncated_outputs[output_id][0]
102 def cleanup_old_outputs(self, retention_days: int) -> int:
103 """Clean up truncated outputs older than retention period."""
104 cutoff_timestamp = time.time() - (retention_days * 24 * 60 * 60)
105 to_delete = [
106 output_id
107 for output_id, (_, timestamp) in self.truncated_outputs.items()
108 if timestamp < cutoff_timestamp
109 ]
111 for output_id in to_delete:
112 del self.truncated_outputs[output_id]
114 return len(to_delete)