Coverage for src / harnessutils / storage / filesystem.py: 87%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 08:30 -0600

1"""Filesystem-based storage implementation.""" 

2 

from __future__ import annotations

import json
import time
from pathlib import Path
from typing import Any

from harnessutils.config import StorageConfig

9 

10 

class FilesystemStorage:
    """Hierarchical filesystem storage backend.

    Storage structure:
        data/
        ├── conversations/{projectID}/{conversationID}.json
        ├── messages/{conversationID}/{messageID}.json
        ├── parts/{messageID}/{partID}.json
        ├── truncated-outputs/{outputID}
        └── memory/{projectID}.json

    All JSON and text files are read and written as UTF-8 so stored data is
    portable across platforms regardless of the locale default encoding.
    """

    def __init__(self, config: StorageConfig):
        """Initialize filesystem storage.

        Args:
            config: Storage configuration; provides ``base_path`` (root
                directory for all stores) and ``retention_days`` (default
                retention window for truncated outputs).
        """
        self.base_path = Path(config.base_path)
        # Default retention window used by cleanup_old_outputs() when no
        # explicit value is passed.
        self.retention_days = config.retention_days

        self.conversations_dir = self.base_path / "conversations"
        self.messages_dir = self.base_path / "messages"
        self.parts_dir = self.base_path / "parts"
        self.outputs_dir = self.base_path / "truncated-outputs"
        self.memory_dir = self.base_path / "memory"

        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Create storage directories if they don't exist."""
        for directory in (
            self.conversations_dir,
            self.messages_dir,
            self.parts_dir,
            self.outputs_dir,
            self.memory_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    def _write_json(self, path: Path, data: dict[str, Any]) -> None:
        """Serialize *data* as pretty-printed UTF-8 JSON to *path*, creating parents."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    def _read_json(self, path: Path) -> dict[str, Any]:
        """Deserialize a UTF-8 JSON object from *path*."""
        with open(path, encoding="utf-8") as f:
            data: dict[str, Any] = json.load(f)
        return data

    def save_conversation(self, conversation_id: str, data: dict[str, Any]) -> None:
        """Save conversation metadata under its project directory.

        The project is taken from ``data["project_id"]``, falling back to
        ``"default"`` when absent.
        """
        project_id = data.get("project_id", "default")
        self._write_json(
            self.conversations_dir / project_id / f"{conversation_id}.json", data
        )

    def load_conversation(self, conversation_id: str) -> dict[str, Any]:
        """Load conversation metadata.

        The owning project is unknown at load time, so every project
        directory is scanned for the conversation file.

        Raises:
            FileNotFoundError: If no project contains the conversation.
        """
        for project_dir in self.conversations_dir.iterdir():
            if not project_dir.is_dir():
                continue
            conv_file = project_dir / f"{conversation_id}.json"
            if conv_file.exists():
                return self._read_json(conv_file)

        raise FileNotFoundError(f"Conversation {conversation_id} not found")

    def save_message(self, conversation_id: str, message_id: str, data: dict[str, Any]) -> None:
        """Save message metadata."""
        self._write_json(
            self.messages_dir / conversation_id / f"{message_id}.json", data
        )

    def load_message(self, conversation_id: str, message_id: str) -> dict[str, Any]:
        """Load message metadata.

        Raises:
            FileNotFoundError: If the message does not exist.
        """
        msg_file = self.messages_dir / conversation_id / f"{message_id}.json"
        if not msg_file.exists():
            raise FileNotFoundError(
                f"Message {message_id} in conversation {conversation_id} not found"
            )

        return self._read_json(msg_file)

    def list_messages(self, conversation_id: str) -> list[str]:
        """List all message IDs for a conversation in chronological order.

        Ordering relies on IDs sorting lexicographically in creation order
        (e.g. time-ordered identifiers). Returns [] for unknown conversations.
        """
        msg_dir = self.messages_dir / conversation_id
        if not msg_dir.exists():
            return []

        return sorted(f.stem for f in msg_dir.glob("*.json"))

    def save_part(self, message_id: str, part_id: str, data: dict[str, Any]) -> None:
        """Save message part."""
        self._write_json(self.parts_dir / message_id / f"{part_id}.json", data)

    def load_part(self, message_id: str, part_id: str) -> dict[str, Any]:
        """Load message part.

        Raises:
            FileNotFoundError: If the part does not exist.
        """
        part_file = self.parts_dir / message_id / f"{part_id}.json"
        if not part_file.exists():
            raise FileNotFoundError(f"Part {part_id} in message {message_id} not found")

        return self._read_json(part_file)

    def list_parts(self, message_id: str) -> list[str]:
        """List all part IDs for a message in order.

        Returns [] for unknown messages; ordering is lexicographic by ID.
        """
        part_dir = self.parts_dir / message_id
        if not part_dir.exists():
            return []

        return sorted(f.stem for f in part_dir.glob("*.json"))

    def save_truncated_output(self, output_id: str, content: str) -> None:
        """Save full output that was truncated."""
        output_file = self.outputs_dir / output_id
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(content)

    def load_truncated_output(self, output_id: str) -> str:
        """Load full truncated output.

        Raises:
            FileNotFoundError: If the output does not exist.
        """
        output_file = self.outputs_dir / output_id
        if not output_file.exists():
            raise FileNotFoundError(f"Truncated output {output_id} not found")

        with open(output_file, encoding="utf-8") as f:
            return f.read()

    def save_project_memory(self, project_id: str, data: dict[str, Any]) -> None:
        """Save project-scoped memory."""
        self._write_json(self.memory_dir / f"{project_id}.json", data)

    def load_project_memory(self, project_id: str) -> dict[str, Any]:
        """Load project-scoped memory. Returns {} if not found."""
        memory_file = self.memory_dir / f"{project_id}.json"
        if not memory_file.exists():
            return {}
        return self._read_json(memory_file)

    def delete_project_memory(self, project_id: str) -> None:
        """Delete project-scoped memory. A no-op if no memory is stored."""
        memory_file = self.memory_dir / f"{project_id}.json"
        if memory_file.exists():
            memory_file.unlink()

    def cleanup_old_outputs(self, retention_days: int | None = None) -> int:
        """Clean up truncated outputs older than the retention period.

        Args:
            retention_days: Retention window in days. Defaults to the value
                from the storage configuration when omitted.

        Returns:
            Number of files deleted.
        """
        if retention_days is None:
            retention_days = self.retention_days
        cutoff_timestamp = time.time() - (retention_days * 24 * 60 * 60)
        deleted_count = 0

        for output_file in self.outputs_dir.iterdir():
            # Only regular files are managed here; skip any stray directories.
            if output_file.is_file() and output_file.stat().st_mtime < cutoff_timestamp:
                output_file.unlink()
                deleted_count += 1

        return deleted_count