Coverage for src / harness_utils / storage / memory.py: 78%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-31 13:48 -0600

1"""In-memory storage implementation for testing.""" 

2 

3import time 

4from typing import Any 

5 

6 

7class MemoryStorage: 

8 """In-memory storage backend for testing. 

9 

10 All data is stored in memory and lost when the process exits. 

11 """ 

12 

13 def __init__(self) -> None: 

14 """Initialize in-memory storage.""" 

15 self.conversations: dict[str, dict[str, Any]] = {} 

16 self.messages: dict[str, dict[str, dict[str, Any]]] = {} 

17 self.parts: dict[str, dict[str, dict[str, Any]]] = {} 

18 self.truncated_outputs: dict[str, tuple[str, float]] = {} 

19 

20 def save_conversation(self, conversation_id: str, data: dict[str, Any]) -> None: 

21 """Save conversation metadata.""" 

22 self.conversations[conversation_id] = data.copy() 

23 

24 def load_conversation(self, conversation_id: str) -> dict[str, Any]: 

25 """Load conversation metadata.""" 

26 if conversation_id not in self.conversations: 

27 raise FileNotFoundError(f"Conversation {conversation_id} not found") 

28 return self.conversations[conversation_id].copy() 

29 

30 def save_message( 

31 self, 

32 conversation_id: str, 

33 message_id: str, 

34 data: dict[str, Any] 

35 ) -> None: 

36 """Save message metadata.""" 

37 if conversation_id not in self.messages: 

38 self.messages[conversation_id] = {} 

39 self.messages[conversation_id][message_id] = data.copy() 

40 

41 def load_message( 

42 self, 

43 conversation_id: str, 

44 message_id: str 

45 ) -> dict[str, Any]: 

46 """Load message metadata.""" 

47 if conversation_id not in self.messages: 

48 raise FileNotFoundError( 

49 f"Message {message_id} in conversation {conversation_id} not found" 

50 ) 

51 if message_id not in self.messages[conversation_id]: 

52 raise FileNotFoundError( 

53 f"Message {message_id} in conversation {conversation_id} not found" 

54 ) 

55 return self.messages[conversation_id][message_id].copy() 

56 

57 def list_messages(self, conversation_id: str) -> list[str]: 

58 """List all message IDs for a conversation in chronological order.""" 

59 if conversation_id not in self.messages: 

60 return [] 

61 return sorted(self.messages[conversation_id].keys()) 

62 

63 def save_part( 

64 self, 

65 message_id: str, 

66 part_id: str, 

67 data: dict[str, Any] 

68 ) -> None: 

69 """Save message part.""" 

70 if message_id not in self.parts: 

71 self.parts[message_id] = {} 

72 self.parts[message_id][part_id] = data.copy() 

73 

74 def load_part(self, message_id: str, part_id: str) -> dict[str, Any]: 

75 """Load message part.""" 

76 if message_id not in self.parts: 

77 raise FileNotFoundError( 

78 f"Part {part_id} in message {message_id} not found" 

79 ) 

80 if part_id not in self.parts[message_id]: 

81 raise FileNotFoundError( 

82 f"Part {part_id} in message {message_id} not found" 

83 ) 

84 return self.parts[message_id][part_id].copy() 

85 

86 def list_parts(self, message_id: str) -> list[str]: 

87 """List all part IDs for a message in order.""" 

88 if message_id not in self.parts: 

89 return [] 

90 return sorted(self.parts[message_id].keys()) 

91 

92 def save_truncated_output(self, output_id: str, content: str) -> None: 

93 """Save full output that was truncated.""" 

94 self.truncated_outputs[output_id] = (content, time.time()) 

95 

96 def load_truncated_output(self, output_id: str) -> str: 

97 """Load full truncated output.""" 

98 if output_id not in self.truncated_outputs: 

99 raise FileNotFoundError(f"Truncated output {output_id} not found") 

100 return self.truncated_outputs[output_id][0] 

101 

102 def cleanup_old_outputs(self, retention_days: int) -> int: 

103 """Clean up truncated outputs older than retention period.""" 

104 cutoff_timestamp = time.time() - (retention_days * 24 * 60 * 60) 

105 to_delete = [ 

106 output_id 

107 for output_id, (_, timestamp) in self.truncated_outputs.items() 

108 if timestamp < cutoff_timestamp 

109 ] 

110 

111 for output_id in to_delete: 

112 del self.truncated_outputs[output_id] 

113 

114 return len(to_delete)