Coverage for src / harnessutils / compaction / truncation.py: 82%
223 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-12 22:41 -0600
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-12 22:41 -0600
1"""Tier 1: Output truncation at tool execution boundary.
3Prevents large outputs from entering context by truncating at source.
4Cost: Free, Latency: 0ms.
5"""
7import json
8import re
9from dataclasses import dataclass
10from typing import Literal
12from harnessutils.config import TruncationConfig
# Closed set of output classifications used to route content-aware truncation.
ContentType = Literal["json", "logs", "stacktrace", "code", "text"]


@dataclass
class TruncationResult:
    """Result of a truncation operation, as produced by ``truncate_output``."""

    # The (possibly truncated) content to place into context.
    content: str
    # True when any truncation was applied to the original output.
    truncated: bool
    # Where the full output was saved, if an output_id was supplied; else None.
    output_path: str | None = None
    # Bytes dropped — exact in legacy line/byte mode, a ~4-bytes-per-token
    # estimate in content-aware mode.
    bytes_removed: int = 0
    # Detected (or caller-supplied) classification of the output.
    content_type: ContentType = "text"
def detect_content_type(output: str) -> "ContentType":
    """Classify *output* as json, stacktrace, logs, code, or plain text.

    Check order is deliberate: JSON is confirmed by parsing, stack traces
    are more specific than generic logs, and code keywords are tested last.

    Args:
        output: The output to analyze.

    Returns:
        The detected content type; "text" when nothing matches.
    """
    body = output.strip()
    if not body:
        return "text"

    # JSON: must both look like JSON and actually parse.
    if body.startswith(("{", "[")):
        try:
            json.loads(body)
        except (json.JSONDecodeError, ValueError):
            pass
        else:
            return "json"

    # Stacktrace: Python / JS / Java error signatures.
    stack_patterns = (
        r"Traceback \(most recent call last\)",
        r"^\s*at\s+.*\(.*:\d+:\d+\)",  # JS stacktrace
        r'^\s*File ".*", line \d+',  # Python stacktrace
        r"Exception in thread",  # Java stacktrace
    )
    if any(re.search(p, output, re.MULTILINE) for p in stack_patterns):
        return "stacktrace"

    # Logs: ISO timestamps or bracketed / colon-suffixed log levels.
    log_patterns = (
        r"\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}",  # ISO timestamp
        r"\[(?:ERROR|WARN|INFO|DEBUG)\]",  # Log levels
        r"(?:ERROR|WARN|INFO|DEBUG):",  # Alternative format
    )
    if any(re.search(p, output) for p in log_patterns):
        return "logs"

    # Code: leading definition/declaration keywords.
    code_patterns = (
        r"^\s*(?:def|class|import|from)\s+",  # Python
        r"^\s*(?:function|const|let|var)\s+",  # JavaScript
        r"^\s*(?:public|private|protected)\s+",  # Java/C++
    )
    if any(re.search(p, output, re.MULTILINE) for p in code_patterns):
        return "code"

    return "text"
def _truncate_json(output: str, max_tokens: int) -> tuple[str, int]:
    """Truncate JSON while preserving structure.

    Large arrays keep their first and last 5 items; objects keep every key
    but clip oversized string/list values. Anything else that still exceeds
    the budget (scalars, short arrays) falls back to plain text truncation.

    Args:
        output: JSON string to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_json, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    try:
        data = json.loads(output)

        # If it's already under limit, return as-is
        current_tokens = count_tokens_fast(output)
        if current_tokens <= max_tokens:
            return output, 0

        # For arrays, keep first and last 5 items
        if isinstance(data, list) and len(data) > 10:
            total_items = len(data)
            truncated = data[:5] + data[-5:]
            # NOTE: the appended marker makes the result non-parseable JSON;
            # it is intended for human/model consumption, not re-parsing.
            result = json.dumps(truncated, indent=2)
            result += f"\n\n... {total_items - 10} items truncated ..."
            tokens_removed = current_tokens - count_tokens_fast(result)
            return result, tokens_removed

        # For objects, truncate large string values and long list values
        if isinstance(data, dict):
            truncated = {}
            for key, value in data.items():
                if isinstance(value, str) and len(value) > 500:
                    truncated[key] = value[:500] + "... (truncated)"
                elif isinstance(value, list) and len(value) > 10:
                    truncated[key] = value[:5] + ["... (truncated)"] + value[-5:]
                else:
                    truncated[key] = value

            result = json.dumps(truncated, indent=2)
            tokens_removed = current_tokens - count_tokens_fast(result)
            return result, tokens_removed

        # Fix: JSON scalars and short arrays used to be returned untouched
        # (`return output, 0`) even though they exceed max_tokens; degrade
        # to text truncation so the budget is always honored.
        return _truncate_text(output, max_tokens)

    except (json.JSONDecodeError, ValueError):
        # If JSON parsing fails, fallback to text truncation
        return _truncate_text(output, max_tokens)
def _truncate_stacktrace(output: str, max_tokens: int, config: TruncationConfig) -> tuple[str, int]:
    """Truncate stacktrace while preserving error message and key frames.

    Keeps the final error line (when one can be located), the first N/2 and
    last N/2 lines, where N is ``config.stacktrace_frame_limit``.

    Args:
        output: Stacktrace to truncate
        max_tokens: Maximum tokens to keep
        config: Truncation configuration

    Returns:
        Tuple of (truncated_stacktrace, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    lines = output.split("\n")
    frame_limit = config.stacktrace_frame_limit // 2  # Split between top and bottom

    # Find error message: scan the last few lines for the first non-empty,
    # non-indented line (Python puts the error at the end; JS/Java vary).
    error_line = None
    for i in range(len(lines) - 1, max(0, len(lines) - 5), -1):
        if lines[i].strip() and not lines[i].startswith(" "):
            error_line = i
            break

    # Surface the error message first so it is never lost
    result_lines = []
    if error_line is not None:
        result_lines.append(lines[error_line])
        result_lines.append("")

    # Keep top N frames
    result_lines.extend(lines[:frame_limit])
    result_lines.append("")
    # Fix: clamp the hidden-frame count — short traces previously reported
    # a negative/zero count (e.g. "... -3 frames truncated ...").
    hidden_frames = max(0, len(lines) - 2 * frame_limit)
    if hidden_frames:
        result_lines.append(f"... {hidden_frames} frames truncated ...")
    result_lines.append("")

    # Keep bottom N frames (excluding the error message itself)
    if error_line is not None:
        result_lines.extend(lines[max(frame_limit, error_line - frame_limit):error_line])
    else:
        result_lines.extend(lines[-frame_limit:])

    result = "\n".join(result_lines)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed
def _truncate_logs(output: str, max_tokens: int, config: TruncationConfig) -> tuple[str, int]:
    """Truncate logs while preserving all errors and warnings.

    Error and warning lines are always kept in full; informational lines
    are sampled (first 50 and last 50) when there are more than 100.

    Args:
        output: Logs to truncate
        max_tokens: Maximum tokens to keep
        config: Truncation configuration

    Returns:
        Tuple of (truncated_logs, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    if not config.preserve_errors:
        # No severity-aware handling requested: plain head/tail cut.
        return _truncate_text(output, max_tokens)

    error_pattern = re.compile(r"\[?ERROR\]?|ERROR:", re.IGNORECASE)
    warning_pattern = re.compile(r"\[?WARN(?:ING)?\]?|WARN(?:ING)?:", re.IGNORECASE)

    # Classify each line by severity; errors win over warnings.
    errors: list[str] = []
    warnings: list[str] = []
    infos: list[str] = []
    for log_line in output.split("\n"):
        if error_pattern.search(log_line):
            errors.append(log_line)
        elif warning_pattern.search(log_line):
            warnings.append(log_line)
        else:
            infos.append(log_line)

    # Errors and warnings are always kept verbatim, under labeled headers.
    result_lines: list[str] = []
    if errors:
        result_lines.append("=== ERRORS ===")
        result_lines.extend(errors)
        result_lines.append("")
    if warnings:
        result_lines.append("=== WARNINGS ===")
        result_lines.extend(warnings)
        result_lines.append("")

    # Info lines are sampled: first and last 50 when over 100 total.
    if infos:
        result_lines.append("=== INFO (sampled) ===")
        if len(infos) > 100:
            result_lines.extend(infos[:50])
            result_lines.append(f"... {len(infos) - 100} info lines truncated ...")
            result_lines.extend(infos[-50:])
        else:
            result_lines.extend(infos)

    result = "\n".join(result_lines)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed
def _truncate_code(output: str, max_tokens: int) -> tuple[str, int]:
    """Truncate code while preserving function signatures.

    Keeps the first 20 lines unconditionally, then only lines that look
    like function/class/declaration signatures, until the token budget
    is exhausted.

    Args:
        output: Code to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_code, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    # Patterns for function/class signatures
    signature_res = (
        re.compile(r"^\s*(?:def|class)\s+\w+"),  # Python
        re.compile(r"^\s*(?:function|const|let)\s+"),  # JavaScript
        re.compile(r"^\s*(?:public|private|protected)\s+.*\("),  # Java/C++
    )

    kept: list[str] = []
    for code_line in output.split("\n"):
        if any(p.search(code_line) for p in signature_res):
            kept.append(code_line)
            # Stop once the kept lines exceed the token budget.
            if count_tokens_fast("\n".join(kept)) > max_tokens:
                kept.pop()
                kept.append("... (code truncated)")
                break
        elif len(kept) < 20:
            # Preamble: keep the first 20 lines regardless of content.
            kept.append(code_line)

    result = "\n".join(kept)
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed
def _truncate_text(output: str, max_tokens: int) -> tuple[str, int]:
    """Truncate text at token boundaries (head + tail).

    Keeps ~70% of the budget from the head and ~30% from the tail, using
    a rough 4-chars-per-token estimate.

    Args:
        output: Text to truncate
        max_tokens: Maximum tokens to keep

    Returns:
        Tuple of (truncated_text, tokens_removed)
    """
    from harnessutils.tokens.exact import count_tokens_fast

    current_tokens = count_tokens_fast(output)
    if current_tokens <= max_tokens:
        return output, 0

    # Split tokens 70/30 between head and tail
    head_tokens = int(max_tokens * 0.7)
    tail_tokens = max_tokens - head_tokens

    # Approximate character counts (4 chars per token)
    head_chars = head_tokens * 4
    tail_chars = tail_tokens * 4

    head = output[:head_chars]
    # Fix: output[-0:] is the WHOLE string, so a zero tail budget
    # (max_tokens == 0) previously returned the full output untrimmed.
    tail = output[-tail_chars:] if tail_chars > 0 else ""

    result = f"{head}\n\n... (truncated) ...\n\n{tail}"
    tokens_removed = current_tokens - count_tokens_fast(result)
    return result, tokens_removed
def truncate_output(
    output: str,
    config: TruncationConfig,
    output_id: str | None = None,
    content_type: ContentType | None = None,
) -> TruncationResult:
    """Truncate tool output if it exceeds limits.

    Two modes: when ``config.use_content_aware`` is set AND the output
    exceeds ``config.max_tokens``, a type-specific truncator runs (degrading
    to text truncation on any error). Otherwise the legacy line/byte
    head-or-tail truncation applies.

    Args:
        output: The tool output to potentially truncate
        config: Truncation configuration
        output_id: ID for saving full output (if None, full output not saved)
        content_type: Override detected content type

    Returns:
        TruncationResult with content and metadata
    """
    from harnessutils.tokens.exact import count_tokens_fast

    # Detect content type unless the caller supplied one.
    detected_type = content_type or detect_content_type(output)

    # Content-aware truncation only when enabled and over the token budget
    # (backward compatibility: line/byte-only configs use the legacy mode).
    if config.use_content_aware and count_tokens_fast(output) > config.max_tokens:
        final_type: ContentType = detected_type
        try:
            # Route to the specialized truncator for the detected type.
            if detected_type == "json":
                truncated, tokens_removed = _truncate_json(output, config.max_tokens)
            elif detected_type == "stacktrace":
                truncated, tokens_removed = _truncate_stacktrace(output, config.max_tokens, config)
            elif detected_type == "logs":
                truncated, tokens_removed = _truncate_logs(output, config.max_tokens, config)
            elif detected_type == "code":
                truncated, tokens_removed = _truncate_code(output, config.max_tokens)
            else:  # text
                truncated, tokens_removed = _truncate_text(output, config.max_tokens)
        except Exception:
            # Any truncator failure degrades to plain text truncation.
            truncated, tokens_removed = _truncate_text(output, config.max_tokens)
            final_type = "text"
        return TruncationResult(
            content=truncated,
            truncated=True,
            output_path=output_id,
            bytes_removed=tokens_removed * 4,  # rough 4-bytes-per-token estimate
            content_type=final_type,
        )

    # Legacy line/byte-based truncation (backward compatibility).
    lines = output.split("\n")
    total_bytes = len(output.encode("utf-8"))

    if len(lines) <= config.max_lines and total_bytes <= config.max_bytes:
        return TruncationResult(
            content=output,
            truncated=False,
            content_type=detected_type,
        )

    preview_lines: list[str] = []
    bytes_accumulated = 0

    if config.direction == "head":
        # Take lines from the front until either limit is hit.
        for line in lines[: config.max_lines]:
            line_bytes = len(line.encode("utf-8")) + 1  # +1 for newline
            if bytes_accumulated + line_bytes > config.max_bytes:
                break
            preview_lines.append(line)
            bytes_accumulated += line_bytes
    else:  # tail
        # Take lines from the back, then restore original order.
        for line in reversed(lines):
            if len(preview_lines) >= config.max_lines:
                break
            line_bytes = len(line.encode("utf-8")) + 1  # +1 for newline
            if bytes_accumulated + line_bytes > config.max_bytes:
                break
            preview_lines.append(line)
            bytes_accumulated += line_bytes
        preview_lines.reverse()

    preview = "\n".join(preview_lines)
    bytes_removed = total_bytes - bytes_accumulated

    direction: Literal["head", "tail"] = "head" if config.direction == "head" else "tail"
    message = _format_truncated_message(
        preview,
        bytes_removed,
        output_id,
        direction,
    )

    return TruncationResult(
        content=message,
        truncated=True,
        output_path=output_id,
        bytes_removed=bytes_removed,
        content_type=detected_type,
    )
445def _format_truncated_message(
446 preview: str,
447 bytes_removed: int,
448 output_path: str | None,
449 direction: Literal["head", "tail"],
450) -> str:
451 """Format the truncated output message.
453 Args:
454 preview: Preview content (head or tail)
455 bytes_removed: Number of bytes that were removed
456 output_path: Path where full output was saved
457 direction: Direction of truncation
459 Returns:
460 Formatted message string
461 """
462 parts = [preview]
464 if bytes_removed > 0:
465 parts.append("")
466 parts.append(f"...{bytes_removed} bytes truncated...")
467 parts.append("")
469 if output_path:
470 parts.append(f"Full output saved to: {output_path}")
471 parts.append("Use search tools to query the full content or read specific sections.")
472 parts.append("Delegate large file processing to specialized exploration agents.")
474 return "\n".join(parts)