Coverage for src/harnessutils/storage/filesystem.py: 99%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-12 22:41 -0600

1"""Filesystem-based storage implementation.""" 

2 

3import json 

4import time 

5from pathlib import Path 

6from typing import Any 

7 

8from harnessutils.config import StorageConfig 

9 

10 

class FilesystemStorage:
    """Hierarchical filesystem storage backend.

    Storage structure:
        data/
        ├── conversations/{projectID}/{conversationID}.json
        ├── messages/{conversationID}/{messageID}.json
        ├── parts/{messageID}/{partID}.json
        └── truncated-outputs/{outputID}

    All JSON and text files are read and written as UTF-8 so content
    round-trips identically on every platform.
    """

    def __init__(self, config: "StorageConfig"):
        """Initialize filesystem storage.

        Args:
            config: Storage configuration providing ``base_path`` and
                ``retention_days``.
        """
        self.base_path = Path(config.base_path)
        # Default retention for cleanup_old_outputs() when no explicit
        # value is passed.
        self.retention_days = config.retention_days

        self.conversations_dir = self.base_path / "conversations"
        self.messages_dir = self.base_path / "messages"
        self.parts_dir = self.base_path / "parts"
        self.outputs_dir = self.base_path / "truncated-outputs"

        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Create storage directories if they don't exist."""
        for directory in (
            self.conversations_dir,
            self.messages_dir,
            self.parts_dir,
            self.outputs_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _write_json(path: Path, data: dict[str, Any]) -> None:
        """Serialize *data* as pretty-printed UTF-8 JSON at *path*."""
        # Explicit encoding: open()/write_text() otherwise use the
        # platform default (e.g. cp1252 on Windows), corrupting non-ASCII.
        path.write_text(
            json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
        )

    @staticmethod
    def _read_json(path: Path) -> dict[str, Any]:
        """Load a JSON document from *path* (UTF-8)."""
        data: dict[str, Any] = json.loads(path.read_text(encoding="utf-8"))
        return data

    def save_conversation(self, conversation_id: str, data: dict[str, Any]) -> None:
        """Save conversation metadata under its project directory."""
        project_id = data.get("project_id", "default")
        project_dir = self.conversations_dir / project_id
        project_dir.mkdir(parents=True, exist_ok=True)
        self._write_json(project_dir / f"{conversation_id}.json", data)

    def load_conversation(self, conversation_id: str) -> dict[str, Any]:
        """Load conversation metadata.

        The project ID is not part of the lookup key, so every project
        directory is scanned for the conversation file.

        Raises:
            FileNotFoundError: If no project contains the conversation.
        """
        for project_dir in self.conversations_dir.iterdir():
            if not project_dir.is_dir():
                continue
            conv_file = project_dir / f"{conversation_id}.json"
            if conv_file.exists():
                return self._read_json(conv_file)

        raise FileNotFoundError(f"Conversation {conversation_id} not found")

    def save_message(self, conversation_id: str, message_id: str, data: dict[str, Any]) -> None:
        """Save message metadata."""
        msg_dir = self.messages_dir / conversation_id
        msg_dir.mkdir(parents=True, exist_ok=True)
        self._write_json(msg_dir / f"{message_id}.json", data)

    def load_message(self, conversation_id: str, message_id: str) -> dict[str, Any]:
        """Load message metadata.

        Raises:
            FileNotFoundError: If the message does not exist.
        """
        msg_file = self.messages_dir / conversation_id / f"{message_id}.json"
        if not msg_file.exists():
            raise FileNotFoundError(
                f"Message {message_id} in conversation {conversation_id} not found"
            )
        return self._read_json(msg_file)

    def list_messages(self, conversation_id: str) -> list[str]:
        """List all message IDs for a conversation in chronological order.

        Ordering relies on lexicographic sort of IDs, which presumably
        embed a sortable timestamp — TODO confirm against ID generator.
        """
        msg_dir = self.messages_dir / conversation_id
        if not msg_dir.exists():
            return []
        return sorted(f.stem for f in msg_dir.glob("*.json"))

    def save_part(self, message_id: str, part_id: str, data: dict[str, Any]) -> None:
        """Save message part."""
        part_dir = self.parts_dir / message_id
        part_dir.mkdir(parents=True, exist_ok=True)
        self._write_json(part_dir / f"{part_id}.json", data)

    def load_part(self, message_id: str, part_id: str) -> dict[str, Any]:
        """Load message part.

        Raises:
            FileNotFoundError: If the part does not exist.
        """
        part_file = self.parts_dir / message_id / f"{part_id}.json"
        if not part_file.exists():
            raise FileNotFoundError(f"Part {part_id} in message {message_id} not found")
        return self._read_json(part_file)

    def list_parts(self, message_id: str) -> list[str]:
        """List all part IDs for a message in order (lexicographic by ID)."""
        part_dir = self.parts_dir / message_id
        if not part_dir.exists():
            return []
        return sorted(f.stem for f in part_dir.glob("*.json"))

    def save_truncated_output(self, output_id: str, content: str) -> None:
        """Save full output that was truncated."""
        (self.outputs_dir / output_id).write_text(content, encoding="utf-8")

    def load_truncated_output(self, output_id: str) -> str:
        """Load full truncated output.

        Raises:
            FileNotFoundError: If the output does not exist.
        """
        output_file = self.outputs_dir / output_id
        if not output_file.exists():
            raise FileNotFoundError(f"Truncated output {output_id} not found")
        return output_file.read_text(encoding="utf-8")

    def cleanup_old_outputs(self, retention_days: "int | None" = None) -> int:
        """Delete truncated outputs older than the retention period.

        Args:
            retention_days: Age threshold in days. Defaults to the
                ``retention_days`` value from the storage configuration.

        Returns:
            Number of files deleted.
        """
        if retention_days is None:
            retention_days = self.retention_days
        cutoff_timestamp = time.time() - (retention_days * 24 * 60 * 60)
        deleted_count = 0

        for output_file in self.outputs_dir.iterdir():
            # Age is judged by mtime, which save_truncated_output sets on write.
            if output_file.is_file() and output_file.stat().st_mtime < cutoff_timestamp:
                output_file.unlink()
                deleted_count += 1

        return deleted_count