Coverage for src / harnessutils / compaction / truncation.py: 82%

225 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 09:07 -0600

1"""Tier 1: Output truncation at tool execution boundary. 

2 

3Prevents large outputs from entering context by truncating at source. 

4Cost: Free, Latency: 0ms. 

5""" 

6 

7import json 

8import re 

9from dataclasses import dataclass 

10from typing import Any, Literal 

11 

12from harnessutils.config import TruncationConfig 

13 

# Content categories recognized by detect_content_type(); determines which
# specialized truncator truncate_output() dispatches to.
ContentType = Literal["json", "logs", "stacktrace", "code", "text"]

15 

16 

@dataclass
class TruncationResult:
    """Result of truncation operation.

    Returned by truncate_output(); carries the (possibly truncated) content
    plus metadata describing what, if anything, was removed.
    """

    # The output text to place in context (truncated or original).
    content: str
    # True when any truncation was applied.
    truncated: bool
    # Identifier/path of the saved full output, if the caller supplied one.
    output_path: str | None = None
    # Approximate number of bytes removed (0 when nothing was truncated).
    bytes_removed: int = 0
    # Detected (or caller-overridden) content category of the output.
    content_type: ContentType = "text"

26 

27 

def detect_content_type(output: str) -> ContentType:
    """Classify *output* as json, stacktrace, logs, code, or plain text.

    The checks run in priority order: JSON first (cheap structural parse),
    then stacktraces (which often also contain paths and timestamps),
    then log markers, then generic code keywords.

    Args:
        output: The output to analyze

    Returns:
        Detected content type
    """
    body = output.strip()
    if not body:
        return "text"

    # JSON documents start with an object or array delimiter and must parse.
    if body.startswith(("{", "[")):
        try:
            json.loads(body)
        except (json.JSONDecodeError, ValueError):
            pass
        else:
            return "json"

    # Heuristic table: (patterns, regex flags, resulting type), highest
    # priority first. The first table row with any matching pattern wins.
    heuristics: list[tuple[list[str], int, ContentType]] = [
        (
            [
                r"Traceback \(most recent call last\)",
                r"^\s*at\s+.*\(.*:\d+:\d+\)",  # JS stacktrace
                r'^\s*File ".*", line \d+',  # Python stacktrace
                r"Exception in thread",  # Java stacktrace
            ],
            re.MULTILINE,
            "stacktrace",
        ),
        (
            [
                r"\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}",  # ISO timestamp
                r"\[(?:ERROR|WARN|INFO|DEBUG)\]",  # Log levels
                r"(?:ERROR|WARN|INFO|DEBUG):",  # Alternative format
            ],
            0,
            "logs",
        ),
        (
            [
                r"^\s*(?:def|class|import|from)\s+",  # Python
                r"^\s*(?:function|const|let|var)\s+",  # JavaScript
                r"^\s*(?:public|private|protected)\s+",  # Java/C++
            ],
            re.MULTILINE,
            "code",
        ),
    ]

    for patterns, flags, kind in heuristics:
        if any(re.search(pattern, output, flags) for pattern in patterns):
            return kind

    return "text"

85 

86 

def _truncate_json(output: str, max_tokens: int, json_array_limit: int = 10) -> tuple[str, int]:
    """Truncate JSON while preserving structure.

    Arrays keep the first/last ``json_array_limit // 2`` items; objects
    have long string values and oversized list values shortened in place.
    Inputs that structural truncation cannot shrink (scalars, arrays
    already within the item limit) fall back to plain text truncation
    instead of being returned over budget.

    Args:
        output: JSON string to truncate
        max_tokens: Maximum tokens to keep
        json_array_limit: Maximum items to keep in JSON arrays

    Returns:
        Tuple of (truncated_json, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    try:
        data = json.loads(output)

        # If it's already under limit, return as-is
        current_tokens = count_tokens_fast(output)
        if current_tokens <= max_tokens:
            return output, 0

        head = json_array_limit // 2
        tail = json_array_limit - head

        # For arrays, keep first and last N items
        if isinstance(data, list):
            total_items = len(data)
            if total_items > json_array_limit:
                truncated_list = data[:head] + data[-tail:]
                result = json.dumps(truncated_list, indent=2)
                result += f"\n\n... {total_items - json_array_limit} items truncated ..."
                tokens_removed = current_tokens - count_tokens_fast(result)
                return result, tokens_removed
            # BUG FIX: a short array can still be over budget (a few huge
            # elements); previously it was returned untruncated with 0 removed.
            return _truncate_text(output, max_tokens)

        # For objects, truncate large string values
        if isinstance(data, dict):
            truncated_dict: dict[str, Any] = {}
            for key, value in data.items():
                if isinstance(value, str) and len(value) > 500:
                    truncated_dict[key] = value[:500] + "... (truncated)"
                elif isinstance(value, list) and len(value) > json_array_limit:
                    truncated_dict[key] = value[:head] + ["... (truncated)"] + value[-tail:]
                else:
                    truncated_dict[key] = value

            result = json.dumps(truncated_dict, indent=2)
            tokens_removed = current_tokens - count_tokens_fast(result)
            return result, tokens_removed

        # BUG FIX: scalar JSON (a bare string or number) over budget
        # previously bypassed truncation entirely.
        return _truncate_text(output, max_tokens)

    except (json.JSONDecodeError, ValueError):
        # If JSON parsing fails, fallback to text truncation
        return _truncate_text(output, max_tokens)

141 

142 

def _truncate_stacktrace(output: str, max_tokens: int, config: TruncationConfig) -> tuple[str, int]:
    """Truncate stacktrace while preserving error message and key frames.

    Keeps the final error message (searched within the last few lines)
    plus the top and bottom ``config.stacktrace_frame_limit // 2`` lines.

    Args:
        output: Stacktrace to truncate
        max_tokens: Maximum tokens to keep
        config: Truncation configuration

    Returns:
        Tuple of (truncated_stacktrace, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    lines = output.split("\n")
    frame_limit = config.stacktrace_frame_limit // 2  # Split between top and bottom

    # Find error message (usually at the end for Python, beginning for JS).
    # Heuristic: last non-indented, non-empty line among the final 5 lines.
    error_line = None
    for i in range(len(lines) - 1, max(0, len(lines) - 5), -1):
        if lines[i].strip() and not lines[i].startswith(" "):
            error_line = i
            break

    # Keep the error message up front so it is never lost
    result_lines = []
    if error_line is not None:
        result_lines.append(lines[error_line])
        result_lines.append("")

    # Keep top N frames
    result_lines.extend(lines[:frame_limit])

    # BUG FIX: for short traces the count could go negative (e.g.
    # "... -3 frames truncated ..."); clamp it and skip the marker when
    # nothing was actually dropped.
    frames_truncated = max(0, len(lines) - 2 * frame_limit)
    if frames_truncated:
        result_lines.append("")
        result_lines.append(f"... {frames_truncated} frames truncated ...")
        result_lines.append("")

    # Keep bottom N frames (excluding error message)
    if error_line is not None:
        result_lines.extend(lines[max(frame_limit, error_line - frame_limit):error_line])
    else:
        result_lines.extend(lines[-frame_limit:])

    result = "\n".join(result_lines)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed

191 

192 

def _truncate_logs(output: str, max_tokens: int, config: TruncationConfig) -> tuple[str, int]:
    """Truncate logs while preserving all errors and warnings.

    Error and warning lines are always kept in full; remaining (info)
    lines are sampled down to the first and last 50 when there are more
    than 100 of them. If ``config.preserve_errors`` is off, plain
    head/tail text truncation is used instead.

    Args:
        output: Logs to truncate
        max_tokens: Maximum tokens to keep
        config: Truncation configuration

    Returns:
        Tuple of (truncated_logs, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    if not config.preserve_errors:
        # Just do head/tail truncation
        return _truncate_text(output, max_tokens)

    # Categorize lines by severity. (The previous version also collected
    # line indices that were never used.)
    error_lines: list[str] = []
    warning_lines: list[str] = []
    info_lines: list[str] = []

    error_pattern = re.compile(r"\[?ERROR\]?|ERROR:", re.IGNORECASE)
    warning_pattern = re.compile(r"\[?WARN(?:ING)?\]?|WARN(?:ING)?:", re.IGNORECASE)

    for line in output.split("\n"):
        if error_pattern.search(line):
            error_lines.append(line)
        elif warning_pattern.search(line):
            warning_lines.append(line)
        else:
            info_lines.append(line)

    # Always keep all errors and warnings
    result_lines = []

    if error_lines:
        result_lines.append("=== ERRORS ===")
        result_lines.extend(error_lines)
        result_lines.append("")

    if warning_lines:
        result_lines.append("=== WARNINGS ===")
        result_lines.extend(warning_lines)
        result_lines.append("")

    # Sample info lines (first and last 50)
    if info_lines:
        result_lines.append("=== INFO (sampled) ===")
        if len(info_lines) > 100:
            result_lines.extend(info_lines[:50])
            result_lines.append(f"... {len(info_lines) - 100} info lines truncated ...")
            result_lines.extend(info_lines[-50:])
        else:
            result_lines.extend(info_lines)

    result = "\n".join(result_lines)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed

258 

259 

def _truncate_code(output: str, max_tokens: int) -> tuple[str, int]:
    """Truncate code while preserving function signatures.

    Keeps the first 20 lines verbatim, then keeps only lines that look
    like function/class signatures until the token budget is reached.

    Args:
        output: Code to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_code, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    lines = output.split("\n")
    result_lines: list[str] = []

    # Patterns for function/class signatures
    signature_patterns = [
        re.compile(r"^\s*(?:def|class)\s+\w+"),  # Python
        re.compile(r"^\s*(?:function|const|let)\s+"),  # JavaScript
        re.compile(r"^\s*(?:public|private|protected)\s+.*\("),  # Java/C++
    ]

    budget_hit = False  # marker already emitted inside the loop
    dropped_any = False  # at least one line was silently skipped

    for line in lines:
        is_signature = any(pattern.search(line) for pattern in signature_patterns)

        if is_signature:
            result_lines.append(line)
            # Check if tokens exceeded
            if count_tokens_fast("\n".join(result_lines)) > max_tokens:
                result_lines.pop()
                result_lines.append("... (code truncated)")
                budget_hit = True
                break
        elif len(result_lines) < 20:  # Keep first 20 lines regardless
            result_lines.append(line)
        else:
            dropped_any = True

    # BUG FIX: non-signature lines skipped after the first 20 kept lines
    # previously left no indication that anything was removed unless the
    # token budget happened to trip on a later signature line.
    if dropped_any and not budget_hit:
        result_lines.append("... (code truncated)")

    result = "\n".join(result_lines)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed

302 

303 

def _truncate_text(output: str, max_tokens: int) -> tuple[str, int]:
    """Head/tail truncate plain text to roughly ``max_tokens`` tokens.

    Keeps ~70% of the budget from the start of the text and ~30% from the
    end, joined by a "(truncated)" marker.

    Args:
        output: Text to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_text, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    original_tokens = count_tokens_fast(output)
    if original_tokens <= max_tokens:
        return output, 0

    # 70/30 head/tail token split, converted to character counts using the
    # usual ~4 chars-per-token approximation.
    head_budget = int(max_tokens * 0.7)
    tail_budget = max_tokens - head_budget

    head_part = output[: head_budget * 4]
    tail_part = output[-(tail_budget * 4):]

    combined = f"{head_part}\n\n... (truncated) ...\n\n{tail_part}"
    return combined, original_tokens - count_tokens_fast(combined)

334 

335 

def truncate_output(
    output: str,
    config: TruncationConfig,
    output_id: str | None = None,
    content_type: ContentType | None = None,
) -> TruncationResult:
    """Truncate tool output if it exceeds limits.

    When ``config.use_content_aware`` is set and the output exceeds
    ``config.max_tokens``, the output is routed to a content-specific
    truncator (json/stacktrace/logs/code/text); any failure there falls
    back to plain text truncation. Otherwise the legacy line/byte-based
    head or tail truncation is applied.

    Args:
        output: The tool output to potentially truncate
        config: Truncation configuration
        output_id: ID for saving full output (if None, full output not saved)
        content_type: Override detected content type

    Returns:
        TruncationResult with content and metadata
    """
    from harnessutils.tokens.exact import count_tokens_fast

    # Detect content type if not provided
    detected_type = content_type or detect_content_type(output)

    # Use content-aware truncation if enabled AND max_tokens would trigger
    # (backward compatibility: if only max_lines/max_bytes set, use legacy mode)
    current_tokens = count_tokens_fast(output)
    if config.use_content_aware and current_tokens > config.max_tokens:
        # Route to specialized truncator
        try:
            if detected_type == "json":
                truncated, tokens_removed = _truncate_json(
                    output, config.max_tokens, config.json_array_limit
                )
            elif detected_type == "stacktrace":
                truncated, tokens_removed = _truncate_stacktrace(output, config.max_tokens, config)
            elif detected_type == "logs":
                truncated, tokens_removed = _truncate_logs(output, config.max_tokens, config)
            elif detected_type == "code":
                truncated, tokens_removed = _truncate_code(output, config.max_tokens)
            else:  # text
                truncated, tokens_removed = _truncate_text(output, config.max_tokens)

            return TruncationResult(
                content=truncated,
                truncated=True,
                output_path=output_id,
                bytes_removed=tokens_removed * 4,  # Approximate: ~4 bytes/token
                content_type=detected_type,
            )
        except Exception:
            # Fallback to text truncation on any error (deliberate broad
            # catch: truncation must never crash the tool pipeline)
            truncated, tokens_removed = _truncate_text(output, config.max_tokens)
            return TruncationResult(
                content=truncated,
                truncated=True,
                output_path=output_id,
                bytes_removed=tokens_removed * 4,
                content_type="text",
            )

    # Legacy line/byte-based truncation (backward compatibility)
    lines = output.split("\n")
    total_bytes = len(output.encode("utf-8"))

    if len(lines) <= config.max_lines and total_bytes <= config.max_bytes:
        return TruncationResult(
            content=output,
            truncated=False,
            content_type=detected_type,
        )

    preview_lines: list[str] = []
    bytes_accumulated = 0

    if config.direction == "head":
        for i, line in enumerate(lines):
            if i >= config.max_lines:
                break
            line_bytes = len(line.encode("utf-8")) + 1  # +1 for newline
            if bytes_accumulated + line_bytes > config.max_bytes:
                break
            preview_lines.append(line)
            bytes_accumulated += line_bytes
    else:  # tail
        # PERF FIX: collect in reverse order and flip once at the end; the
        # previous insert(0, ...) made tail truncation O(n^2) in line count.
        for line in reversed(lines):
            if len(preview_lines) >= config.max_lines:
                break
            line_bytes = len(line.encode("utf-8")) + 1  # +1 for newline
            if bytes_accumulated + line_bytes > config.max_bytes:
                break
            preview_lines.append(line)
            bytes_accumulated += line_bytes
        preview_lines.reverse()

    preview = "\n".join(preview_lines)
    bytes_removed = total_bytes - bytes_accumulated

    direction: Literal["head", "tail"] = "head" if config.direction == "head" else "tail"
    message = _format_truncated_message(
        preview,
        bytes_removed,
        output_id,
        direction,
    )

    return TruncationResult(
        content=message,
        truncated=True,
        output_path=output_id,
        bytes_removed=bytes_removed,
        content_type=detected_type,
    )

448 

449 

450def _format_truncated_message( 

451 preview: str, 

452 bytes_removed: int, 

453 output_path: str | None, 

454 direction: Literal["head", "tail"], 

455) -> str: 

456 """Format the truncated output message. 

457 

458 Args: 

459 preview: Preview content (head or tail) 

460 bytes_removed: Number of bytes that were removed 

461 output_path: Path where full output was saved 

462 direction: Direction of truncation 

463 

464 Returns: 

465 Formatted message string 

466 """ 

467 parts = [preview] 

468 

469 if bytes_removed > 0: 

470 parts.append("") 

471 parts.append(f"...{bytes_removed} bytes truncated...") 

472 parts.append("") 

473 

474 if output_path: 

475 parts.append(f"Full output saved to: {output_path}") 

476 parts.append("Use search tools to query the full content or read specific sections.") 

477 parts.append("Delegate large file processing to specialized exploration agents.") 

478 

479 return "\n".join(parts)