Skip to content

fix: stabilize session.run() for TaskGroup and cascading agent handoffs#1196

Open
toubatbrian wants to merge 17 commits intomainfrom
brian/fix-test-framework
Open

fix: stabilize session.run() for TaskGroup and cascading agent handoffs#1196
toubatbrian wants to merge 17 commits intomainfrom
brian/fix-test-framework

Conversation

@toubatbrian
Copy link
Copy Markdown
Contributor

@toubatbrian toubatbrian commented Apr 3, 2026

Summary

  • session.run() now waits for the full agent bootstrap chain to settle before generating a reply, ensuring test events don't include stale bootstrap messages
  • RunResult stays alive through cascading handoffs (e.g. tool call → complete() → TaskGroup resumes → starts next child task), capturing the full transition chain
  • Removed the fragile speech-handle unwatch/re-watch dance from AgentTask.#start() in favor of cleaner handle tracking and microtask deferral

Problem

When using session.run() with agents that use TaskGroup (or any agent whose onEnter triggers child-task transitions), two issues occurred:

  1. Unstable first turn: run() would fire generateReply before the bootstrap chain (e.g. Agent → TaskGroup → SubAgentTask) fully settled, causing the LLM to respond without the correct task's instructions
  2. Missing cascade events: RunResult would resolve as soon as the speech handle finished, missing the subsequent handoff events (e.g. FirstAgentTask → TaskGroup → SecondAgentTask) triggered by the tool call's complete()

Changes

Files: agent_session.ts, agent.ts, agent_activity.ts (+1 line), run_result.ts, health_service_agent.ts, test_agent.test.ts

  • _drainActivityLock(): run() now drains the activity lock in a loop until the bootstrap chain settles (e.g. Agent → TaskGroup → VerifyIntentTask), so generateReply targets the correct final activity
  • _notifyActivityChanged() moved after start()/resume(): Uses the Event class so waiters see schedulingPaused=false
  • Settle handle for resumed activities: When resuming an agent whose onEnter loop is still running (e.g. TaskGroup advancing to the next child), a tracked handle races _activityChanged vs onEnterDone to keep RunResult alive through the cascade without blocking when no further transition comes
  • _trackRunHandle(_onEnterTask) in agent_activity.ts: One line that tracks each new activity's onEnter speech handle — keeps RunResult alive through the new task's onEnter. No-op during bootstrap (no RunResult yet)
  • Microtask-deferred _markDoneIfNeeded in run_result.ts: Defers resolution by one microtask so AgentTask.#start() finally blocks can add resumeTransitionTask before RunResult resolves
  • _setMinCreatedAt boundary: Filters bootstrap events from the first RunResult
  • Simplified AgentTask.#start(): Removed the fragile unwatch/re-watch/_markDoneIfNeeded dance — transition tasks are tracked directly
  • New health_service/ example with 3-turn TaskGroup integration test

Test plan

  • examples/src/health_service/test_agent.test.ts — 3-turn TaskGroup flow (verifyIntent → identifyPatient → schedulePatientVisit), all with full cascade events
  • examples/src/testing/run_result.test.ts — 28 tests pass
  • examples/src/testing/task_group.test.ts — 6 tests pass
  • examples/src/testing/basic_task_group.test.ts
  • Verify restaurant_agent.ts and realtime_agent.ts work in Agent Playground

@changeset-bot
Copy link
Copy Markdown

changeset-bot bot commented Apr 3, 2026

🦋 Changeset detected

Latest commit: 2d7fa51

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 22 packages
Name Type
@livekit/agents Patch
@livekit/agents-plugin-anam Patch
@livekit/agents-plugin-baseten Patch
@livekit/agents-plugin-bey Patch
@livekit/agents-plugin-cartesia Patch
@livekit/agents-plugin-deepgram Patch
@livekit/agents-plugin-elevenlabs Patch
@livekit/agents-plugin-google Patch
@livekit/agents-plugin-hedra Patch
@livekit/agents-plugin-inworld Patch
@livekit/agents-plugin-lemonslice Patch
@livekit/agents-plugin-livekit Patch
@livekit/agents-plugin-neuphonic Patch
@livekit/agents-plugin-openai Patch
@livekit/agents-plugin-phonic Patch
@livekit/agents-plugin-resemble Patch
@livekit/agents-plugin-rime Patch
@livekit/agents-plugin-sarvam Patch
@livekit/agents-plugin-silero Patch
@livekit/agents-plugins-test Patch
@livekit/agents-plugin-trugen Patch
@livekit/agents-plugin-xai Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@toubatbrian toubatbrian changed the title Fix Test Framework fix: stabilize session.run() for TaskGroup and cascading agent handoffs Apr 3, 2026
@toubatbrian toubatbrian marked this pull request as ready for review April 3, 2026 03:13
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no bugs or issues to report.

Open in Devin Review

Base automatically changed from brian/reuse-realtime to brian/reuse-stt April 10, 2026 01:28
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 2 new potential issues.

View 7 additional findings in Devin Review.

Open in Devin Review

Comment on lines +420 to +425
await Promise.race([
this.sendSayAsync(text, options?.allowInterruptions),
new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error('say() timed out.')), RealtimeSession.SAY_TIMEOUT_MS);
}),
]);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Phonic say() timeout creates unhandled promise rejection

In the Phonic say() method, Promise.race is used with a setTimeout-based timeout promise. When sendSayAsync resolves before the 10-second timeout, the timer continues running. When it eventually fires, it calls reject() on a promise that is no longer being listened to (since Promise.race already resolved with the winner). This produces an unhandled promise rejection in Node.js, which by default logs a warning and in strict mode (--unhandled-rejections=throw) crashes the process.

Problematic code pattern
await Promise.race([
  this.sendSayAsync(text, options?.allowInterruptions),
  new Promise<never>((_, reject) => {
    setTimeout(() => reject(new Error('say() timed out.')), RealtimeSession.SAY_TIMEOUT_MS);
  }),
]);

The setTimeout is never cleared when sendSayAsync wins the race.

Suggested change
await Promise.race([
this.sendSayAsync(text, options?.allowInterruptions),
new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error('say() timed out.')), RealtimeSession.SAY_TIMEOUT_MS);
}),
]);
let timer: ReturnType<typeof setTimeout> | undefined;
try {
await Promise.race([
this.sendSayAsync(text, options?.allowInterruptions),
new Promise<never>((_, reject) => {
timer = setTimeout(
() => reject(new Error('say() timed out.')),
RealtimeSession.SAY_TIMEOUT_MS,
);
}),
]);
} finally {
if (timer !== undefined) clearTimeout(timer);
}
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


async close(): Promise<void> {
this._mainTask.cancel();
await this.inputAudioStream.close();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Base RealtimeSession.close() async cleanup not awaited by OpenAI/Google subclasses

The PR adds await this.inputAudioStream.close() to the base RealtimeSession.close() (agents/src/llm/realtime.ts:153), making it perform meaningful async cleanup. However, the OpenAI realtime sessions (plugins/openai/src/realtime/realtime_model.ts:1120, plugins/openai/src/realtime/realtime_model_beta.ts:1033) and Google realtime session (plugins/google/src/beta/realtime/realtime_api.ts:795) all call super.close() without await. This means inputAudioStream.close() runs as a fire-and-forget floating promise — the MultiInputStream's reader release, pump drain, and writable-close sequence may not complete before the subclass's close() returns, leaking the stream resources and leaving the _mainTask reader loop dangling until the async close eventually finishes in the background.

Prompt for agents
The base RealtimeSession.close() at agents/src/llm/realtime.ts:151-154 now does meaningful async work (await this.inputAudioStream.close()). Three subclasses call super.close() without await:

- plugins/openai/src/realtime/realtime_model.ts:1120 (super.close() without await)
- plugins/openai/src/realtime/realtime_model_beta.ts:1033 (super.close() without await)
- plugins/google/src/beta/realtime/realtime_api.ts:795 (super.close() without await)

Phonic (plugins/phonic/src/realtime/realtime_model.ts:491) correctly uses 'await super.close()'.

The fix is to add 'await' before 'super.close()' in the three affected subclasses. Alternatively, the base class could be restructured to not require awaiting (e.g. fire-and-forget the inputAudioStream close or cancel the main task differently), but awaiting is the simplest and most correct fix.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Base automatically changed from brian/reuse-stt to main April 10, 2026 02:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant