How does steering and queuing work?
how does the steering and queuing work?
Now I have a comprehensive picture. Here's how steering and queuing work:
##Steering and Queuing in AgentLayer
These are two send modes (SendMode = "steer" | "queue") that control how messages are injected into a running agent loop. They only matter when the session is already busy (i.e., the model loop is actively processing). When the session is idle, both modes behave identically — they just start a new loop.
###The Two Modes
####Steer (mode: "steer")
- Injects the message before the next model call within the current loop iteration.
- This is like interrupting the agent mid-turn: "Actually, do this instead."
- The steering message gets picked up at "Drain point 1" in the loop — right before
streamText()is called on the next step.
####Queue (mode: "queue")
- Processes the message after the current turn would normally end (i.e., when the model stops producing tool calls).
- This is like a follow-up: "When you're done, also do this."
- The queued message gets picked up at "Drain point 3" in the loop — after the model finishes a step with no tool calls (which would normally end the loop), the follow-up messages keep the loop alive for another iteration.
###How It Works Under the Hood
In Session (src/session.ts):
private steeringQueue: ModelMessage[] = [];
private followUpQueue: ModelMessage[] = [];
When send() is called while the loop is already running (lines ~307-314):
// Loop running — route based on mode (synchronous, no await)
const queue = mode === "steer" ? this.steeringQueue : this.followUpQueue;
for (const msg of userMessages) queue.push(msg);
The session passes two callback functions to the loop (lines ~341-342):
getSteeringMessages: () => drainQueue(this.steeringQueue),
getFollowUpMessages: () => drainQueue(this.followUpQueue),
In loop() (src/loop.ts), the callbacks are invoked at two specific points in the while loop:
-
Drain point 1 (line ~240):
getSteeringMessages()is called at the top of every loop step, right before the model call. Any steering messages are pushed into the running message array, so the model sees them as additional context:if (getSteeringMessages) { const steering = getSteeringMessages(); for (const msg of steering) msgs.push(msg); } -
Drain point 3 (line ~299):
getFollowUpMessages()is called after a step with no tool calls (which would normally break the loop). If follow-up messages exist, the loopcontinues for another iteration instead of ending:if (!hasToolCalls) { if (getFollowUpMessages) { const followUp = getFollowUpMessages(); if (followUp.length > 0) { for (const msg of followUp) msgs.push(msg); continue; // keeps the loop alive } } break; }
###Convenience Methods
The session provides shorthand methods (src/session.ts lines ~213-218):
steer(input): void // equivalent to send(input, { mode: "steer" })
followUp(input): void // equivalent to send(input, { mode: "queue" })
###Default Mode
The default send mode is "steer". You can change it per-agent or per-session via the sendMode config option:
const agent = new Agent({ sendMode: "queue", ... });
// or per-session:
const session = await agent.createSession({ sendMode: "queue" });
When a mode isn't explicitly passed to send(), it falls back to opts.mode → config.sendMode → "steer".
###Cleanup on Error/Completion
When the loop finishes (or errors), the settle() method clears both queues:
private settle(): Deferred {
// ...
this.steeringQueue.length = 0;
this.followUpQueue.length = 0;
// ...
}
This ensures stale messages don't leak into the next loop run.
###Summary Table
| Steer | Queue | |
|---|---|---|
| When injected | Before the next model call (within current turn) | After current turn ends (extends the loop) |
| Analogy | "Stop, change direction" | "When you're done, do this too" |
| Drain point | Top of each loop step (point 1) | After no-tool-call step (point 3) |
| Default? | ✅ Yes | No |
so it's an in-mem queue? doesn't survive crashes and not well suited for serverless?
Yes, you're exactly right. The steering and queuing mechanism is purely in-memory and ephemeral. Here's the full picture:
###What is persisted
The SessionStore (either InMemorySessionStore or JsonlSessionStore) persists completed messages — every user, assistant, and tool message gets appended to the store as it's produced. So the conversation history survives restarts (if you use JsonlSessionStore). When a session is loaded via createSession(id), it rebuilds _messages from the stored entries.
###What is not persisted
The steering/queuing state is entirely held in two plain arrays on the Session instance:
private steeringQueue: ModelMessage[] = [];
private followUpQueue: ModelMessage[] = [];
Along with:
private completion: Deferred | null— the running loop's promiseprivate controller: AbortController | null— the abort handle
None of this is saved anywhere. On settle() (loop end or error), both queues are wiped:
this.steeringQueue.length = 0;
this.followUpQueue.length = 0;
###So yes, the implications are:
-
Crash = lost queued messages. If the process dies while the loop is running, any messages sitting in
steeringQueueorfollowUpQueuethat haven't been drained yet are gone. They were never persisted. -
Serverless is a poor fit for mid-turn steering. In a serverless/edge function, the
Sessionobject lives only for the duration of that invocation. You can't callsession.send("First"), return the response, then in a separate invocation callsession.steer("Change course")— theSessioninstance (and its loop, queues, andAbortController) no longer exists. -
The loop itself is a long-lived
whileloop withasync for await. It's designed around a persistent process — theloop()generator insrc/loop.tsruns awhile (step < maxSteps)loop thatyields events. This is fundamentally tied to a single execution context.
###What would work across restarts
You could restore the conversation history by reloading a session from the JsonlSessionStore and sending a new message — that effectively starts a fresh loop with the full prior context. But there's no mechanism to resume a mid-turn loop or replay queued steering/follow-up messages that hadn't been consumed yet.
In short: this is an in-process, long-lived agent loop design. Steering and queuing are coordination primitives for a concurrent caller within the same process (e.g., a TUI sending keystrokes while the agent is working), not a durable task queue.
END OF POST