Coverage for src/harness_utils/storage/filesystem.py: 21%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-31 13:47 -0600

1"""Filesystem-based storage implementation.""" 

2 

3import json 

4import time 

5from pathlib import Path 

6from typing import Any 

7 

8from harness_utils.config import StorageConfig 

9 

10 

class FilesystemStorage:
    """Hierarchical filesystem storage backend.

    Storage structure:
        data/
        ├── conversations/{projectID}/{conversationID}.json
        ├── messages/{conversationID}/{messageID}.json
        ├── parts/{messageID}/{partID}.json
        └── truncated-outputs/{outputID}
    """

    def __init__(self, config: StorageConfig):
        """Initialize filesystem storage.

        Args:
            config: Storage configuration supplying ``base_path`` and
                ``retention_days``.
        """
        self.base_path = Path(config.base_path)
        self.retention_days = config.retention_days

        self.conversations_dir = self.base_path / "conversations"
        self.messages_dir = self.base_path / "messages"
        self.parts_dir = self.base_path / "parts"
        self.outputs_dir = self.base_path / "truncated-outputs"

        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Create storage directories if they don't exist."""
        for directory in (
            self.conversations_dir,
            self.messages_dir,
            self.parts_dir,
            self.outputs_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _write_json(path: Path, data: dict[str, Any]) -> None:
        """Write *data* to *path* as pretty-printed JSON.

        JSON interchange is defined over UTF-8, so the encoding is pinned
        explicitly instead of relying on the platform default (which would
        corrupt or reject non-ASCII payloads on some platforms).
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    @staticmethod
    def _read_json(path: Path) -> dict[str, Any]:
        """Read and parse the UTF-8 JSON file at *path*."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _list_json_stems(directory: Path) -> list[str]:
        """Return the sorted stems of ``*.json`` files in *directory*.

        Returns an empty list when the directory does not exist.  Ordering
        is lexicographic; it is chronological only if IDs sort that way
        (e.g. ULIDs or timestamp-prefixed IDs) — TODO confirm with the
        ID-generation scheme.
        """
        if not directory.exists():
            return []
        return sorted(f.stem for f in directory.glob("*.json"))

    def save_conversation(self, conversation_id: str, data: dict[str, Any]) -> None:
        """Save conversation metadata.

        The conversation is filed under its ``project_id``; when *data*
        carries no ``project_id`` key it goes under ``"default"``.
        """
        project_dir = self.conversations_dir / data.get("project_id", "default")
        project_dir.mkdir(parents=True, exist_ok=True)
        self._write_json(project_dir / f"{conversation_id}.json", data)

    def load_conversation(self, conversation_id: str) -> dict[str, Any]:
        """Load conversation metadata, searching every project directory.

        Raises:
            FileNotFoundError: If no project directory contains the
                conversation.
        """
        for project_dir in self.conversations_dir.iterdir():
            if not project_dir.is_dir():
                continue
            conv_file = project_dir / f"{conversation_id}.json"
            if conv_file.exists():
                return self._read_json(conv_file)

        raise FileNotFoundError(f"Conversation {conversation_id} not found")

    def save_message(
        self,
        conversation_id: str,
        message_id: str,
        data: dict[str, Any]
    ) -> None:
        """Save message metadata under its conversation directory."""
        msg_dir = self.messages_dir / conversation_id
        msg_dir.mkdir(parents=True, exist_ok=True)
        self._write_json(msg_dir / f"{message_id}.json", data)

    def load_message(
        self,
        conversation_id: str,
        message_id: str
    ) -> dict[str, Any]:
        """Load message metadata.

        Raises:
            FileNotFoundError: If the message file does not exist.
        """
        msg_file = self.messages_dir / conversation_id / f"{message_id}.json"
        if not msg_file.exists():
            raise FileNotFoundError(
                f"Message {message_id} in conversation {conversation_id} not found"
            )
        return self._read_json(msg_file)

    def list_messages(self, conversation_id: str) -> list[str]:
        """List all message IDs for a conversation in chronological order."""
        return self._list_json_stems(self.messages_dir / conversation_id)

    def save_part(
        self,
        message_id: str,
        part_id: str,
        data: dict[str, Any]
    ) -> None:
        """Save a message part under its message directory."""
        part_dir = self.parts_dir / message_id
        part_dir.mkdir(parents=True, exist_ok=True)
        self._write_json(part_dir / f"{part_id}.json", data)

    def load_part(self, message_id: str, part_id: str) -> dict[str, Any]:
        """Load a message part.

        Raises:
            FileNotFoundError: If the part file does not exist.
        """
        part_file = self.parts_dir / message_id / f"{part_id}.json"
        if not part_file.exists():
            raise FileNotFoundError(
                f"Part {part_id} in message {message_id} not found"
            )
        return self._read_json(part_file)

    def list_parts(self, message_id: str) -> list[str]:
        """List all part IDs for a message in order."""
        return self._list_json_stems(self.parts_dir / message_id)

    def save_truncated_output(self, output_id: str, content: str) -> None:
        """Save the full text of an output that was truncated elsewhere."""
        # UTF-8 pinned so round-trips with load_truncated_output are lossless
        # regardless of the platform's default encoding.
        (self.outputs_dir / output_id).write_text(content, encoding="utf-8")

    def load_truncated_output(self, output_id: str) -> str:
        """Load the full text of a truncated output.

        Raises:
            FileNotFoundError: If the output file does not exist.
        """
        output_file = self.outputs_dir / output_id
        if not output_file.exists():
            raise FileNotFoundError(f"Truncated output {output_id} not found")
        return output_file.read_text(encoding="utf-8")

    def cleanup_old_outputs(self, retention_days: "int | None" = None) -> int:
        """Delete truncated outputs older than the retention period.

        Args:
            retention_days: Age threshold in days.  Defaults to the
                ``retention_days`` from the storage configuration when
                omitted (previously callers always had to re-supply it).

        Returns:
            Number of files deleted.
        """
        if retention_days is None:
            retention_days = self.retention_days
        cutoff_timestamp = time.time() - (retention_days * 24 * 60 * 60)
        deleted_count = 0

        for output_file in self.outputs_dir.iterdir():
            try:
                if output_file.is_file() and output_file.stat().st_mtime < cutoff_timestamp:
                    output_file.unlink()
                    deleted_count += 1
            except FileNotFoundError:
                # File vanished between listing and stat/unlink (concurrent
                # cleanup or external deletion); it's gone either way.
                continue

        return deleted_count