Coverage for src / harnessutils / turn / processor.py: 91%

116 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-12 22:41 -0600

1"""Turn processor for handling streaming LLM responses. 

2 

3Processes stream events from LLM and manages tool execution, 

4state tracking, and doom loop detection. 

5""" 

6 

7import time 

8from typing import Any 

9 

10from harnessutils.models.message import Message 

11from harnessutils.models.parts import ( 

12 ReasoningPart, 

13 StepFinishPart, 

14 StepStartPart, 

15 TextPart, 

16 TimeInfo, 

17 ToolPart, 

18 ToolState, 

19) 

20from harnessutils.models.usage import CacheUsage, Usage 

21from harnessutils.turn.hooks import TurnHooks 

22from harnessutils.turn.state_machine import ToolStateMachine 

23 

24 

class TurnProcessor:
    """Processes streaming LLM events with hook-based customization.

    Handles:
    - Stream event processing
    - Tool state management
    - Doom loop detection (repeated identical tool calls, threshold 3 by default)
    - Snapshot tracking
    - Hook invocation
    """

    def __init__(
        self,
        message: Message,
        hooks: TurnHooks | None = None,
        doom_loop_threshold: int = 3,
    ):
        """Initialize turn processor.

        Args:
            message: Assistant message being built; parts are appended to it
                through ``message.add_part``.
            hooks: Optional hooks for customization. A default ``TurnHooks()``
                is used when omitted.
            doom_loop_threshold: Number of already-recorded identical calls
                that must precede an incoming identical call for it to be
                treated as a doom loop. Values below 1 disable detection.
        """
        self.message = message
        self.hooks = hooks or TurnHooks()
        self.doom_loop_threshold = doom_loop_threshold

        self.state_machine = ToolStateMachine()
        # Text/reasoning part currently being streamed; None between parts.
        self.current_part: TextPart | ReasoningPart | None = None
        # History of (tool name, tool input) pairs, used for doom loop checks.
        self.tool_calls: list[tuple[str, dict[str, Any]]] = []

    @staticmethod
    def _now_ms() -> int:
        """Return the current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def process_stream_event(self, event: dict[str, Any]) -> None:
        """Process a single stream event from LLM.

        Events whose ``type`` is missing or not listed below are ignored.

        Args:
            event: Stream event from LLM

        Event types:
            - start: Stream started
            - text-start: Text content starting
            - text-delta: Text content delta
            - text-end: Text content finished
            - reasoning-start: Extended thinking starting
            - reasoning-delta: Reasoning delta
            - reasoning-end: Reasoning finished
            - tool-call: Tool should be executed
            - step-start: Turn starting
            - step-finish: Turn finished with usage info
        """
        event_type = event.get("type")

        if event_type == "start":
            self._handle_start()

        elif event_type == "step-start":
            self._handle_step_start()

        elif event_type == "text-start":
            self._handle_text_start()

        elif event_type == "text-delta":
            # ``or ""`` also covers an explicit ``"text": None`` payload,
            # which would otherwise fail on string concatenation.
            self._handle_text_delta(event.get("text") or "")

        elif event_type == "text-end":
            self._handle_text_end()

        elif event_type == "reasoning-start":
            self._handle_reasoning_start()

        elif event_type == "reasoning-delta":
            self._handle_reasoning_delta(event.get("text") or "")

        elif event_type == "reasoning-end":
            self._handle_reasoning_end()

        elif event_type == "tool-call":
            self._handle_tool_call(event)

        elif event_type == "step-finish":
            self._handle_step_finish(event)

    def _handle_start(self) -> None:
        """Handle stream start (currently a no-op)."""
        pass

    def _handle_step_start(self) -> None:
        """Handle step start - capture snapshot.

        Appends a ``StepStartPart`` carrying whatever ``on_snapshot("start")``
        returns, or an empty string when no snapshot hook is set.
        """
        snapshot = ""
        if self.hooks.on_snapshot:
            snapshot = self.hooks.on_snapshot("start")

        part = StepStartPart(snapshot=snapshot)
        self.message.add_part(part)

    def _handle_text_start(self) -> None:
        """Handle text content start by opening a new, empty text part."""
        # NOTE(review): any part still open here is replaced without being
        # added to the message.
        self.current_part = TextPart(text="", time=TimeInfo(start=self._now_ms()))

    def _handle_text_delta(self, text: str) -> None:
        """Handle text content delta; ignored unless a text part is open."""
        if self.current_part and isinstance(self.current_part, TextPart):
            self.current_part.text += text

    def _handle_text_end(self) -> None:
        """Handle text content end: stamp the end time and store the part."""
        if self.current_part and isinstance(self.current_part, TextPart):
            if self.current_part.time:
                self.current_part.time.end = self._now_ms()
            self.message.add_part(self.current_part)
            self.current_part = None

    def _handle_reasoning_start(self) -> None:
        """Handle reasoning start by opening a new, empty reasoning part."""
        # NOTE(review): any part still open here is replaced without being
        # added to the message.
        self.current_part = ReasoningPart(text="", time=TimeInfo(start=self._now_ms()))

    def _handle_reasoning_delta(self, text: str) -> None:
        """Handle reasoning delta; ignored unless a reasoning part is open."""
        if self.current_part and isinstance(self.current_part, ReasoningPart):
            self.current_part.text += text

    def _handle_reasoning_end(self) -> None:
        """Handle reasoning end: stamp the end time and store the part."""
        if self.current_part and isinstance(self.current_part, ReasoningPart):
            if self.current_part.time:
                self.current_part.time.end = self._now_ms()
            self.message.add_part(self.current_part)
            self.current_part = None

    def _handle_tool_call(self, event: dict[str, Any]) -> None:
        """Handle tool call event.

        Reads ``tool``, ``call_id`` and ``input`` from the event. When the
        doom loop check says to stop, the call is dropped: it is neither
        recorded nor added to the message. Otherwise the call is recorded, run
        through the ``on_tool_call`` hook (if set) and appended to the message
        as a ``ToolPart`` whose state reflects the outcome.

        Args:
            event: The ``tool-call`` stream event.
        """
        tool_name = event.get("tool", "")
        call_id = event.get("call_id", "")
        tool_input = event.get("input", {})

        # Check for doom loop
        if self._check_doom_loop(tool_name, tool_input):
            return

        # Track tool call
        self.tool_calls.append((tool_name, tool_input))

        # Create tool part
        tool_part = ToolPart(
            tool=tool_name,
            call_id=call_id,
            state=ToolState(
                status="pending",
                input=tool_input,
                time=TimeInfo(start=self._now_ms()),
            ),
        )

        # Execute tool via hook; without a hook the part stays "pending".
        if self.hooks.on_tool_call:
            # NOTE(review): this try also covers state-machine calls and the
            # on_tool_result hook, so a failure there marks an otherwise
            # successful tool as "error" - confirm this is intended.
            try:
                self.state_machine.start_tool(call_id)
                tool_part.state.status = "running"

                result = self.hooks.on_tool_call(tool_name, call_id, tool_input)

                self.state_machine.complete_tool(call_id)
                tool_part.state.status = "completed"
                tool_part.state.output = str(result) if result is not None else ""

                if self.hooks.on_tool_result:
                    self.hooks.on_tool_result(call_id, result)

            except Exception as e:
                self.state_machine.fail_tool(call_id)
                tool_part.state.status = "error"
                tool_part.state.error = str(e)

                if self.hooks.on_tool_error:
                    self.hooks.on_tool_error(call_id, e)

        if tool_part.state.time:
            tool_part.state.time.end = self._now_ms()

        self.message.add_part(tool_part)

    def _handle_step_finish(self, event: dict[str, Any]) -> None:
        """Handle step finish - capture usage and snapshot.

        Stores token usage and cost on the message and appends a
        ``StepFinishPart``. Missing counters default to 0, a missing cost to
        0.0 and a missing reason to ``"stop"``.

        Args:
            event: The ``step-finish`` stream event.
        """
        # ``or {}`` tolerates events that carry an explicit null for
        # "usage" or "cache" instead of omitting the key.
        usage_data = event.get("usage") or {}
        cache_data = usage_data.get("cache") or {}

        usage = Usage(
            input=usage_data.get("input", 0),
            output=usage_data.get("output", 0),
            reasoning=usage_data.get("reasoning", 0),
            cache=CacheUsage(
                read=cache_data.get("read", 0),
                write=cache_data.get("write", 0),
            ),
        )

        self.message.tokens = usage
        self.message.cost = event.get("cost", 0.0)

        snapshot = ""
        if self.hooks.on_snapshot:
            snapshot = self.hooks.on_snapshot("finish")

        part = StepFinishPart(
            reason=event.get("reason", "stop"),
            snapshot=snapshot,
            tokens=usage_data,
            cost=event.get("cost", 0.0),
        )
        self.message.add_part(part)

    def _check_doom_loop(self, tool_name: str, tool_input: dict[str, Any]) -> bool:
        """Check if doom loop is occurring.

        The check runs before the incoming call is recorded, so a loop is
        detected when the last ``doom_loop_threshold`` recorded calls all have
        the same name and input as the incoming one.

        Args:
            tool_name: Tool being called
            tool_input: Tool input

        Returns:
            True if doom loop detected and should stop. When an
            ``on_doom_loop`` hook is set, its truthy return value means
            "continue" and turns the result into False.
        """
        threshold = self.doom_loop_threshold

        # A non-positive threshold would slice an empty (or nonsensical)
        # window, and all() over nothing is True - every call would then be
        # flagged. Treat such thresholds as "detection disabled".
        if threshold <= 0 or len(self.tool_calls) < threshold:
            return False

        # Check last N calls
        last_calls = self.tool_calls[-threshold:]

        # All must be identical
        all_identical = all(name == tool_name and inp == tool_input for name, inp in last_calls)

        if not all_identical:
            return False

        # Invoke hook
        if self.hooks.on_doom_loop:
            should_continue = self.hooks.on_doom_loop(
                tool_name,
                tool_input,
                threshold,
            )
            return not should_continue

        # Default: stop
        return True