Coverage for src / harnessutils / storage / memory.py: 79%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 08:30 -0600

1"""In-memory storage implementation for testing.""" 

2 

import copy
import time
from typing import Any

5 

6 

class MemoryStorage:
    """In-memory storage backend for testing.

    All data is stored in memory and lost when the process exits.

    Dict payloads are deep-copied both when saved and when loaded, so
    mutating the object passed to a ``save_*`` method, or the object
    returned by a ``load_*`` method (including nested dicts and lists),
    never changes what is stored.
    """

    def __init__(self) -> None:
        """Initialize in-memory storage with empty tables."""
        # conversation_id -> conversation metadata
        self.conversations: dict[str, dict[str, Any]] = {}
        # conversation_id -> message_id -> message metadata
        self.messages: dict[str, dict[str, dict[str, Any]]] = {}
        # message_id -> part_id -> part data
        self.parts: dict[str, dict[str, dict[str, Any]]] = {}
        # output_id -> (full content, time.time() value at save)
        self.truncated_outputs: dict[str, tuple[str, float]] = {}
        # project_id -> project-scoped memory
        self._project_memory: dict[str, dict[str, Any]] = {}

    def save_conversation(self, conversation_id: str, data: dict[str, Any]) -> None:
        """Save conversation metadata, replacing any existing entry."""
        self.conversations[conversation_id] = copy.deepcopy(data)

    def load_conversation(self, conversation_id: str) -> dict[str, Any]:
        """Load conversation metadata.

        Raises:
            FileNotFoundError: If no conversation with this ID was saved.
        """
        try:
            stored = self.conversations[conversation_id]
        except KeyError:
            raise FileNotFoundError(f"Conversation {conversation_id} not found") from None
        return copy.deepcopy(stored)

    def save_message(self, conversation_id: str, message_id: str, data: dict[str, Any]) -> None:
        """Save message metadata under its conversation."""
        self.messages.setdefault(conversation_id, {})[message_id] = copy.deepcopy(data)

    def load_message(self, conversation_id: str, message_id: str) -> dict[str, Any]:
        """Load message metadata.

        Raises:
            FileNotFoundError: If the conversation or the message is unknown.
        """
        try:
            # A miss at either level is reported with the same error.
            stored = self.messages[conversation_id][message_id]
        except KeyError:
            raise FileNotFoundError(
                f"Message {message_id} in conversation {conversation_id} not found"
            ) from None
        return copy.deepcopy(stored)

    def list_messages(self, conversation_id: str) -> list[str]:
        """List all message IDs for a conversation, sorted as strings.

        The result is chronological only if message IDs sort
        lexicographically in creation order. Returns an empty list for
        an unknown conversation.
        """
        return sorted(self.messages.get(conversation_id, ()))

    def save_part(self, message_id: str, part_id: str, data: dict[str, Any]) -> None:
        """Save a message part under its message."""
        self.parts.setdefault(message_id, {})[part_id] = copy.deepcopy(data)

    def load_part(self, message_id: str, part_id: str) -> dict[str, Any]:
        """Load a message part.

        Raises:
            FileNotFoundError: If the message or the part is unknown.
        """
        try:
            # A miss at either level is reported with the same error.
            stored = self.parts[message_id][part_id]
        except KeyError:
            raise FileNotFoundError(f"Part {part_id} in message {message_id} not found") from None
        return copy.deepcopy(stored)

    def list_parts(self, message_id: str) -> list[str]:
        """List all part IDs for a message, sorted as strings.

        Returns an empty list for an unknown message.
        """
        return sorted(self.parts.get(message_id, ()))

    def save_truncated_output(self, output_id: str, content: str) -> None:
        """Save full output that was truncated, stamped with the current time."""
        self.truncated_outputs[output_id] = (content, time.time())

    def load_truncated_output(self, output_id: str) -> str:
        """Load full truncated output.

        Raises:
            FileNotFoundError: If no output with this ID was saved.
        """
        try:
            content, _saved_at = self.truncated_outputs[output_id]
        except KeyError:
            raise FileNotFoundError(f"Truncated output {output_id} not found") from None
        return content

    def save_project_memory(self, project_id: str, data: dict[str, Any]) -> None:
        """Save project-scoped memory, replacing any existing entry."""
        self._project_memory[project_id] = copy.deepcopy(data)

    def load_project_memory(self, project_id: str) -> dict[str, Any]:
        """Load project-scoped memory. Returns {} if not found."""
        return copy.deepcopy(self._project_memory.get(project_id, {}))

    def delete_project_memory(self, project_id: str) -> None:
        """Delete project-scoped memory; a missing project is ignored."""
        self._project_memory.pop(project_id, None)

    def cleanup_old_outputs(self, retention_days: int) -> int:
        """Clean up truncated outputs older than retention period.

        Args:
            retention_days: Outputs saved more than this many days before
                now are removed.

        Returns:
            The number of outputs deleted.
        """
        cutoff_timestamp = time.time() - (retention_days * 24 * 60 * 60)
        # Collect IDs first: the dict must not change size while iterated.
        expired = [
            output_id
            for output_id, (_, saved_at) in self.truncated_outputs.items()
            if saved_at < cutoff_timestamp
        ]

        for output_id in expired:
            del self.truncated_outputs[output_id]

        return len(expired)