Coverage for src/harnessutils/storage/filesystem.py: 99%
93 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-12 22:41 -0600
1"""Filesystem-based storage implementation."""
3import json
4import time
5from pathlib import Path
6from typing import Any
8from harnessutils.config import StorageConfig
11class FilesystemStorage:
12 """Hierarchical filesystem storage backend.
14 Storage structure:
15 data/
16 ├── conversations/{projectID}/{conversationID}.json
17 ├── messages/{conversationID}/{messageID}.json
18 ├── parts/{messageID}/{partID}.json
19 └── truncated-outputs/{outputID}
20 """
22 def __init__(self, config: StorageConfig):
23 """Initialize filesystem storage.
25 Args:
26 config: Storage configuration
27 """
28 self.base_path = Path(config.base_path)
29 self.retention_days = config.retention_days
31 self.conversations_dir = self.base_path / "conversations"
32 self.messages_dir = self.base_path / "messages"
33 self.parts_dir = self.base_path / "parts"
34 self.outputs_dir = self.base_path / "truncated-outputs"
36 self._ensure_directories()
38 def _ensure_directories(self) -> None:
39 """Create storage directories if they don't exist."""
40 self.conversations_dir.mkdir(parents=True, exist_ok=True)
41 self.messages_dir.mkdir(parents=True, exist_ok=True)
42 self.parts_dir.mkdir(parents=True, exist_ok=True)
43 self.outputs_dir.mkdir(parents=True, exist_ok=True)
45 def save_conversation(self, conversation_id: str, data: dict[str, Any]) -> None:
46 """Save conversation metadata."""
47 project_id = data.get("project_id", "default")
48 project_dir = self.conversations_dir / project_id
49 project_dir.mkdir(parents=True, exist_ok=True)
51 conv_file = project_dir / f"{conversation_id}.json"
52 with open(conv_file, "w") as f:
53 json.dump(data, f, indent=2)
55 def load_conversation(self, conversation_id: str) -> dict[str, Any]:
56 """Load conversation metadata."""
57 for project_dir in self.conversations_dir.iterdir():
58 if not project_dir.is_dir():
59 continue
60 conv_file = project_dir / f"{conversation_id}.json"
61 if conv_file.exists():
62 with open(conv_file) as f:
63 data: dict[str, Any] = json.load(f)
64 return data
66 raise FileNotFoundError(f"Conversation {conversation_id} not found")
68 def save_message(self, conversation_id: str, message_id: str, data: dict[str, Any]) -> None:
69 """Save message metadata."""
70 msg_dir = self.messages_dir / conversation_id
71 msg_dir.mkdir(parents=True, exist_ok=True)
73 msg_file = msg_dir / f"{message_id}.json"
74 with open(msg_file, "w") as f:
75 json.dump(data, f, indent=2)
77 def load_message(self, conversation_id: str, message_id: str) -> dict[str, Any]:
78 """Load message metadata."""
79 msg_file = self.messages_dir / conversation_id / f"{message_id}.json"
80 if not msg_file.exists():
81 raise FileNotFoundError(
82 f"Message {message_id} in conversation {conversation_id} not found"
83 )
85 with open(msg_file) as f:
86 data: dict[str, Any] = json.load(f)
87 return data
89 def list_messages(self, conversation_id: str) -> list[str]:
90 """List all message IDs for a conversation in chronological order."""
91 msg_dir = self.messages_dir / conversation_id
92 if not msg_dir.exists():
93 return []
95 message_ids = [f.stem for f in msg_dir.glob("*.json")]
96 return sorted(message_ids)
98 def save_part(self, message_id: str, part_id: str, data: dict[str, Any]) -> None:
99 """Save message part."""
100 part_dir = self.parts_dir / message_id
101 part_dir.mkdir(parents=True, exist_ok=True)
103 part_file = part_dir / f"{part_id}.json"
104 with open(part_file, "w") as f:
105 json.dump(data, f, indent=2)
107 def load_part(self, message_id: str, part_id: str) -> dict[str, Any]:
108 """Load message part."""
109 part_file = self.parts_dir / message_id / f"{part_id}.json"
110 if not part_file.exists():
111 raise FileNotFoundError(f"Part {part_id} in message {message_id} not found")
113 with open(part_file) as f:
114 data: dict[str, Any] = json.load(f)
115 return data
117 def list_parts(self, message_id: str) -> list[str]:
118 """List all part IDs for a message in order."""
119 part_dir = self.parts_dir / message_id
120 if not part_dir.exists():
121 return []
123 part_ids = [f.stem for f in part_dir.glob("*.json")]
124 return sorted(part_ids)
126 def save_truncated_output(self, output_id: str, content: str) -> None:
127 """Save full output that was truncated."""
128 output_file = self.outputs_dir / output_id
129 with open(output_file, "w") as f:
130 f.write(content)
132 def load_truncated_output(self, output_id: str) -> str:
133 """Load full truncated output."""
134 output_file = self.outputs_dir / output_id
135 if not output_file.exists():
136 raise FileNotFoundError(f"Truncated output {output_id} not found")
138 with open(output_file) as f:
139 return f.read()
141 def cleanup_old_outputs(self, retention_days: int) -> int:
142 """Clean up truncated outputs older than retention period."""
143 cutoff_timestamp = time.time() - (retention_days * 24 * 60 * 60)
144 deleted_count = 0
146 for output_file in self.outputs_dir.iterdir():
147 if output_file.is_file():
148 if output_file.stat().st_mtime < cutoff_timestamp:
149 output_file.unlink()
150 deleted_count += 1
152 return deleted_count