How does steering and queuing work?

HEAD
c33aa36Inline attachLogger into examples for easy copy-paste

how does the steering and queuing work?

Avatar of anthropic/claude-opus-4.6
Claude Opus 4.6
commented

Now I have a comprehensive picture. Here's how steering and queuing work:


##Steering and Queuing in AgentLayer

These are two send modes (SendMode = "steer" | "queue") that control how messages are injected into a running agent loop. They only matter when the session is already busy (i.e., the model loop is actively processing). When the session is idle, both modes behave identically — they just start a new loop.

###The Two Modes

####Steer (mode: "steer")

  • Injects the message before the next model call within the current loop iteration.
  • This is like interrupting the agent mid-turn: "Actually, do this instead."
  • The steering message gets picked up at "Drain point 1" in the loop — right before streamText() is called on the next step.

####Queue (mode: "queue")

  • Processes the message after the current turn would normally end (i.e., when the model stops producing tool calls).
  • This is like a follow-up: "When you're done, also do this."
  • The queued message gets picked up at "Drain point 3" in the loop — after the model finishes a step with no tool calls (which would normally end the loop), the follow-up messages keep the loop alive for another iteration.

###How It Works Under the Hood

In Session (src/session.ts):

private steeringQueue: ModelMessage[] = [];
private followUpQueue: ModelMessage[] = [];

When send() is called while the loop is already running (lines ~307-314):

// Loop running — route based on mode (synchronous, no await)
const queue = mode === "steer" ? this.steeringQueue : this.followUpQueue;
for (const msg of userMessages) queue.push(msg);

The session passes two callback functions to the loop (lines ~341-342):

getSteeringMessages: () => drainQueue(this.steeringQueue),
getFollowUpMessages: () => drainQueue(this.followUpQueue),

In loop() (src/loop.ts), the callbacks are invoked at two specific points in the while loop:

  1. Drain point 1 (line ~240): getSteeringMessages() is called at the top of every loop step, right before the model call. Any steering messages are pushed into the running message array, so the model sees them as additional context:

    if (getSteeringMessages) {
      const steering = getSteeringMessages();
      for (const msg of steering) msgs.push(msg);
    }
    
  2. Drain point 3 (line ~299): getFollowUpMessages() is called after a step with no tool calls (which would normally break the loop). If follow-up messages exist, the loop continues for another iteration instead of ending:

    if (!hasToolCalls) {
      if (getFollowUpMessages) {
        const followUp = getFollowUpMessages();
        if (followUp.length > 0) {
          for (const msg of followUp) msgs.push(msg);
          continue; // keeps the loop alive
        }
      }
      break;
    }
    

###Convenience Methods

The session provides shorthand methods (src/session.ts lines ~213-218):

steer(input): void    // equivalent to send(input, { mode: "steer" })
followUp(input): void // equivalent to send(input, { mode: "queue" })

###Default Mode

The default send mode is "steer". You can change it per-agent or per-session via the sendMode config option:

const agent = new Agent({ sendMode: "queue", ... });
// or per-session:
const session = await agent.createSession({ sendMode: "queue" });

When a mode isn't explicitly passed to send(), it falls back to opts.mode → config.sendMode → "steer".

###Cleanup on Error/Completion

When the loop finishes (or errors), the settle() method clears both queues:

private settle(): Deferred {
  // ...
  this.steeringQueue.length = 0;
  this.followUpQueue.length = 0;
  // ...
}

This ensures stale messages don't leak into the next loop run.

###Summary Table

SteerQueue
When injectedBefore the next model call (within current turn)After current turn ends (extends the loop)
Analogy"Stop, change direction""When you're done, do this too"
Drain pointTop of each loop step (point 1)After no-tool-call step (point 3)
Default?✅ YesNo
Avatar of julianbenegas
Julian Benegas
commented

so it's an in-mem queue? doesn't survive crashes and not well suited for serverless?

Avatar of anthropic/claude-opus-4.6
Claude Opus 4.6
commented

Yes, you're exactly right. The steering and queuing mechanism is purely in-memory and ephemeral. Here's the full picture:

###What is persisted

The SessionStore (either InMemorySessionStore or JsonlSessionStore) persists completed messages — every user, assistant, and tool message gets appended to the store as it's produced. So the conversation history survives restarts (if you use JsonlSessionStore). When a session is loaded via createSession(id), it rebuilds _messages from the stored entries.

###What is not persisted

The steering/queuing state is entirely held in two plain arrays on the Session instance:

private steeringQueue: ModelMessage[] = [];
private followUpQueue: ModelMessage[] = [];

Along with:

  • private completion: Deferred | null — the running loop's promise
  • private controller: AbortController | null — the abort handle

None of this is saved anywhere. On settle() (loop end or error), both queues are wiped:

this.steeringQueue.length = 0;
this.followUpQueue.length = 0;

###So yes, the implications are:

  1. Crash = lost queued messages. If the process dies while the loop is running, any messages sitting in steeringQueue or followUpQueue that haven't been drained yet are gone. They were never persisted.

  2. Serverless is a poor fit for mid-turn steering. In a serverless/edge function, the Session object lives only for the duration of that invocation. You can't call session.send("First"), return the response, then in a separate invocation call session.steer("Change course") — the Session instance (and its loop, queues, and AbortController) no longer exists.

  3. The loop itself is a long-lived while loop with async for await. It's designed around a persistent process — the loop() generator in src/loop.ts runs a while (step < maxSteps) loop that yields events. This is fundamentally tied to a single execution context.

###What would work across restarts

You could restore the conversation history by reloading a session from the JsonlSessionStore and sending a new message — that effectively starts a fresh loop with the full prior context. But there's no mechanism to resume a mid-turn loop or replay queued steering/follow-up messages that hadn't been consumed yet.

In short: this is an in-process, long-lived agent loop design. Steering and queuing are coordination primitives for a concurrent caller within the same process (e.g., a TUI sending keystrokes while the agent is working), not a durable task queue.


END OF POST

How does steering and queuing work? — martinsione/agentlayer