Coverage for src/harness_utils/storage/filesystem.py: 21%
90 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-31 13:47 -0600
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-31 13:47 -0600
1"""Filesystem-based storage implementation."""
3import json
4import time
5from pathlib import Path
6from typing import Any
8from harness_utils.config import StorageConfig
class FilesystemStorage:
    """Hierarchical filesystem storage backend.

    Storage structure:
        data/
        ├── conversations/{projectID}/{conversationID}.json
        ├── messages/{conversationID}/{messageID}.json
        ├── parts/{messageID}/{partID}.json
        └── truncated-outputs/{outputID}
    """

    def __init__(self, config: "StorageConfig"):
        """Initialize filesystem storage.

        Args:
            config: Storage configuration providing ``base_path`` and
                ``retention_days``.
        """
        self.base_path = Path(config.base_path)
        self.retention_days = config.retention_days

        self.conversations_dir = self.base_path / "conversations"
        self.messages_dir = self.base_path / "messages"
        self.parts_dir = self.base_path / "parts"
        self.outputs_dir = self.base_path / "truncated-outputs"

        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Create storage directories if they don't exist."""
        for directory in (
            self.conversations_dir,
            self.messages_dir,
            self.parts_dir,
            self.outputs_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _write_json(path: Path, data: dict[str, Any]) -> None:
        """Serialize *data* as pretty-printed JSON at *path*."""
        # Explicit UTF-8 avoids the platform-dependent default encoding;
        # ensure_ascii=False keeps non-ASCII content readable on disk.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    @staticmethod
    def _read_json(path: Path) -> dict[str, Any]:
        """Deserialize JSON from *path*; raises FileNotFoundError if absent."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def save_conversation(self, conversation_id: str, data: dict[str, Any]) -> None:
        """Save conversation metadata.

        The conversation is filed under ``data["project_id"]``, falling
        back to the "default" project when the key is absent.
        """
        project_id = data.get("project_id", "default")
        project_dir = self.conversations_dir / project_id
        project_dir.mkdir(parents=True, exist_ok=True)

        self._write_json(project_dir / f"{conversation_id}.json", data)

    def load_conversation(self, conversation_id: str) -> dict[str, Any]:
        """Load conversation metadata.

        The project is unknown at load time, so every project directory
        is scanned for the conversation file.

        Raises:
            FileNotFoundError: If no project contains the conversation.
        """
        for project_dir in self.conversations_dir.iterdir():
            if not project_dir.is_dir():
                continue
            # EAFP: the file could disappear between listing and reading,
            # so try the read directly instead of an exists() pre-check.
            try:
                return self._read_json(project_dir / f"{conversation_id}.json")
            except FileNotFoundError:
                continue

        raise FileNotFoundError(f"Conversation {conversation_id} not found")

    def save_message(
        self,
        conversation_id: str,
        message_id: str,
        data: dict[str, Any]
    ) -> None:
        """Save message metadata under its conversation directory."""
        msg_dir = self.messages_dir / conversation_id
        msg_dir.mkdir(parents=True, exist_ok=True)

        self._write_json(msg_dir / f"{message_id}.json", data)

    def load_message(
        self,
        conversation_id: str,
        message_id: str
    ) -> dict[str, Any]:
        """Load message metadata.

        Raises:
            FileNotFoundError: If the message file does not exist.
        """
        msg_file = self.messages_dir / conversation_id / f"{message_id}.json"
        try:
            return self._read_json(msg_file)
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Message {message_id} in conversation {conversation_id} not found"
            ) from None

    def list_messages(self, conversation_id: str) -> list[str]:
        """List all message IDs for a conversation in chronological order.

        NOTE(review): "chronological" assumes message IDs sort
        lexicographically in creation order (e.g. ULIDs/timestamps) —
        confirm against the ID scheme used by callers.
        """
        msg_dir = self.messages_dir / conversation_id
        if not msg_dir.exists():
            return []

        return sorted(f.stem for f in msg_dir.glob("*.json"))

    def save_part(
        self,
        message_id: str,
        part_id: str,
        data: dict[str, Any]
    ) -> None:
        """Save a message part under its message directory."""
        part_dir = self.parts_dir / message_id
        part_dir.mkdir(parents=True, exist_ok=True)

        self._write_json(part_dir / f"{part_id}.json", data)

    def load_part(self, message_id: str, part_id: str) -> dict[str, Any]:
        """Load a message part.

        Raises:
            FileNotFoundError: If the part file does not exist.
        """
        part_file = self.parts_dir / message_id / f"{part_id}.json"
        try:
            return self._read_json(part_file)
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Part {part_id} in message {message_id} not found"
            ) from None

    def list_parts(self, message_id: str) -> list[str]:
        """List all part IDs for a message in order.

        NOTE(review): ordering assumes part IDs sort lexicographically in
        creation order — confirm against the ID scheme used by callers.
        """
        part_dir = self.parts_dir / message_id
        if not part_dir.exists():
            return []

        return sorted(f.stem for f in part_dir.glob("*.json"))

    def save_truncated_output(self, output_id: str, content: str) -> None:
        """Save the full text of an output that was truncated elsewhere."""
        output_file = self.outputs_dir / output_id
        output_file.write_text(content, encoding="utf-8")

    def load_truncated_output(self, output_id: str) -> str:
        """Load the full text of a truncated output.

        Raises:
            FileNotFoundError: If the output file does not exist.
        """
        output_file = self.outputs_dir / output_id
        try:
            return output_file.read_text(encoding="utf-8")
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Truncated output {output_id} not found"
            ) from None

    def cleanup_old_outputs(self, retention_days: "int | None" = None) -> int:
        """Delete truncated outputs older than the retention period.

        Args:
            retention_days: Age threshold in days. Defaults to the
                ``retention_days`` from the storage configuration.

        Returns:
            Number of files deleted.
        """
        if retention_days is None:
            retention_days = self.retention_days
        cutoff_timestamp = time.time() - retention_days * 24 * 60 * 60
        deleted_count = 0

        for output_file in self.outputs_dir.iterdir():
            # mtime is a proxy for creation time; good enough for cleanup.
            if output_file.is_file() and output_file.stat().st_mtime < cutoff_timestamp:
                output_file.unlink()
                deleted_count += 1

        return deleted_count