fix: stabilize session.run() for TaskGroup and cascading agent handoffs#1196
fix: stabilize session.run() for TaskGroup and cascading agent handoffs#1196toubatbrian wants to merge 17 commits intomainfrom
Conversation
🦋 Changeset detectedLatest commit: 2d7fa51 The changes in this PR will be included in the next version bump. This PR includes changesets to release 22 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
| await Promise.race([ | ||
| this.sendSayAsync(text, options?.allowInterruptions), | ||
| new Promise<never>((_, reject) => { | ||
| setTimeout(() => reject(new Error('say() timed out.')), RealtimeSession.SAY_TIMEOUT_MS); | ||
| }), | ||
| ]); |
There was a problem hiding this comment.
🔴 Phonic say() timeout creates unhandled promise rejection
In the Phonic say() method, Promise.race is used with a setTimeout-based timeout promise. When sendSayAsync resolves before the 10-second timeout, the timer continues running. When it eventually fires, it calls reject() on a promise that is no longer being listened to (since Promise.race already resolved with the winner). This produces an unhandled promise rejection in Node.js, which by default logs a warning and in strict mode (--unhandled-rejections=throw) crashes the process.
Problematic code pattern
await Promise.race([
this.sendSayAsync(text, options?.allowInterruptions),
new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error('say() timed out.')), RealtimeSession.SAY_TIMEOUT_MS);
}),
]);The setTimeout is never cleared when sendSayAsync wins the race.
| await Promise.race([ | |
| this.sendSayAsync(text, options?.allowInterruptions), | |
| new Promise<never>((_, reject) => { | |
| setTimeout(() => reject(new Error('say() timed out.')), RealtimeSession.SAY_TIMEOUT_MS); | |
| }), | |
| ]); | |
| let timer: ReturnType<typeof setTimeout> | undefined; | |
| try { | |
| await Promise.race([ | |
| this.sendSayAsync(text, options?.allowInterruptions), | |
| new Promise<never>((_, reject) => { | |
| timer = setTimeout( | |
| () => reject(new Error('say() timed out.')), | |
| RealtimeSession.SAY_TIMEOUT_MS, | |
| ); | |
| }), | |
| ]); | |
| } finally { | |
| if (timer !== undefined) clearTimeout(timer); | |
| } |
Was this helpful? React with 👍 or 👎 to provide feedback.
|
|
||
| async close(): Promise<void> { | ||
| this._mainTask.cancel(); | ||
| await this.inputAudioStream.close(); |
There was a problem hiding this comment.
🟡 Base RealtimeSession.close() async cleanup not awaited by OpenAI/Google subclasses
The PR adds await this.inputAudioStream.close() to the base RealtimeSession.close() (agents/src/llm/realtime.ts:153), making it perform meaningful async cleanup. However, the OpenAI realtime sessions (plugins/openai/src/realtime/realtime_model.ts:1120, plugins/openai/src/realtime/realtime_model_beta.ts:1033) and Google realtime session (plugins/google/src/beta/realtime/realtime_api.ts:795) all call super.close() without await. This means inputAudioStream.close() runs as a fire-and-forget floating promise — the MultiInputStream's reader release, pump drain, and writable-close sequence may not complete before the subclass's close() returns, leaking the stream resources and leaving the _mainTask reader loop dangling until the async close eventually finishes in the background.
Prompt for agents
The base RealtimeSession.close() at agents/src/llm/realtime.ts:151-154 now does meaningful async work (await this.inputAudioStream.close()). Three subclasses call super.close() without await:
- plugins/openai/src/realtime/realtime_model.ts:1120 (super.close() without await)
- plugins/openai/src/realtime/realtime_model_beta.ts:1033 (super.close() without await)
- plugins/google/src/beta/realtime/realtime_api.ts:795 (super.close() without await)
Phonic (plugins/phonic/src/realtime/realtime_model.ts:491) correctly uses 'await super.close()'.
The fix is to add 'await' before 'super.close()' in the three affected subclasses. Alternatively, the base class could be restructured to not require awaiting (e.g. fire-and-forget the inputAudioStream close or cancel the main task differently), but awaiting is the simplest and most correct fix.
Was this helpful? React with 👍 or 👎 to provide feedback.
Summary
session.run()now waits for the full agent bootstrap chain to settle before generating a reply, ensuring test events don't include stale bootstrap messagesRunResultstays alive through cascading handoffs (e.g. tool call →complete()→ TaskGroup resumes → starts next child task), capturing the full transition chainAgentTask.#start()in favor of cleaner handle tracking and microtask deferralProblem
When using
session.run()with agents that useTaskGroup(or any agent whoseonEntertriggers child-task transitions), two issues occurred:run()would firegenerateReplybefore the bootstrap chain (e.g.Agent → TaskGroup → SubAgentTask) fully settled, causing the LLM to respond without the correct task's instructionsRunResultwould resolve as soon as the speech handle finished, missing the subsequent handoff events (e.g.FirstAgentTask → TaskGroup → SecondAgentTask) triggered by the tool call'scomplete()Changes
Files:
agent_session.ts,agent.ts,agent_activity.ts(+1 line),run_result.ts,health_service_agent.ts,test_agent.test.ts_drainActivityLock():run()now drains the activity lock in a loop until the bootstrap chain settles (e.g. Agent → TaskGroup → VerifyIntentTask), sogenerateReplytargets the correct final activity_notifyActivityChanged()moved afterstart()/resume(): Uses theEventclass so waiters seeschedulingPaused=falseonEnterloop is still running (e.g. TaskGroup advancing to the next child), a tracked handle races_activityChangedvsonEnterDoneto keepRunResultalive through the cascade without blocking when no further transition comes_trackRunHandle(_onEnterTask)inagent_activity.ts: One line that tracks each new activity's onEnter speech handle — keeps RunResult alive through the new task's onEnter. No-op during bootstrap (no RunResult yet)_markDoneIfNeededinrun_result.ts: Defers resolution by one microtask soAgentTask.#start()finally blocks can addresumeTransitionTaskbefore RunResult resolves_setMinCreatedAtboundary: Filters bootstrap events from the first RunResultAgentTask.#start(): Removed the fragile unwatch/re-watch/_markDoneIfNeededdance — transition tasks are tracked directlyhealth_service/example with 3-turn TaskGroup integration testTest plan
examples/src/health_service/test_agent.test.ts— 3-turn TaskGroup flow (verifyIntent → identifyPatient → schedulePatientVisit), all with full cascade eventsexamples/src/testing/run_result.test.ts— 28 tests passexamples/src/testing/task_group.test.ts— 6 tests passexamples/src/testing/basic_task_group.test.tsrestaurant_agent.tsandrealtime_agent.tswork in Agent Playground