How to implement message interruptions with stop()?

This post might have stale content, as HEAD is 241 commits ahead.

how do i implement message interruptions? there's a "stop" from the @ai-sdk/react, but that doesn't auto-stop the stream server side, no? i mean it aborts the signal maybe, but the server might get "one more part" done, and the stop() is instant client side. do we accept that "one more part might come", or does the ai sdk provide a recommendation for this?

Claude Opus 4.6 commented:


## How Interruptions Work End-to-End

### Client side: stop() is instant

When you call stop() from useChat, here's exactly what happens (packages/ai/src/ui/chat.ts:552-558):

```ts
stop = async () => {
  if (this.status !== 'streaming' && this.status !== 'submitted') return;
  if (this.activeResponse?.abortController) {
    this.activeResponse.abortController.abort();
  }
};
```

This triggers two things:

  1. The fetch is aborted — the abortController.signal is passed directly as signal: abortSignal to the fetch() call in HttpChatTransport.sendMessages() (line ~213). This tears down the HTTP connection from the client.
  2. The client immediately transitions to 'ready' — the catch block sees isAbort or an AbortError and sets status to 'ready' without treating it as an error (chat.ts:684-689).

The key detail: whatever tokens have already been written to the client state are kept. The onFinish callback fires in the finally block with isAbort: true and the message as it was at the moment of the abort.

### Does the server actually stop?

It depends on whether you pass abortSignal to streamText on the server. The AI SDK does not magically wire this up — you have to do it yourself.

The canonical pattern from examples/next-openai/app/api/chat/route.ts is:

```ts
export async function POST(req: Request) {
  const { messages } = await req.json();

  const result = streamText({
    model: openai('gpt-4o'),
    messages: await convertToModelMessages(messages),
    abortSignal: req.signal, // ← THIS is what propagates client abort to the server
  });

  return result.toUIMessageStreamResponse({
    consumeSseStream: consumeStream, // ← needed for correct abort handling
  });
}
```

When the client calls stop() → the fetch is aborted → the HTTP connection closes → req.signal fires → streamText sees the abort signal and:

  1. Stops pulling from the LLM provider stream
  2. Emits an { type: 'abort' } chunk on its fullStream (stream-text.ts:1064-1077)
  3. Calls the onAbort callback if provided
  4. Closes the stream

### The "one more part" problem — you're right

You're correct that there's an inherent race condition. Here's why:

  1. The abort signal on the server is checked between pulls from the underlying LLM stream (stream-text.ts:1087-1089):

    ```ts
    if (abortSignal?.aborted) {
      abort();
      return;
    }
    ```
    

    If a chunk was already being processed when the abort fires, it completes and is enqueued before the next pull checks the signal.

  2. On the network layer, there may be buffered SSE data in flight between server and client.

  3. On the client side, the fetch abort is immediate — the client stops reading from the response body. Any server-sent data that was in-flight or buffered is simply discarded by the browser.
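
The server-side window can be illustrated with a toy model (hypothetical code, not from the SDK): because the signal is only consulted between pulls, a chunk whose processing has already started still lands.

```typescript
// Toy model of the "one more part" race (not SDK code): the abort flag
// is checked only between pulls, so the chunk being processed when the
// abort fires still completes and is enqueued.
function pullUntilAbort(
  chunks: string[],
  abortSignal: { aborted: boolean },
  onChunk?: (chunk: string) => void,
): string[] {
  const enqueued: string[] = [];
  for (const chunk of chunks) {
    if (abortSignal.aborted) break; // the only place the signal is checked
    enqueued.push(chunk);           // completes even if abort fires during it
    onChunk?.(chunk);               // simulate abort arriving mid-processing
  }
  return enqueued;
}

// Abort fires while the first chunk is being processed — it is still kept:
const signal = { aborted: false };
const kept = pullUntilAbort(['a', 'b', 'c'], signal, () => {
  signal.aborted = true;
});
// kept is ['a']: one part made it through after the abort was requested
```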

### So the SDK's approach is: keep what you have, discard the rest

The client keeps whatever tokens have been written to the StreamingUIMessageState at the moment abort() is called. The onFinish callback fires with isAbort: true, and the message state reflects what was accumulated up to that point. There's no attempt to "undo" partial data.

### What about the server continuing after client disconnect?

If you don't pass req.signal (or an equivalent abort signal) to streamText, the server will happily continue generating and streaming into a broken pipe until the LLM call finishes. This wastes tokens and compute.

The more advanced pattern from examples/next/app/api/chat/route.ts shows an approach where server-side cancellation is driven by a database flag rather than the HTTP connection:

```ts
const userStopSignal = new AbortController();

const result = streamText({
  model: 'openai/gpt-5-mini',
  messages: await convertToModelMessages(messages),
  abortSignal: userStopSignal.signal,
  onChunk: throttle(async () => {
    const { canceledAt } = await readChat(id);
    if (canceledAt) {
      userStopSignal.abort();
    }
  }, 1000),
  onAbort: () => {
    console.log('aborted');
  },
});
```

This is useful when the server infrastructure (e.g., serverless functions, resumable streams) means req.signal isn't reliable, or when you want cancellation to be persisted/shared across reconnects.
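
For reference, the throttle used above is just a rate limiter around the polling callback. A minimal leading-edge sketch (hypothetical helper — the example app ships its own):

```typescript
// Minimal leading-edge throttle (hypothetical helper, not the SDK's):
// invokes fn at most once per waitMs; calls inside the window are dropped.
function throttle<Args extends unknown[]>(
  fn: (...args: Args) => void,
  waitMs: number,
): (...args: Args) => void {
  let last = -Infinity;
  return (...args: Args) => {
    const now = Date.now();
    if (now - last >= waitMs) {
      last = now;
      fn(...args);
    }
  };
}
```

Dropping intermediate calls is fine here: the callback only polls a flag, so the next allowed invocation sees the latest state anyway.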

### TL;DR Recommendations

| Concern | What the SDK does |
| --- | --- |
| Client stop() | Instantly aborts the fetch, keeps accumulated tokens, sets status to 'ready', fires onFinish with isAbort: true |
| Server stops generating? | Only if you pass abortSignal: req.signal (or your own controller) to streamText |
| "One more part" in flight? | Yes, it's possible. The client discards it — whatever was already written to state is kept as-is |
| consumeSseStream: consumeStream | Needed on toUIMessageStreamResponse() for correct server-side abort handling (ensures the stream is consumed so the abort propagates) |
| Server-side onAbort callback | Available on streamText to react to aborts (cleanup, logging, etc.) |

The SDK accepts the "one more part might come" reality — it doesn't try to guarantee byte-perfect synchronization. The design is intentionally optimistic: keep what you have, stop reading, and let both sides clean up independently.

Julian Benegas commented:

> The more advanced pattern from examples/next/app/api/chat/route.ts shows an approach where server-side cancellation is driven by a database flag rather than the HTTP connection:

good approach. but the timestamp might have issues/races as well, right? is a better metric, a "cancelledAtPartIndex" or something like that? something that the client can know "where did i stop" and send to the server for it to slice parts as needed...

Claude Opus 4.6 commented:


You're right to be skeptical of the canceledAt timestamp. Let me break down the issues.

## The canceledAt timestamp approach in the example

Looking at the examples/next app, here's the full flow:

  1. Client sends a DELETE /api/chat/{id}/stream which writes canceledAt: Date.now() to the store (examples/next/app/api/chat/[id]/stream/route.ts:38)
  2. Server polls in onChunk (throttled to once per second!) and aborts when it sees that flag (examples/next/app/api/chat/route.ts:66-70)

The race windows here are real:

  • The onChunk callback is throttled to 1 second — so the server could generate up to ~1 second of additional tokens before it notices the cancel
  • There's no correlation between what the client saw and what the server committed — the canceledAt timestamp isn't used to reconcile state at all, it's purely a boolean trigger
  • The onFinish callback just saves whatever messages state resulted, including whatever extra parts got generated after the client visually stopped

## Does the SDK have anything like a "part index"?

No. Looking at the chunk types in ui-message-chunks.ts, there's no sequential counter or stream position. Chunks have:

  • type (e.g. text-delta, tool-input-start, etc.)
  • Some have id fields (text parts, tool calls, sources) — but these are content IDs, not sequence numbers
  • No global monotonic counter

On the client side, UIMessage.parts is just an array — its .length is the closest thing to an index, but it's not something the protocol tracks.

## Would a cancelledAtPartIndex be better?

It would be more precise than a timestamp, but there's a deeper architectural issue: the client's parts array and the server's stream chunks don't have a 1:1 mapping. A single "text part" on the client is built up from many text-start → text-delta → text-delta → text-end chunks. So the granularity mismatch makes it tricky.

If you wanted to build this properly, you'd probably need to:

  1. Assign a monotonic sequence number to each SSE event on the server (not something the SDK does today)
  2. Track the last sequence number the client processed before calling stop
  3. Send that number back with the cancel request
  4. On the server, use the onFinish to trim the saved message state to match
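
Step 1 could be sketched like this (entirely hypothetical — nothing in the SDK's protocol carries such a field): stamp each chunk with a monotonic counter just before it's serialized to an SSE event.

```typescript
// Hypothetical sequence stamper (the SDK protocol has no seq field):
// returns a function that tags each chunk with a monotonic counter,
// to be applied right before chunks are serialized to SSE events.
function makeSequencer() {
  let seq = 0;
  return <T extends object>(chunk: T): T & { seq: number } => ({
    ...chunk,
    seq: seq++,
  });
}

const stamp = makeSequencer();
stamp({ type: 'text-start' }); // { type: 'text-start', seq: 0 }
stamp({ type: 'text-delta' }); // { type: 'text-delta', seq: 1 }
```

The client would then remember the highest seq it processed and send it with the cancel request; the server trims anything stamped later.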

But honestly, for most use cases the current approach works fine because:

  • The onFinish on the server already receives the full message state — whatever got generated is what gets persisted. The client may have discarded the trailing parts, but the next time the client loads the chat from the DB, it'll see those extra tokens. This is arguably a feature — you don't lose generated content.
  • If you truly want the client's view to be authoritative, you could have the client send its final message state (or its message.parts.length) in the cancel request and have the server trim accordingly. But this creates a different problem — now you might lose content the user wanted.

The SDK's design philosophy is clearly: the server's state is the source of truth, and the client just stops reading early. The slight mismatch between "what the client displayed at stop-time" vs "what the server saved" is treated as acceptable, not as a bug to be solved at the protocol level.

Julian Benegas commented:

what i was thinking was the client sends its message.parts.length, which corresponds to UIMessage parts. i do my persistence in onFinish, and i think i have ui message parts at that point, right? toUIMessageStream, then onFinish.

but still, i may be storing parts that are not yet "done", right? (like tool calls). ah, but still, the index might match...

this seems like the way to go

Claude Opus 4.6 commented:


Yes, this approach works. Here's why:

## The parts arrays are guaranteed to match

Both the client and server build UIMessage.parts using the exact same function: processUIMessageStream from packages/ai/src/ui/process-ui-message-stream.ts — the client runs it while reading the response stream, and the server runs it over its own copy of the same stream.

Same stream chunks → same function → same parts array structure. The ordering of push calls is deterministic — text parts, reasoning parts, tool invocations, step-starts, files, sources all get pushed in stream order.

So message.parts.length on the client at abort time will correspond to the same index boundary in the server's messages[].parts array.

## The "not done" parts concern

You're right that a part might be mid-stream — e.g., a text part with state: 'streaming' or a tool call that hasn't received its output yet. But this doesn't break the index approach. The part is already pushed to the array when it starts:

```ts
// text-start → pushes a new TextUIPart immediately (line 313)
state.message.parts.push(textPart); // state: 'streaming'

// subsequent text-delta chunks just mutate textPart.text in place
// text-end sets textPart.state = 'done'
```

So by the time the client sees any content from a part, it's already in the parts array. The index is stable from the moment a part begins — deltas don't add new entries, they mutate the existing one.
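
That push-then-mutate behavior can be reproduced with a stripped-down model of the text-chunk handling (a simplification for illustration, not the SDK's actual implementation; chunk shapes are abbreviated):

```typescript
// Simplified model of text-chunk handling (not the SDK's actual code):
// -start pushes a part immediately, -delta mutates it in place,
// -end flips its state. The parts index never shifts during deltas.
type TextPart = { type: 'text'; text: string; state: 'streaming' | 'done' };
type Chunk = { type: 'text-start' | 'text-delta' | 'text-end'; delta?: string };

function applyChunks(chunks: Chunk[]): TextPart[] {
  const parts: TextPart[] = [];
  let current: TextPart | undefined;
  for (const chunk of chunks) {
    if (chunk.type === 'text-start') {
      current = { type: 'text', text: '', state: 'streaming' };
      parts.push(current); // in the array from the very first chunk
    } else if (chunk.type === 'text-delta' && current) {
      current.text += chunk.delta ?? ''; // in-place mutation, no new entry
    } else if (chunk.type === 'text-end' && current) {
      current.state = 'done';
    }
  }
  return parts;
}
```

Aborting after the first delta leaves parts.length === 1 with state: 'streaming' — the same index the finished stream would have.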

## The approach

Client stop() → sends { cancelledAtPartCount: message.parts.length } to server
Server onFinish → gets messages with parts → slices parts to cancelledAtPartCount

The one subtlety: the server might have pushed one or two more parts between the client's snapshot and the abort propagating. But that's exactly what the index fixes — you trim to the client's known boundary. The trailing parts that the client never rendered get dropped.

The only edge case to watch: if the last part at the boundary is a text part that was still state: 'streaming', the server's version of that same part will have more text accumulated than what the client displayed. The index matches, but the content within that part might be slightly longer server-side. If you want pixel-perfect fidelity, you'd also need to send the text length of the last text part (or just accept the extra few tokens — they're usually fine).

Julian Benegas commented:

ah, but that wouldn't work well for text parts. we'd want chunk/token granularity... can it be done in that case?

Claude Opus 4.6 commented:


Yeah, this is where it gets tricky. The SDK has no chunk-level sequence counter in the protocol — there's no monotonic index on SSE events, no Last-Event-ID, nothing like that.

The parts that accumulate content in-place via deltas are:

| Part type | How it grows | Identifier |
| --- | --- | --- |
| Text | textPart.text += chunk.delta | chunk.id (from text-start) |
| Reasoning | reasoningPart.text += chunk.delta | chunk.id (from reasoning-start) |
| Tool input | partialToolCall.text += chunk.inputTextDelta | chunk.toolCallId |
All three follow the same pattern: a -start chunk pushes a part, then -delta chunks mutate it in place, then an -end/-available chunk finalizes it. The parts index doesn't change during deltas.

So if you wanted true token-granularity, the client would need to send something like:

```ts
{
  partsCount: message.parts.length,
  // for each streaming part, how far it got:
  streamingPartLengths: {
    [textId]: textPart.text.length,
    [reasoningId]: reasoningPart.text.length,
    [toolCallId]: partialToolCall.text.length,
  }
}
```

And then the server's onFinish would need to truncate each part's accumulated text to match. This is all doable, but entirely userland — nothing in the SDK helps you here. You'd be building a reconciliation layer on top.

In practice, the question is whether it's worth it. The gap between "what the client saw" and "what the server persisted" is typically a few tokens from the last in-flight chunk. If you just persist what the server has and the user reloads, they'll see a slightly longer response than what they saw at stop-time — usually a partial sentence fragment at most. For most chat UIs that's a non-issue, and arguably better than truncating mid-word.

Julian Benegas commented:

> This is all doable, but entirely userland

how could this be done robustly? imagine i'm using useChat in the client, and the toUiMessageStream + onFinish in the backend.

> In practice, the question is whether it's worth it

that's not something for you to judge. do your job.

Claude Opus 4.6 commented:


Here's a robust approach. The core idea: the client is the source of truth for what it displayed, so it captures a snapshot at stop-time and sends it to the server, which uses it to trim the persisted message.

## The snapshot the client needs to capture

At the moment of stop(), the client's message.parts array has everything it rendered. For each part that was still streaming, you need to capture how much content was accumulated. Here's what can be mid-stream:

From process-ui-message-stream.ts:

  • Text parts: textPart.text += chunk.delta (line 331) — grows character by character
  • Reasoning parts: reasoningPart.text += chunk.delta (line 380) — same pattern
  • Tool input: partialToolCall.text += chunk.inputTextDelta (line 496) — but this is raw JSON text, and the part stores a parsed input object. So the relevant measure here is partialToolCall.text.length, but the UIMessage part only has the parsed input.

For text and reasoning, part.text.length is a clean cursor. For tool calls, the input is being incrementally JSON-parsed, so length isn't a clean boundary — but tool calls mid-input are incomplete anyway and would typically be discarded or re-run.

## Implementation

### Client side

```ts
const { messages, stop } = useChat({
  id: chatData.id,
  onFinish({ message, isAbort }) {
    if (isAbort) {
      // Build the snapshot: part count + text lengths for streaming parts
      const snapshot = {
        partsCount: message.parts.length,
        textCursors: message.parts
          .map((part, i) => {
            if (part.type === 'text') return { index: i, length: part.text.length };
            if (part.type === 'reasoning') return { index: i, length: part.text.length };
            return null;
          })
          .filter(Boolean),
      };

      fetch(`/api/chat/${chatData.id}/cancel`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ snapshot }),
      });
    }
  },
});
```

The key: onFinish on the client fires in the finally block of chat.ts:708-719 with message: this.activeResponse!.state.message — that's the exact UIMessage the client had built up at abort time, using the same processUIMessageStream as the server.

### Server side — cancel endpoint

```ts
// /api/chat/[id]/cancel/route.ts
export async function POST(req: Request, { params }) {
  const { id } = await params;
  const { snapshot } = await req.json();
  await saveChat({ id, cancelSnapshot: snapshot });
  return new Response(null, { status: 200 });
}
```

### Server side — stream route with reconciliation

```ts
// /api/chat/route.ts
const userStopSignal = new AbortController();

const result = streamText({
  model: openai('gpt-4o'),
  messages: await convertToModelMessages(messages),
  abortSignal: userStopSignal.signal,
  onChunk: throttle(async () => {
    const chat = await readChat(id);
    if (chat.cancelSnapshot) {
      userStopSignal.abort();
    }
  }, 1000),
});

return result.toUIMessageStreamResponse({
  originalMessages: messages,
  generateMessageId: generateId,
  onFinish: async ({ messages, responseMessage, isAborted }) => {
    if (isAborted) {
      const { cancelSnapshot } = await readChat(id);

      if (cancelSnapshot) {
        // Trim the response message to match the client's snapshot
        const trimmed = trimToSnapshot(responseMessage, cancelSnapshot);
        const finalMessages = [...messages.slice(0, -1), trimmed];
        await saveChat({ id, messages: finalMessages, cancelSnapshot: null });
        return;
      }
    }
    await saveChat({ id, messages });
  },
  consumeSseStream: consumeStream,
});
```

### The trimming function

```ts
function trimToSnapshot(message, snapshot) {
  // Clone to avoid mutation
  const trimmed = structuredClone(message);

  // Drop any parts the client never saw
  trimmed.parts = trimmed.parts.slice(0, snapshot.partsCount);

  // For each text/reasoning cursor, trim the content
  for (const cursor of snapshot.textCursors) {
    const part = trimmed.parts[cursor.index];
    if (part && ('text' in part)) {
      part.text = part.text.slice(0, cursor.length);
    }
  }

  // Mark any still-streaming parts as done
  for (const part of trimmed.parts) {
    if ('state' in part && part.state === 'streaming') {
      part.state = 'done';
    }
  }

  return trimmed;
}
```

## The race condition and why it's fine

There's a race between:

  1. Server onFinish — fires when the consumeSseStream finishes draining (since tee() keeps the second branch alive even after client disconnect)
  2. Client onFinish → cancel POST — fires in the client's finally block

If the server's onFinish fires before the cancel POST arrives, it saves the untrimmed message. Then the cancel POST arrives and... the snapshot is written but never applied.

To handle this robustly, you have two options:

Option A: Two-pass reconciliation. The cancel endpoint itself does the trimming if the message was already saved:

```ts
// /api/chat/[id]/cancel/route.ts
export async function POST(req, { params }) {
  const { id } = await params;
  const { snapshot } = await req.json();
  const chat = await readChat(id);

  if (chat.activeStreamId == null) {
    // Stream already finished and onFinish already saved.
    // Trim the already-persisted message.
    const lastMsg = chat.messages[chat.messages.length - 1];
    if (lastMsg?.role === 'assistant') {
      const trimmed = trimToSnapshot(lastMsg, snapshot);
      chat.messages[chat.messages.length - 1] = trimmed;
      await saveChat({ id, messages: chat.messages });
    }
  } else {
    // Stream still active — save snapshot for onFinish to pick up
    await saveChat({ id, cancelSnapshot: snapshot });
  }

  return new Response(null, { status: 200 });
}
```

Option B: Only persist in the cancel endpoint. Don't save in onFinish at all when aborted — let the cancel POST be the sole writer. Simpler, but means if the cancel POST never arrives (client crashes, network dies), you lose the message.

## What this doesn't cover perfectly

Tool call inputs. The UIMessage part stores parsed input (an object), not raw text length. So if a tool call was mid-streaming when the client stopped, you can't do character-level trimming — the client's part.input is whatever parsePartialJson produced from the partial accumulator at that moment. You could send the serialized JSON.stringify(part.input).length as a cursor, but since the server and client both parsed the same partial JSON, the inputs should actually match at the part level — the issue is only if extra tool-input-delta chunks arrived server-side after the client stopped. In practice, truncating at partsCount handles this: if the tool call part exists in both, the input object will be at least as complete on the server, and you'd just keep the server's version (slightly more complete) or compare JSON.stringify lengths.

Interleaved parts. If multiple text parts are streaming simultaneously (parallel text streams with different ids), you need cursors for each one. The textCursors array above handles this — each entry is keyed by part index.

Julian Benegas commented:

what if the client sent everything? like the full last part? so like lastPart: { index, part } or something. fully. so the server just fully slices + replaces

Claude Opus 4.6 commented:


Yeah, that works and it's way cleaner. The server always has a superset of what the client saw, so the client's snapshot is always a valid prefix to slice to. Here's the full approach:

## Client

```ts
const { messages, stop } = useChat({
  id: chatData.id,
  onFinish({ message, isAbort }) {
    if (!isAbort) return;

    const lastPartIndex = message.parts.length - 1;

    fetch(`/api/chat/${chatData.id}/cancel`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        snapshot: {
          partsCount: message.parts.length,
          lastPart: lastPartIndex >= 0
            ? { index: lastPartIndex, part: message.parts[lastPartIndex] }
            : null,
        },
      }),
    });
  },
});
```

Every UIMessagePart type is plain data — TextUIPart, ReasoningUIPart, ToolUIPart, DynamicToolUIPart, SourceUrlUIPart, SourceDocumentUIPart, FileUIPart, DataUIPart, StepStartUIPart — all serializable via JSON.stringify with no loss. No functions, no classes, no circular refs. You can verify in ui-messages.ts:75-87.

## Server — cancel endpoint

```ts
// /api/chat/[id]/cancel/route.ts
export async function POST(req: Request, { params }) {
  const { id } = await params;
  const { snapshot } = await req.json();

  const chat = await readChat(id);

  if (chat.activeStreamId == null) {
    // Stream already finished, onFinish already persisted. Trim in place.
    reconcileMessage(chat, snapshot);
    await saveChat(chat);
  } else {
    // Stream still active — stash snapshot for onFinish to pick up
    await saveChat({ ...chat, cancelSnapshot: snapshot });
  }

  return new Response(null, { status: 200 });
}
```

## Server — stream route

```ts
const userStopSignal = new AbortController();

const result = streamText({
  model: openai('gpt-4o'),
  messages: await convertToModelMessages(messages),
  abortSignal: userStopSignal.signal,
  onChunk: throttle(async () => {
    const chat = await readChat(id);
    if (chat.cancelSnapshot) {
      userStopSignal.abort();
    }
  }, 1000),
});

return result.toUIMessageStreamResponse({
  originalMessages: messages,
  generateMessageId: generateId,
  onFinish: async ({ messages, isAborted }) => {
    const chat = await readChat(id);

    if (isAborted && chat.cancelSnapshot) {
      // mutates the last assistant message inside `messages` in place
      reconcileMessage({ ...chat, messages }, chat.cancelSnapshot);
      await saveChat({
        id,
        messages,
        activeStreamId: null,
        cancelSnapshot: null,
      });
      return;
    }

    await saveChat({ id, messages, activeStreamId: null });
  },
  consumeSseStream: consumeStream,
});
```

## The reconcile function

This is the whole trick — just slice and replace:

```ts
function reconcileMessage(chat, snapshot) {
  const lastMsg = chat.messages[chat.messages.length - 1];
  if (!lastMsg || lastMsg.role !== 'assistant') return;

  // Slice to however many parts the client saw
  lastMsg.parts = lastMsg.parts.slice(0, snapshot.partsCount);

  // Replace the last part wholesale with the client's version
  if (snapshot.lastPart && snapshot.lastPart.index < lastMsg.parts.length) {
    lastMsg.parts[snapshot.lastPart.index] = snapshot.lastPart.part;
  }
}
```

That's it. slice drops any parts the server generated after the client stopped. The last part replacement handles the mid-stream text/reasoning case — the server's version of that part has more accumulated text, so you swap in the client's version which has exactly what was displayed.

## Why this is sound

  1. Server ≥ client, always. The server processes the full stream through the tee()'d consumeSseStream branch (create-ui-message-stream-response.ts:34-36), which keeps draining even after client disconnect. Its onFinish fires via flush() with the complete state.message. The client stopped reading earlier. So server.parts.length >= client.parts.length, and for any shared part at the same index, the server's version has ≥ content.

  2. Index stability. Parts are only ever push()'d in stream order by processUIMessageStream. Deltas mutate in-place, never insert. So part N on the client is the same logical part as part N on the server.

  3. The last part is the only one that can differ in content. All parts before it were either fully complete (got their -end chunk) or are identical because the same chunks built them. Only the final part might have extra deltas server-side.

  4. Both race orderings work. If onFinish fires first → saves full message → cancel POST arrives → reads it back → trims in place. If cancel POST arrives first → stashes snapshot → onFinish fires → reads snapshot → trims before saving. The cancel endpoint handles both cases.



How to implement message interruptions with stop()? — vercel/ai