# Coverage report artifact: src/harnessutils/compaction/truncation.py — 82% of 225 statements
# (coverage.py v7.13.2, report generated 2026-02-18 09:07 -0600)
1"""Tier 1: Output truncation at tool execution boundary.
3Prevents large outputs from entering context by truncating at source.
4Cost: Free, Latency: 0ms.
5"""
7import json
8import re
9from dataclasses import dataclass
10from typing import Any, Literal
12from harnessutils.config import TruncationConfig
# Closed set of content categories recognized by detect_content_type();
# truncate_output() uses this to route to the matching specialized truncator.
ContentType = Literal["json", "logs", "stacktrace", "code", "text"]
@dataclass
class TruncationResult:
    """Result of a truncation operation.

    Returned by truncate_output(); carries the (possibly shortened) content
    plus metadata about what, if anything, was removed.
    """

    # The output content, truncated if it exceeded the configured limits.
    content: str
    # True when any truncation was applied to the original output.
    truncated: bool
    # Identifier/path where the full output was saved, if one was supplied.
    output_path: str | None = None
    # Number of bytes removed (approximate in content-aware mode, where it
    # is derived from a token count).
    bytes_removed: int = 0
    # Content category that was detected (or supplied by the caller).
    content_type: ContentType = "text"
def detect_content_type(output: str) -> "ContentType":
    """Classify output as json, stacktrace, logs, code, or text.

    Detection order is deliberate: JSON first (cheap, definitive parse),
    then stacktraces, then logs, then code keywords; anything unmatched is
    plain text.

    Args:
        output: The output to analyze.

    Returns:
        Detected content type.
    """
    body = output.strip()
    if not body:
        return "text"

    # JSON: must look like an object/array AND actually parse.
    if body.startswith(("{", "[")):
        try:
            json.loads(body)
        except (json.JSONDecodeError, ValueError):
            pass
        else:
            return "json"

    # Stacktrace: Python/Java/JS error patterns, anchored per line.
    stack_patterns = (
        r"Traceback \(most recent call last\)",
        r"^\s*at\s+.*\(.*:\d+:\d+\)",  # JS stacktrace
        r'^\s*File ".*", line \d+',  # Python stacktrace
        r"Exception in thread",  # Java stacktrace
    )
    if any(re.search(p, output, re.MULTILINE) for p in stack_patterns):
        return "stacktrace"

    # Logs: ISO timestamps or conventional log-level markers.
    log_patterns = (
        r"\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}",  # ISO timestamp
        r"\[(?:ERROR|WARN|INFO|DEBUG)\]",  # Bracketed log levels
        r"(?:ERROR|WARN|INFO|DEBUG):",  # Alternative format
    )
    if any(re.search(p, output) for p in log_patterns):
        return "logs"

    # Code: Python/JS/Java declaration keywords at line start.
    code_patterns = (
        r"^\s*(?:def|class|import|from)\s+",  # Python
        r"^\s*(?:function|const|let|var)\s+",  # JavaScript
        r"^\s*(?:public|private|protected)\s+",  # Java/C++
    )
    if any(re.search(p, output, re.MULTILINE) for p in code_patterns):
        return "code"

    return "text"
def _truncate_json(output: str, max_tokens: int, json_array_limit: int = 10) -> tuple[str, int]:
    """Truncate JSON while preserving structure.

    Top-level arrays keep the first/last items; top-level objects get long
    string values and long list values shortened in place.

    Args:
        output: JSON string to truncate
        max_tokens: Maximum tokens to keep
        json_array_limit: Maximum items to keep in JSON arrays

    Returns:
        Tuple of (truncated_json, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    try:
        data = json.loads(output)
    except (json.JSONDecodeError, ValueError):
        # Not valid JSON after all: fall back to generic text truncation.
        return _truncate_text(output, max_tokens)

    # Already under budget: return unchanged.
    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    # Split the kept-items window between the start and end of arrays.
    head = json_array_limit // 2
    tail = json_array_limit - head

    # Arrays: keep the first `head` and last `tail` items.
    if isinstance(data, list) and len(data) > json_array_limit:
        total_items = len(data)
        kept = data[:head] + data[-tail:]
        result = json.dumps(kept, indent=2)
        result += f"\n\n... {total_items - json_array_limit} items truncated ..."
        return result, current_tokens - count_tokens_fast(result)

    # Objects: shorten oversized string values and long list values.
    if isinstance(data, dict):
        truncated_dict: dict[str, Any] = {}
        for key, value in data.items():
            if isinstance(value, str) and len(value) > 500:
                truncated_dict[key] = value[:500] + "... (truncated)"
            elif isinstance(value, list) and len(value) > json_array_limit:
                truncated_dict[key] = value[:head] + ["... (truncated)"] + value[-tail:]
            else:
                truncated_dict[key] = value

        result = json.dumps(truncated_dict, indent=2)
        return result, current_tokens - count_tokens_fast(result)

    # Fix: scalars and short-but-verbose arrays previously escaped here
    # untruncated (returned with tokens_removed=0) despite exceeding the
    # budget; degrade to generic text truncation instead.
    return _truncate_text(output, max_tokens)
def _truncate_stacktrace(output: str, max_tokens: int, config: TruncationConfig) -> tuple[str, int]:
    """Truncate stacktrace while preserving error message and key frames.

    Args:
        output: Stacktrace to truncate
        max_tokens: Maximum tokens to keep
        config: Truncation configuration (uses stacktrace_frame_limit)

    Returns:
        Tuple of (truncated_stacktrace, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    lines = output.split("\n")
    frame_limit = config.stacktrace_frame_limit // 2  # Split between top and bottom

    # Find the error message: scan the last few lines for a non-indented,
    # non-empty line (Python puts the exception summary at the end).
    error_line = None
    for i in range(len(lines) - 1, max(0, len(lines) - 5), -1):
        if lines[i].strip() and not lines[i].startswith(" "):
            error_line = i
            break

    # Hoist the error message to the top of the result for visibility.
    result_lines = []
    if error_line is not None:
        result_lines.append(lines[error_line])
        result_lines.append("")

    # Keep top N frames.
    result_lines.extend(lines[:frame_limit])
    result_lines.append("")
    # Fix: count could previously go negative when the trace had fewer than
    # 2*frame_limit lines but still exceeded the token budget.
    result_lines.append(f"... {max(0, len(lines) - 2 * frame_limit)} frames truncated ...")
    result_lines.append("")

    # Keep bottom N frames (excluding the error message itself).
    if error_line is not None:
        result_lines.extend(lines[max(frame_limit, error_line - frame_limit):error_line])
    else:
        # Fix: clamp the tail start so short traces don't re-emit lines
        # already included in the head window above.
        result_lines.extend(lines[max(frame_limit, len(lines) - frame_limit):])

    result = "\n".join(result_lines)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed
def _truncate_logs(output: str, max_tokens: int, config: TruncationConfig) -> tuple[str, int]:
    """Truncate logs while preserving all errors and warnings.

    Errors and warnings are always kept in full; info lines are sampled
    (first/last 50) when there are more than 100 of them. Note the result
    may still exceed max_tokens if the error/warning volume alone is large.

    Args:
        output: Logs to truncate
        max_tokens: Maximum tokens to keep
        config: Truncation configuration (uses preserve_errors)

    Returns:
        Tuple of (truncated_logs, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    lines = output.split("\n")

    if not config.preserve_errors:
        # Just do head/tail truncation
        return _truncate_text(output, max_tokens)

    error_pattern = re.compile(r"\[?ERROR\]?|ERROR:", re.IGNORECASE)
    warning_pattern = re.compile(r"\[?WARN(?:ING)?\]?|WARN(?:ING)?:", re.IGNORECASE)

    # Categorize lines by severity. (Fix: the original collected
    # (index, line) tuples but never used the indices.)
    error_lines: list[str] = []
    warning_lines: list[str] = []
    info_lines: list[str] = []

    for line in lines:
        if error_pattern.search(line):
            error_lines.append(line)
        elif warning_pattern.search(line):
            warning_lines.append(line)
        else:
            info_lines.append(line)

    # Always keep all errors and warnings.
    result_lines: list[str] = []

    if error_lines:
        result_lines.append("=== ERRORS ===")
        result_lines.extend(error_lines)
        result_lines.append("")

    if warning_lines:
        result_lines.append("=== WARNINGS ===")
        result_lines.extend(warning_lines)
        result_lines.append("")

    # Sample info lines (first and last 50) when there are too many.
    if info_lines:
        result_lines.append("=== INFO (sampled) ===")
        if len(info_lines) > 100:
            result_lines.extend(info_lines[:50])
            result_lines.append(f"... {len(info_lines) - 100} info lines truncated ...")
            result_lines.extend(info_lines[-50:])
        else:
            result_lines.extend(info_lines)

    result = "\n".join(result_lines)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed
def _truncate_code(output: str, max_tokens: int) -> tuple[str, int]:
    """Truncate code while preserving function signatures.

    Keeps the first 20 lines regardless, then only lines that look like
    function/class signatures, stopping once the token budget is hit.

    Args:
        output: Code to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_code, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    # Signature detectors for Python / JavaScript / Java-style code.
    signature_res = (
        re.compile(r"^\s*(?:def|class)\s+\w+"),  # Python
        re.compile(r"^\s*(?:function|const|let)\s+"),  # JavaScript
        re.compile(r"^\s*(?:public|private|protected)\s+.*\("),  # Java/C++
    )

    kept: list[str] = []
    for line in output.split("\n"):
        if any(rx.search(line) for rx in signature_res):
            kept.append(line)
            # The budget is only re-checked when a signature is added;
            # the leading 20 lines are kept regardless of token cost.
            if count_tokens_fast("\n".join(kept)) > max_tokens:
                kept.pop()
                kept.append("... (code truncated)")
                break
        elif len(kept) < 20:  # Keep first 20 lines regardless
            kept.append(line)

    result = "\n".join(kept)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed
def _truncate_text(output: str, max_tokens: int) -> tuple[str, int]:
    """Truncate text at token boundaries (head + tail).

    Args:
        output: Text to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_text, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    # Split tokens 70/30 between head and tail
    head_tokens = int(max_tokens * 0.7)
    tail_tokens = max_tokens - head_tokens

    # Approximate character counts (4 chars per token)
    head_chars = head_tokens * 4
    tail_chars = tail_tokens * 4

    head = output[:head_chars]
    # Fix: output[-0:] is the WHOLE string, so a zero tail budget
    # (e.g. max_tokens == 0) previously kept everything instead of nothing.
    tail = output[-tail_chars:] if tail_chars > 0 else ""

    result = f"{head}\n\n... (truncated) ...\n\n{tail}"
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed
def _truncate_by_type(
    output: str, detected_type: "ContentType", config: "TruncationConfig"
) -> "tuple[str, int, ContentType]":
    """Route output to the content-type-specific truncator.

    Returns (truncated_content, tokens_removed, final_content_type); the
    type degrades to "text" when the specialized truncator fails.
    """
    try:
        if detected_type == "json":
            truncated, tokens_removed = _truncate_json(
                output, config.max_tokens, config.json_array_limit
            )
        elif detected_type == "stacktrace":
            truncated, tokens_removed = _truncate_stacktrace(output, config.max_tokens, config)
        elif detected_type == "logs":
            truncated, tokens_removed = _truncate_logs(output, config.max_tokens, config)
        elif detected_type == "code":
            truncated, tokens_removed = _truncate_code(output, config.max_tokens)
        else:  # text
            truncated, tokens_removed = _truncate_text(output, config.max_tokens)
        return truncated, tokens_removed, detected_type
    except Exception:
        # Deliberate broad catch: truncation must never crash the harness,
        # so any specialized-truncator failure degrades to text truncation.
        truncated, tokens_removed = _truncate_text(output, config.max_tokens)
        return truncated, tokens_removed, "text"


def _legacy_preview(lines: "list[str]", config: "TruncationConfig") -> "tuple[list[str], int]":
    """Collect head or tail preview lines within the line and byte budgets.

    Returns (preview_lines, bytes_accumulated).
    """
    preview_lines: list[str] = []
    bytes_accumulated = 0

    if config.direction == "head":
        for i, line in enumerate(lines):
            if i >= config.max_lines:
                break
            line_bytes = len(line.encode("utf-8")) + 1  # +1 for newline
            if bytes_accumulated + line_bytes > config.max_bytes:
                break
            preview_lines.append(line)
            bytes_accumulated += line_bytes
    else:  # tail
        for i in range(len(lines) - 1, -1, -1):
            if len(preview_lines) >= config.max_lines:
                break
            line = lines[i]
            line_bytes = len(line.encode("utf-8")) + 1  # +1 for newline
            if bytes_accumulated + line_bytes > config.max_bytes:
                break
            preview_lines.insert(0, line)
            bytes_accumulated += line_bytes

    return preview_lines, bytes_accumulated


def truncate_output(
    output: str,
    config: TruncationConfig,
    output_id: str | None = None,
    content_type: ContentType | None = None,
) -> TruncationResult:
    """Truncate tool output if it exceeds limits.

    Args:
        output: The tool output to potentially truncate
        config: Truncation configuration
        output_id: ID for saving full output (if None, full output not saved)
        content_type: Override detected content type

    Returns:
        TruncationResult with content and metadata
    """
    from harnessutils.tokens.exact import count_tokens_fast

    # Detect content type if not provided
    detected_type = content_type or detect_content_type(output)

    # Use content-aware truncation if enabled AND max_tokens would trigger
    # (backward compatibility: if only max_lines/max_bytes set, use legacy mode)
    current_tokens = count_tokens_fast(output)
    if config.use_content_aware and current_tokens > config.max_tokens:
        truncated, tokens_removed, final_type = _truncate_by_type(output, detected_type, config)
        return TruncationResult(
            content=truncated,
            truncated=True,
            output_path=output_id,
            bytes_removed=tokens_removed * 4,  # Approximate (~4 bytes/token)
            content_type=final_type,
        )

    # Legacy line/byte-based truncation (backward compatibility)
    lines = output.split("\n")
    total_bytes = len(output.encode("utf-8"))

    if len(lines) <= config.max_lines and total_bytes <= config.max_bytes:
        return TruncationResult(
            content=output,
            truncated=False,
            content_type=detected_type,
        )

    preview_lines, bytes_accumulated = _legacy_preview(lines, config)
    preview = "\n".join(preview_lines)
    bytes_removed = total_bytes - bytes_accumulated

    direction: Literal["head", "tail"] = "head" if config.direction == "head" else "tail"
    message = _format_truncated_message(
        preview,
        bytes_removed,
        output_id,
        direction,
    )

    return TruncationResult(
        content=message,
        truncated=True,
        output_path=output_id,
        bytes_removed=bytes_removed,
        content_type=detected_type,
    )
450def _format_truncated_message(
451 preview: str,
452 bytes_removed: int,
453 output_path: str | None,
454 direction: Literal["head", "tail"],
455) -> str:
456 """Format the truncated output message.
458 Args:
459 preview: Preview content (head or tail)
460 bytes_removed: Number of bytes that were removed
461 output_path: Path where full output was saved
462 direction: Direction of truncation
464 Returns:
465 Formatted message string
466 """
467 parts = [preview]
469 if bytes_removed > 0:
470 parts.append("")
471 parts.append(f"...{bytes_removed} bytes truncated...")
472 parts.append("")
474 if output_path:
475 parts.append(f"Full output saved to: {output_path}")
476 parts.append("Use search tools to query the full content or read specific sections.")
477 parts.append("Delegate large file processing to specialized exploration agents.")
479 return "\n".join(parts)