Coverage for src / harnessutils / storage / memory.py: 78%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-12 22:41 -0600

1"""In-memory storage implementation for testing.""" 

2 

3import time 

4from typing import Any 

5 

6 

7class MemoryStorage: 

8 """In-memory storage backend for testing. 

9 

10 All data is stored in memory and lost when the process exits. 

11 """ 

12 

13 def __init__(self) -> None: 

14 """Initialize in-memory storage.""" 

15 self.conversations: dict[str, dict[str, Any]] = {} 

16 self.messages: dict[str, dict[str, dict[str, Any]]] = {} 

17 self.parts: dict[str, dict[str, dict[str, Any]]] = {} 

18 self.truncated_outputs: dict[str, tuple[str, float]] = {} 

19 

20 def save_conversation(self, conversation_id: str, data: dict[str, Any]) -> None: 

21 """Save conversation metadata.""" 

22 self.conversations[conversation_id] = data.copy() 

23 

24 def load_conversation(self, conversation_id: str) -> dict[str, Any]: 

25 """Load conversation metadata.""" 

26 if conversation_id not in self.conversations: 

27 raise FileNotFoundError(f"Conversation {conversation_id} not found") 

28 return self.conversations[conversation_id].copy() 

29 

30 def save_message(self, conversation_id: str, message_id: str, data: dict[str, Any]) -> None: 

31 """Save message metadata.""" 

32 if conversation_id not in self.messages: 

33 self.messages[conversation_id] = {} 

34 self.messages[conversation_id][message_id] = data.copy() 

35 

36 def load_message(self, conversation_id: str, message_id: str) -> dict[str, Any]: 

37 """Load message metadata.""" 

38 if conversation_id not in self.messages: 

39 raise FileNotFoundError( 

40 f"Message {message_id} in conversation {conversation_id} not found" 

41 ) 

42 if message_id not in self.messages[conversation_id]: 

43 raise FileNotFoundError( 

44 f"Message {message_id} in conversation {conversation_id} not found" 

45 ) 

46 return self.messages[conversation_id][message_id].copy() 

47 

48 def list_messages(self, conversation_id: str) -> list[str]: 

49 """List all message IDs for a conversation in chronological order.""" 

50 if conversation_id not in self.messages: 

51 return [] 

52 return sorted(self.messages[conversation_id].keys()) 

53 

54 def save_part(self, message_id: str, part_id: str, data: dict[str, Any]) -> None: 

55 """Save message part.""" 

56 if message_id not in self.parts: 

57 self.parts[message_id] = {} 

58 self.parts[message_id][part_id] = data.copy() 

59 

60 def load_part(self, message_id: str, part_id: str) -> dict[str, Any]: 

61 """Load message part.""" 

62 if message_id not in self.parts: 

63 raise FileNotFoundError(f"Part {part_id} in message {message_id} not found") 

64 if part_id not in self.parts[message_id]: 

65 raise FileNotFoundError(f"Part {part_id} in message {message_id} not found") 

66 return self.parts[message_id][part_id].copy() 

67 

68 def list_parts(self, message_id: str) -> list[str]: 

69 """List all part IDs for a message in order.""" 

70 if message_id not in self.parts: 

71 return [] 

72 return sorted(self.parts[message_id].keys()) 

73 

74 def save_truncated_output(self, output_id: str, content: str) -> None: 

75 """Save full output that was truncated.""" 

76 self.truncated_outputs[output_id] = (content, time.time()) 

77 

78 def load_truncated_output(self, output_id: str) -> str: 

79 """Load full truncated output.""" 

80 if output_id not in self.truncated_outputs: 

81 raise FileNotFoundError(f"Truncated output {output_id} not found") 

82 return self.truncated_outputs[output_id][0] 

83 

84 def cleanup_old_outputs(self, retention_days: int) -> int: 

85 """Clean up truncated outputs older than retention period.""" 

86 cutoff_timestamp = time.time() - (retention_days * 24 * 60 * 60) 

87 to_delete = [ 

88 output_id 

89 for output_id, (_, timestamp) in self.truncated_outputs.items() 

90 if timestamp < cutoff_timestamp 

91 ] 

92 

93 for output_id in to_delete: 

94 del self.truncated_outputs[output_id] 

95 

96 return len(to_delete)