Coverage for src / harnessutils / turn / processor.py: 91%
116 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-12 22:41 -0600
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-12 22:41 -0600
1"""Turn processor for handling streaming LLM responses.
3Processes stream events from LLM and manages tool execution,
4state tracking, and doom loop detection.
5"""
7import time
8from typing import Any
10from harnessutils.models.message import Message
11from harnessutils.models.parts import (
12 ReasoningPart,
13 StepFinishPart,
14 StepStartPart,
15 TextPart,
16 TimeInfo,
17 ToolPart,
18 ToolState,
19)
20from harnessutils.models.usage import CacheUsage, Usage
21from harnessutils.turn.hooks import TurnHooks
22from harnessutils.turn.state_machine import ToolStateMachine
class TurnProcessor:
    """Processes streaming LLM events with hook-based customization.

    Handles:
    - Stream event processing
    - Tool state management
    - Doom loop detection (an incoming tool call that is identical to each
      of the last ``doom_loop_threshold`` recorded tool calls)
    - Snapshot tracking
    - Hook invocation
    """

    def __init__(
        self,
        message: Message,
        hooks: TurnHooks | None = None,
        doom_loop_threshold: int = 3,
    ):
        """Initialize turn processor.

        Args:
            message: Assistant message being built
            hooks: Optional hooks for customization; a default ``TurnHooks()``
                is created when omitted
            doom_loop_threshold: Number of identical, most recent recorded
                calls an incoming call must match to count as a doom loop.
                A value of zero or less disables detection.
        """
        self.message = message
        self.hooks = hooks or TurnHooks()
        self.doom_loop_threshold = doom_loop_threshold

        self.state_machine = ToolStateMachine()
        # Text/reasoning part currently being accumulated from deltas.
        self.current_part: TextPart | ReasoningPart | None = None
        # History of (tool name, tool input) pairs used for doom loop checks.
        self.tool_calls: list[tuple[str, dict[str, Any]]] = []

    @staticmethod
    def _now_ms() -> int:
        """Return the current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def process_stream_event(self, event: dict[str, Any]) -> None:
        """Process a single stream event from LLM.

        Events whose ``type`` is missing or not listed below are ignored.

        Args:
            event: Stream event from LLM

        Event types:
        - start: Stream started
        - text-start: Text content starting
        - text-delta: Text content delta
        - text-end: Text content finished
        - reasoning-start: Extended thinking starting
        - reasoning-delta: Reasoning delta
        - reasoning-end: Reasoning finished
        - tool-call: Tool should be executed
        - step-start: Turn starting
        - step-finish: Turn finished with usage info
        """
        event_type = event.get("type")

        if event_type == "start":
            self._handle_start()

        elif event_type == "step-start":
            self._handle_step_start()

        elif event_type == "text-start":
            self._handle_text_start()

        elif event_type == "text-delta":
            # ``or ""`` also covers an explicit ``"text": None`` payload,
            # which would otherwise fail on string concatenation.
            self._handle_text_delta(event.get("text") or "")

        elif event_type == "text-end":
            self._handle_text_end()

        elif event_type == "reasoning-start":
            self._handle_reasoning_start()

        elif event_type == "reasoning-delta":
            self._handle_reasoning_delta(event.get("text") or "")

        elif event_type == "reasoning-end":
            self._handle_reasoning_end()

        elif event_type == "tool-call":
            self._handle_tool_call(event)

        elif event_type == "step-finish":
            self._handle_step_finish(event)

    def _handle_start(self) -> None:
        """Handle stream start (currently a no-op)."""
        pass

    def _handle_step_start(self) -> None:
        """Handle step start - capture snapshot.

        Adds a ``StepStartPart`` to the message. The snapshot is whatever the
        ``on_snapshot`` hook returns for ``"start"``, or ``""`` without a hook.
        """
        snapshot = ""
        if self.hooks.on_snapshot:
            snapshot = self.hooks.on_snapshot("start")

        part = StepStartPart(snapshot=snapshot)
        self.message.add_part(part)

    def _handle_text_start(self) -> None:
        """Handle text content start by opening a new, empty text part."""
        # Replaces whatever part was in progress; it is not added to the message.
        self.current_part = TextPart(text="", time=TimeInfo(start=self._now_ms()))

    def _handle_text_delta(self, text: str) -> None:
        """Handle text content delta.

        The delta is appended only while a text part is open; otherwise it
        is dropped.
        """
        if self.current_part and isinstance(self.current_part, TextPart):
            self.current_part.text += text

    def _handle_text_end(self) -> None:
        """Handle text content end.

        Stamps the end time, adds the part to the message and clears the
        in-progress part. Does nothing when no text part is open.
        """
        if self.current_part and isinstance(self.current_part, TextPart):
            if self.current_part.time:
                self.current_part.time.end = self._now_ms()
            self.message.add_part(self.current_part)
            self.current_part = None

    def _handle_reasoning_start(self) -> None:
        """Handle reasoning start by opening a new, empty reasoning part."""
        # Replaces whatever part was in progress; it is not added to the message.
        self.current_part = ReasoningPart(text="", time=TimeInfo(start=self._now_ms()))

    def _handle_reasoning_delta(self, text: str) -> None:
        """Handle reasoning delta.

        The delta is appended only while a reasoning part is open; otherwise
        it is dropped.
        """
        if self.current_part and isinstance(self.current_part, ReasoningPart):
            self.current_part.text += text

    def _handle_reasoning_end(self) -> None:
        """Handle reasoning end.

        Stamps the end time, adds the part to the message and clears the
        in-progress part. Does nothing when no reasoning part is open.
        """
        if self.current_part and isinstance(self.current_part, ReasoningPart):
            if self.current_part.time:
                self.current_part.time.end = self._now_ms()
            self.message.add_part(self.current_part)
            self.current_part = None

    def _handle_tool_call(self, event: dict[str, Any]) -> None:
        """Handle tool call event.

        Reads ``tool``, ``call_id`` and ``input`` from the event, runs the
        doom loop check, executes the tool through the ``on_tool_call`` hook
        (when set) and adds the resulting ``ToolPart`` to the message.

        Args:
            event: The ``tool-call`` stream event
        """
        tool_name = event.get("tool", "")
        call_id = event.get("call_id", "")
        tool_input = event.get("input", {})

        # Check for doom loop. A blocked call is dropped entirely: it is not
        # recorded in the history and no part is added to the message.
        if self._check_doom_loop(tool_name, tool_input):
            return

        # Track tool call
        self.tool_calls.append((tool_name, tool_input))

        # Create tool part
        tool_part = ToolPart(
            tool=tool_name,
            call_id=call_id,
            state=ToolState(
                status="pending",
                input=tool_input,
                time=TimeInfo(start=self._now_ms()),
            ),
        )

        # Execute tool via hook. Without a hook the part stays "pending".
        if self.hooks.on_tool_call:
            # NOTE(review): the try also covers the state-machine calls and
            # the on_tool_result hook, so a failure in any of them is
            # recorded as a tool error - confirm this is intended.
            try:
                self.state_machine.start_tool(call_id)
                tool_part.state.status = "running"

                result = self.hooks.on_tool_call(tool_name, call_id, tool_input)

                self.state_machine.complete_tool(call_id)
                tool_part.state.status = "completed"
                tool_part.state.output = str(result) if result is not None else ""

                if self.hooks.on_tool_result:
                    self.hooks.on_tool_result(call_id, result)

            except Exception as e:
                self.state_machine.fail_tool(call_id)
                tool_part.state.status = "error"
                tool_part.state.error = str(e)

                if self.hooks.on_tool_error:
                    self.hooks.on_tool_error(call_id, e)

        if tool_part.state.time:
            tool_part.state.time.end = self._now_ms()

        self.message.add_part(tool_part)

    def _handle_step_finish(self, event: dict[str, Any]) -> None:
        """Handle step finish - capture usage and snapshot.

        Stores token usage and cost on the message and adds a
        ``StepFinishPart``. Missing counters default to 0, the cost to 0.0
        and the finish reason to ``"stop"``.

        Args:
            event: The ``step-finish`` stream event
        """
        # ``or {}`` covers both a missing key and an explicit ``None`` value,
        # either of which would otherwise break the ``.get`` calls below.
        usage_data = event.get("usage") or {}
        cache_data = usage_data.get("cache") or {}

        usage = Usage(
            input=usage_data.get("input", 0),
            output=usage_data.get("output", 0),
            reasoning=usage_data.get("reasoning", 0),
            cache=CacheUsage(
                read=cache_data.get("read", 0),
                write=cache_data.get("write", 0),
            ),
        )

        cost = event.get("cost", 0.0)
        self.message.tokens = usage
        self.message.cost = cost

        snapshot = ""
        if self.hooks.on_snapshot:
            snapshot = self.hooks.on_snapshot("finish")

        part = StepFinishPart(
            reason=event.get("reason", "stop"),
            snapshot=snapshot,
            tokens=usage_data,
            cost=cost,
        )
        self.message.add_part(part)

    def _check_doom_loop(self, tool_name: str, tool_input: dict[str, Any]) -> bool:
        """Check if doom loop is occurring.

        A doom loop is an incoming call that equals every one of the last
        ``doom_loop_threshold`` recorded calls (same tool name and input).
        When one is found and an ``on_doom_loop`` hook is set, the hook
        decides: a truthy return lets the call proceed.

        Args:
            tool_name: Tool being called
            tool_input: Tool input

        Returns:
            True if doom loop detected and should stop
        """
        threshold = self.doom_loop_threshold

        # A non-positive threshold disables detection. Without this guard a
        # threshold of 0 slices the whole history (``[-0:]``) and ``all()``
        # of an empty history is True, so the very first call would be
        # reported as a loop.
        if threshold <= 0:
            return False

        if len(self.tool_calls) < threshold:
            return False

        # Check last N calls
        last_calls = self.tool_calls[-threshold:]

        # All must be identical
        all_identical = all(name == tool_name and inp == tool_input for name, inp in last_calls)

        if all_identical:
            # Invoke hook
            if self.hooks.on_doom_loop:
                should_continue = self.hooks.on_doom_loop(
                    tool_name,
                    tool_input,
                    threshold,
                )
                return not should_continue

            # Default: stop
            return True

        return False