Coverage for src / harnessutils / compaction / truncation.py: 82%

223 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-12 22:41 -0600

1"""Tier 1: Output truncation at tool execution boundary. 

2 

3Prevents large outputs from entering context by truncating at source. 

4Cost: Free, Latency: 0ms. 

5""" 

6 

7import json 

8import re 

9from dataclasses import dataclass 

10from typing import Literal 

11 

12from harnessutils.config import TruncationConfig 

13 

# Categories of tool output that the truncators specialize on; "text" is the fallback.
ContentType = Literal["json", "logs", "stacktrace", "code", "text"]

15 

16 

@dataclass
class TruncationResult:
    """Result of truncation operation."""

    # Possibly-truncated output text (the original text when truncated=False).
    content: str
    # True when any truncation was applied.
    truncated: bool
    # Identifier/path under which the full output was saved, if the caller supplied one.
    output_path: str | None = None
    # Approximate number of bytes removed from the original output.
    bytes_removed: int = 0
    # Detected (or caller-overridden) category of the output.
    content_type: ContentType = "text"

26 

27 

def detect_content_type(output: str) -> ContentType:
    """Classify output as json, stacktrace, logs, code, or plain text.

    Checks run from most to least specific: a cleanly-parsing JSON payload
    wins, then stacktrace markers, then log markers, then code keywords;
    anything else (including empty input) is "text".

    Args:
        output: The output to analyze

    Returns:
        Detected content type
    """
    text = output.strip()
    if not text:
        return "text"

    # JSON: must start with a brace/bracket AND parse cleanly.
    if text.startswith(("{", "[")):
        try:
            json.loads(text)
        except (json.JSONDecodeError, ValueError):
            pass
        else:
            return "json"

    # Stacktrace markers for Python/Java/JS errors.
    stacktrace_res = (
        r"Traceback \(most recent call last\)",
        r"^\s*at\s+.*\(.*:\d+:\d+\)",  # JS stacktrace
        r'^\s*File ".*", line \d+',  # Python stacktrace
        r"Exception in thread",  # Java stacktrace
    )
    if any(re.search(rx, output, re.MULTILINE) for rx in stacktrace_res):
        return "stacktrace"

    # Log markers: timestamps or level tags.
    log_res = (
        r"\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}",  # ISO timestamp
        r"\[(?:ERROR|WARN|INFO|DEBUG)\]",  # Log levels
        r"(?:ERROR|WARN|INFO|DEBUG):",  # Alternative format
    )
    if any(re.search(rx, output) for rx in log_res):
        return "logs"

    # Source-code keywords at the start of a line.
    code_res = (
        r"^\s*(?:def|class|import|from)\s+",  # Python
        r"^\s*(?:function|const|let|var)\s+",  # JavaScript
        r"^\s*(?:public|private|protected)\s+",  # Java/C++
    )
    if any(re.search(rx, output, re.MULTILINE) for rx in code_res):
        return "code"

    return "text"

85 

86 

def _truncate_json(output: str, max_tokens: int) -> tuple[str, int]:
    """Truncate JSON while preserving structure.

    Arrays keep their first and last 5 items; objects keep every key but
    shorten oversized string/list values. Anything else that is over
    budget (scalars, arrays of <= 10 items) falls back to plain text
    truncation so the token budget is still honored.

    Args:
        output: JSON string to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_json, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    try:
        data = json.loads(output)

        # If it's already under limit, return as-is.
        current_tokens = count_tokens_fast(output)
        if current_tokens <= max_tokens:
            return output, 0

        # For arrays, keep first and last 5 items.
        if isinstance(data, list) and len(data) > 10:
            total_items = len(data)
            truncated = data[:5] + data[-5:]
            result = json.dumps(truncated, indent=2)
            result += f"\n\n... {total_items - 10} items truncated ..."
            # Clamp: dumps(indent=2) can re-expand compact input, which would
            # otherwise yield a negative removal count.
            tokens_removed = max(0, current_tokens - count_tokens_fast(result))
            return result, tokens_removed

        # For objects, truncate large string values and long list values.
        if isinstance(data, dict):
            truncated = {}
            for key, value in data.items():
                if isinstance(value, str) and len(value) > 500:
                    truncated[key] = value[:500] + "... (truncated)"
                elif isinstance(value, list) and len(value) > 10:
                    truncated[key] = value[:5] + ["... (truncated)"] + value[-5:]
                else:
                    truncated[key] = value

            result = json.dumps(truncated, indent=2)
            tokens_removed = max(0, current_tokens - count_tokens_fast(result))
            return result, tokens_removed

        # Over budget but with no structure to exploit (scalar, or a short
        # array of huge items): fall back to head/tail text truncation
        # instead of returning the oversized output unchanged.
        return _truncate_text(output, max_tokens)

    except (json.JSONDecodeError, ValueError):
        # If JSON parsing fails, fallback to text truncation.
        return _truncate_text(output, max_tokens)

138 

139 

def _truncate_stacktrace(output: str, max_tokens: int, config: TruncationConfig) -> tuple[str, int]:
    """Truncate stacktrace while preserving error message and key frames.

    Keeps the final error line (when one can be found near the end of the
    trace), the first N lines, and the lines immediately preceding the
    error line, where 2*N = config.stacktrace_frame_limit.

    Args:
        output: Stacktrace to truncate
        max_tokens: Maximum tokens to keep
        config: Truncation configuration

    Returns:
        Tuple of (truncated_stacktrace, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    lines = output.split("\n")
    frame_limit = config.stacktrace_frame_limit // 2  # Split between top and bottom

    # Find error message (usually at the end for Python, beginning for JS):
    # scan the last few lines for the first non-empty, non-indented one.
    error_line = None
    for i in range(len(lines) - 1, max(0, len(lines) - 5), -1):
        if lines[i].strip() and not lines[i].startswith(" "):
            error_line = i
            break

    # Surface the error message first so it is never lost.
    result_lines = []
    if error_line is not None:
        result_lines.append(lines[error_line])
        result_lines.append("")

    # Keep top N lines.
    result_lines.extend(lines[:frame_limit])
    result_lines.append("")
    # Clamp to zero so a short-but-token-heavy trace (fewer lines than
    # 2*frame_limit) never reports a negative "frames truncated" count.
    result_lines.append(f"... {max(0, len(lines) - 2 * frame_limit)} frames truncated ...")
    result_lines.append("")

    # Keep bottom N lines (excluding the error message itself).
    if error_line is not None:
        result_lines.extend(lines[max(frame_limit, error_line - frame_limit):error_line])
    else:
        result_lines.extend(lines[-frame_limit:])

    result = "\n".join(result_lines)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed

188 

189 

def _truncate_logs(output: str, max_tokens: int, config: TruncationConfig) -> tuple[str, int]:
    """Truncate logs while preserving all errors and warnings.

    Error and warning lines are always retained in full; the remaining
    lines are sampled (first and last 50) when there are more than 100.

    Args:
        output: Logs to truncate
        max_tokens: Maximum tokens to keep
        config: Truncation configuration

    Returns:
        Tuple of (truncated_logs, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    if not config.preserve_errors:
        # No severity handling requested: plain head/tail truncation.
        return _truncate_text(output, max_tokens)

    error_re = re.compile(r"\[?ERROR\]?|ERROR:", re.IGNORECASE)
    warning_re = re.compile(r"\[?WARN(?:ING)?\]?|WARN(?:ING)?:", re.IGNORECASE)

    # Bucket each line by severity; errors take precedence over warnings.
    errors: list[str] = []
    warnings: list[str] = []
    infos: list[str] = []
    for line in output.split("\n"):
        if error_re.search(line):
            errors.append(line)
        elif warning_re.search(line):
            warnings.append(line)
        else:
            infos.append(line)

    # Errors and warnings are always kept in full.
    kept: list[str] = []
    if errors:
        kept.append("=== ERRORS ===")
        kept.extend(errors)
        kept.append("")
    if warnings:
        kept.append("=== WARNINGS ===")
        kept.extend(warnings)
        kept.append("")

    # Info lines are sampled: first and last 50 when there are over 100.
    if infos:
        kept.append("=== INFO (sampled) ===")
        if len(infos) > 100:
            kept.extend(infos[:50])
            kept.append(f"... {len(infos) - 100} info lines truncated ...")
            kept.extend(infos[-50:])
        else:
            kept.extend(infos)

    result = "\n".join(kept)
    return result, current_tokens - count_tokens_fast(result)

255 

256 

def _truncate_code(output: str, max_tokens: int) -> tuple[str, int]:
    """Truncate code while preserving function signatures.

    Keeps the first 20 lines verbatim, then only declaration-style lines,
    stopping with a marker once the token budget would be exceeded.

    Args:
        output: Code to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_code, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    # Lines that look like function/class declarations.
    sig_res = (
        re.compile(r"^\s*(?:def|class)\s+\w+"),  # Python
        re.compile(r"^\s*(?:function|const|let)\s+"),  # JavaScript
        re.compile(r"^\s*(?:public|private|protected)\s+.*\("),  # Java/C++
    )

    kept: list[str] = []
    for line in output.split("\n"):
        if any(rx.search(line) for rx in sig_res):
            kept.append(line)
            # Budget blown: drop the offending line, leave a marker, stop.
            if count_tokens_fast("\n".join(kept)) > max_tokens:
                kept.pop()
                kept.append("... (code truncated)")
                break
        elif len(kept) < 20:
            # Preamble: keep the first 20 lines unconditionally.
            kept.append(line)

    result = "\n".join(kept)
    return result, current_tokens - count_tokens_fast(result)

299 

300 

def _truncate_text(output: str, max_tokens: int) -> tuple[str, int]:
    """Truncate text at token boundaries (head + tail).

    Args:
        output: Text to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_text, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    # Split tokens 70/30 between head and tail.
    head_tokens = int(max_tokens * 0.7)
    tail_tokens = max_tokens - head_tokens

    # Approximate character counts (4 chars per token).
    head_chars = head_tokens * 4
    tail_chars = tail_tokens * 4

    head = output[:head_chars]
    # Guard the zero case: output[-0:] returns the WHOLE string, which would
    # defeat truncation entirely when the tail budget rounds to zero.
    tail = output[-tail_chars:] if tail_chars > 0 else ""

    result = f"{head}\n\n... (truncated) ...\n\n{tail}"
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed

331 

332 

def truncate_output(
    output: str,
    config: TruncationConfig,
    output_id: str | None = None,
    content_type: ContentType | None = None,
) -> TruncationResult:
    """Truncate tool output if it exceeds limits.

    When ``config.use_content_aware`` is set and the output is over the
    token budget, a type-specific truncator (json/stacktrace/logs/code/
    text) is used. Otherwise the legacy line/byte head-or-tail truncation
    applies (backward compatibility for configs that only set
    max_lines/max_bytes).

    Args:
        output: The tool output to potentially truncate
        config: Truncation configuration
        output_id: ID for saving full output (if None, full output not saved)
        content_type: Override detected content type

    Returns:
        TruncationResult with content and metadata
    """
    from harnessutils.tokens.exact import count_tokens_fast

    # Detect content type if not provided.
    detected_type = content_type or detect_content_type(output)

    # Use content-aware truncation if enabled AND max_tokens would trigger.
    current_tokens = count_tokens_fast(output)
    if config.use_content_aware and current_tokens > config.max_tokens:
        # Route to the specialized truncator for the detected type.
        try:
            if detected_type == "json":
                truncated, tokens_removed = _truncate_json(output, config.max_tokens)
            elif detected_type == "stacktrace":
                truncated, tokens_removed = _truncate_stacktrace(output, config.max_tokens, config)
            elif detected_type == "logs":
                truncated, tokens_removed = _truncate_logs(output, config.max_tokens, config)
            elif detected_type == "code":
                truncated, tokens_removed = _truncate_code(output, config.max_tokens)
            else:  # text
                truncated, tokens_removed = _truncate_text(output, config.max_tokens)

            return TruncationResult(
                content=truncated,
                truncated=True,
                output_path=output_id,
                bytes_removed=tokens_removed * 4,  # Approximate (4 chars/token)
                content_type=detected_type,
            )
        except Exception:
            # Deliberately broad: truncation must never crash a tool call.
            # Fall back to plain text truncation on any truncator error.
            truncated, tokens_removed = _truncate_text(output, config.max_tokens)
            return TruncationResult(
                content=truncated,
                truncated=True,
                output_path=output_id,
                bytes_removed=tokens_removed * 4,
                content_type="text",
            )

    # Legacy line/byte-based truncation (backward compatibility).
    lines = output.split("\n")
    total_bytes = len(output.encode("utf-8"))

    if len(lines) <= config.max_lines and total_bytes <= config.max_bytes:
        return TruncationResult(
            content=output,
            truncated=False,
            content_type=detected_type,
        )

    preview_lines: list[str] = []
    bytes_accumulated = 0

    if config.direction == "head":
        for i, line in enumerate(lines):
            if i >= config.max_lines:
                break
            line_bytes = len(line.encode("utf-8")) + 1  # +1 for newline
            if bytes_accumulated + line_bytes > config.max_bytes:
                break
            preview_lines.append(line)
            bytes_accumulated += line_bytes
    else:  # tail
        # Collect from the end and reverse once at the close: append is
        # O(1) per line, whereas insert(0, ...) made this loop O(n^2).
        for line in reversed(lines):
            if len(preview_lines) >= config.max_lines:
                break
            line_bytes = len(line.encode("utf-8")) + 1  # +1 for newline
            if bytes_accumulated + line_bytes > config.max_bytes:
                break
            preview_lines.append(line)
            bytes_accumulated += line_bytes
        preview_lines.reverse()

    preview = "\n".join(preview_lines)
    bytes_removed = total_bytes - bytes_accumulated

    direction: Literal["head", "tail"] = "head" if config.direction == "head" else "tail"
    message = _format_truncated_message(
        preview,
        bytes_removed,
        output_id,
        direction,
    )

    return TruncationResult(
        content=message,
        truncated=True,
        output_path=output_id,
        bytes_removed=bytes_removed,
        content_type=detected_type,
    )

443 

444 

445def _format_truncated_message( 

446 preview: str, 

447 bytes_removed: int, 

448 output_path: str | None, 

449 direction: Literal["head", "tail"], 

450) -> str: 

451 """Format the truncated output message. 

452 

453 Args: 

454 preview: Preview content (head or tail) 

455 bytes_removed: Number of bytes that were removed 

456 output_path: Path where full output was saved 

457 direction: Direction of truncation 

458 

459 Returns: 

460 Formatted message string 

461 """ 

462 parts = [preview] 

463 

464 if bytes_removed > 0: 

465 parts.append("") 

466 parts.append(f"...{bytes_removed} bytes truncated...") 

467 parts.append("") 

468 

469 if output_path: 

470 parts.append(f"Full output saved to: {output_path}") 

471 parts.append("Use search tools to query the full content or read specific sections.") 

472 parts.append("Delegate large file processing to specialized exploration agents.") 

473 

474 return "\n".join(parts)