In Part 2, we diagnosed a fundamental timing problem: Middy's after hook fires before the stream body is consumed by the Lambda runtime. The LLM data we need to publish simply does not exist yet when the hook runs. No amount of clever async orchestration can fix this. The data is not late; it has not been produced.
That diagnosis led us to a core insight: observability must follow data, not lifecycle. HTTP middleware frameworks model request handling as a sequence of phases: before, handler, after. That model works when the response is a finished value. It breaks when the response is a stream that has not been read yet.
So we stop relying on lifecycle hooks for publication. Instead, we attach observability to the data pipeline itself.
The mechanism is a TransformStream whose flush() callback fires at exactly the right moment: after every byte has flowed through, right before the stream signals completion to the consumer. This is not an implementation detail we stumbled into. It is a guarantee defined in the WHATWG Streams Standard.
TL;DR: Use TransformStream.flush() to defer observability work until after a stream completes. The WHATWG Streams spec guarantees flush() fires only after all chunks have passed through. We build two layers: AI SDK middlewares (wrapStream) collect data during streaming, and a Middy middleware wraps the HTTP response with a final TransformStream that publishes everything on flush(). An in-memory store bridges the two middleware systems (we call this the Dual Middleware Bridge pattern).
How does TransformStream flush() guarantee deferred execution?
The Web Streams API ships a TransformStream class that exposes a writable side (where chunks go in) and a readable side (where they come out). You give it a transformer object with optional callbacks; the three that matter here are:
- transform(chunk, controller): called for every chunk that enters the writable side. You process the chunk (or pass it through unchanged) and enqueue it to the readable side via controller.enqueue().
- flush(controller): called once, after the writable side closes successfully and every transform() promise has resolved.
- cancel(reason): called if the readable side is cancelled by the consumer.
The flush() contract comes directly from the WHATWG Streams Standard:
flush() is called only after all calls to transform() have completed successfully and the writable side has been closed.
When does flush() not fire?
| Scenario | flush() called? |
|---|---|
| All chunks processed, stream closed normally | Yes |
| transform() rejects (throws) | No |
| Writable side is aborted | No |
| Readable side is cancelled (e.g., client disconnect) | No; cancel() fires instead |
This is exactly the semantic we need. flush() gives us a guaranteed callback that fires once, after all data has passed through, and only if the stream completed successfully.
If the client disconnects and the stream aborts, flush() never fires, which is correct because partial LLM output is not worth publishing.
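Here is a minimal, standalone sketch (plain Web Streams in Node.js 22, nothing AWS- or AI-SDK-specific; the names are illustrative) that demonstrates both rows of the table: flush() fires after a clean close and is skipped when the consumer cancels.

// Hypothetical demo, not part of the companion repo.
const demo = async (cancelEarly: boolean): Promise<void> => {
  const source = new ReadableStream<string>({
    start(controller): void {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close();
    },
  });
  const tapped = source.pipeThrough(
    new TransformStream<string, string>({
      transform(chunk, controller): void {
        controller.enqueue(chunk);
      },
      flush(): void {
        console.log('flush fired');
      },
    }),
  );
  const reader = tapped.getReader();
  if (cancelEarly) {
    await reader.cancel('client went away'); // readable side cancelled: flush() never runs
    return;
  }
  while (!(await reader.read()).done) { /* drain */ }
};

await demo(false); // logs "flush fired"
await demo(true);  // logs nothing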
The createStreamTransform Utility
We start with a small, generic utility. All the code in this post comes from step-3 of the companion repo.
// lambda/utils/create-stream-transform.ts
export const createStreamTransform = <T>(
stream: ReadableStream<T>,
onFlush: (chunks: Array<T>) => void | Promise<void>,
): ReadableStream<T> => {
const chunks: Array<T> = [];
return stream.pipeThrough(
new TransformStream<T, T>({
transform(chunk: T, controller): void {
chunks.push(chunk);
controller.enqueue(chunk);
},
async flush(): Promise<void> {
try {
await onFlush(chunks);
} catch {
// Swallow errors: observability failures must never
// break the stream pipeline reaching the client.
}
},
}),
);
};
This is a tap: it observes the stream without modifying it. Every chunk passes through to the next stage unchanged, while being accumulated in an internal array. When the stream completes, onFlush receives the full array of chunks.
The try/catch in flush() is critical. If onFlush throws (say, an SNS publish fails or a CloudWatch API call times out), the error is swallowed.
If we let it propagate, the WHATWG spec says the readable side enters an error state, which would break the stream for the client. The rule is simple: observability failures must never become user-facing failures.
Note the generic type parameter <T>. This utility works with LanguageModelV3StreamPart (AI SDK's internal stream type) and Uint8Array (the serialized HTTP response bytes) alike. We will use it at both levels.
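As a taste of the byte-level usage, here is a hypothetical snippet (not from the repo; responseBody stands in for a serialized HTTP response stream):

// Hypothetical byte-level tap over an HTTP response body.
declare const responseBody: ReadableStream<Uint8Array>;

const tappedBody = createStreamTransform(responseBody, (chunks) => {
  const totalBytes = chunks.reduce((sum, chunk) => sum + chunk.byteLength, 0);
  console.log('response fully streamed', { chunkCount: chunks.length, totalBytes });
});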
Two Middleware Systems
Our architecture involves two independent middleware systems that need to cooperate:
- AI SDK middleware (LanguageModelMiddleware with wrapStream): operates on the raw LLM stream. Each chunk is a LanguageModelV3StreamPart: a typed union of token deltas, usage metadata, finish reasons, and so on.
- Middy middleware: operates on the HTTP response after toTextStreamResponse() has serialized the AI SDK stream into a ReadableStream<Uint8Array>. At this level, chunks are raw bytes.
These two systems run at different layers and at different times. The AI SDK middlewares wrap the model before streamText() is called. The Middy middleware wraps the response body after the handler returns.
They need a bridge.
The bridge is an in-memory store: a simple Map that the AI SDK middleware writes to (during its flush()) and the Middy middleware reads from (during its own flush()):
// lambda/utils/log-data-store.ts
export type LlmLogData =
| {
llmParam: Record<PropertyKey, unknown>;
llmResult: Record<PropertyKey, unknown>;
}
| undefined;
export type LlmLogDataStore = {
clear: () => void;
get: () => LlmLogData;
set: (data: LlmLogData) => void;
};
export const createLlmLogDataStore = (): LlmLogDataStore => {
const storage = new Map<string, LlmLogData>();
const KEY = 'llmLogData';
return {
clear: (): void => { storage.clear(); },
get: (): LlmLogData => storage.get(KEY),
set: (data: LlmLogData): void => { storage.set(KEY, data); },
};
};
export const llmLogDataStore = createLlmLogDataStore();
A Map is safe here because Lambda is single-threaded. One request at a time, one reader and one writer, no race conditions. The before hook clears the store at the start of each invocation.
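The intended lifecycle within a single invocation looks like this (an illustrative sequence with hypothetical values, not code from the repo):

// Illustrative sequence for one invocation.
llmLogDataStore.clear(); // Middy before hook: start from a clean slate
llmLogDataStore.set({
  llmParam: { prompt: 'hi' },
  llmResult: { text: 'hello' },
}); // written by the AI SDK middleware's flush()
console.log(llmLogDataStore.get()); // read later by the Middy middleware's flush()
llmLogDataStore.clear(); // cleared after publication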
AI SDK Middlewares
We build two AI SDK middlewares, both using createStreamTransform.
LoggingMiddleware
Logs model response metadata (chunk count, model ID) when the stream completes:
// lambda/ai-middleware/logging-middleware.ts
import type { LanguageModelMiddleware } from 'ai';
import { createStreamTransform } from '../utils/create-stream-transform';
type Logger = {
info: (...args: Array<unknown>) => void;
};
export const LoggingMiddleware = (logger: Logger): LanguageModelMiddleware => ({
specificationVersion: 'v3',
wrapGenerate: async ({ doGenerate }) => {
const result = await doGenerate();
logger.info('LLM call completed', {
usage: result.usage,
modelId: result.response?.modelId,
});
return result;
},
wrapStream: async ({ doStream }) => {
const result = await doStream();
return {
...result,
stream: createStreamTransform(result.stream, (streamParts) => {
logger.info('LLM streaming call completed', {
chunkCount: streamParts.length,
});
}),
};
},
});
DataStoreMiddleware
Captures both the input parameters and the streaming result, then writes them to the shared store:
// lambda/ai-middleware/data-store-middleware.ts
import type { LanguageModelMiddleware } from 'ai';
import { createStreamTransform } from '../utils/create-stream-transform';
import type { LlmLogData } from '../utils/log-data-store';
export const DataStoreMiddleware = (
setData: (data: LlmLogData) => void,
): LanguageModelMiddleware => ({
specificationVersion: 'v3',
wrapGenerate: async ({ doGenerate, params }) => {
const result = await doGenerate();
setData({
llmParam: params as Record<PropertyKey, unknown>,
llmResult: result as Record<PropertyKey, unknown>,
});
return result;
},
wrapStream: async ({ doStream, params }) => {
const result = await doStream();
return {
...result,
stream: createStreamTransform(result.stream, (streamParts) => {
setData({
llmParam: params as Record<PropertyKey, unknown>,
llmResult: {
request: result.request,
response: result.response,
streamParts,
},
});
}),
};
},
});
Both middlewares follow the same shape. In wrapGenerate, the data is available synchronously after doGenerate() resolves, so no TransformStream is needed.
In wrapStream, data only becomes available after the entire stream is consumed, so we use createStreamTransform to defer our work to flush().
The middlewares are composed onto the model via wrapLanguageModel():
const aiMiddlewares = [
LoggingMiddleware(logger),
DataStoreMiddleware(llmLogDataStore.set),
];
const model = wrapLanguageModel({
model: baseModel,
middleware: aiMiddlewares,
});
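From there the wrapped model is consumed like any other AI SDK model. A minimal sketch of that consumption (in the repo it presumably sits behind the streamingService helper the handler calls below; the prompt is just the handler's default):

import { streamText } from 'ai';

// Sketch: every chunk now flows through both middlewares' wrapStream taps.
const result = streamText({
  model, // the wrapped model from wrapLanguageModel() above
  prompt: 'Summarize the benefits of serverless architecture',
});

// toTextStreamResponse() serializes the AI SDK stream into a Response
// whose body is a ReadableStream<Uint8Array>.
const response = result.toTextStreamResponse();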
How do you publish observability data after a stream completes?
This is the Middy middleware that actually solves the timing problem from Part 2. It is a dual-path middleware: it handles both streaming and non-streaming responses.
// lambda/middleware/publish-log.ts
import type middy from '@middy/core';
import type { LlmLogDataStore } from '../utils/log-data-store';
type StreamResponse = {
body: ReadableStream<Uint8Array>;
headers: Record<string, string>;
statusCode: number;
};
type Logger = {
info: (...args: Array<unknown>) => void;
error: (...args: Array<unknown>) => void;
};
const isStreamResponse = (response: unknown): response is StreamResponse =>
typeof response === 'object' &&
response !== null &&
'body' in response &&
(response as StreamResponse).body instanceof ReadableStream;
export const publishLog = (
llmLogDataStore: LlmLogDataStore,
logger: Logger,
): middy.MiddlewareObj => {
const publishImmediately = async (): Promise<void> => {
const data = llmLogDataStore.get();
if (!data) return;
try {
logger.info('LLM log data (non-streaming)', { data });
} catch (error) {
logger.error('Failed to publish LLM log data', { error });
} finally {
llmLogDataStore.clear();
}
};
const publishAfterStreamConsumption = (request: middy.Request): void => {
const response = request.response as StreamResponse;
const publishTransform = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk: Uint8Array, controller): void {
controller.enqueue(chunk);
},
async flush(): Promise<void> {
try {
const data = llmLogDataStore.get();
if (!data) return;
logger.info('LLM log data (streaming)', { data });
} catch (error) {
logger.error('Failed to publish LLM log data', { error });
} finally {
llmLogDataStore.clear();
}
},
});
(request.response as StreamResponse).body =
response.body.pipeThrough(publishTransform);
};
return {
before: (): void => {
llmLogDataStore.clear();
},
after: async (request: middy.Request): Promise<void> => {
if (isStreamResponse(request.response)) {
publishAfterStreamConsumption(request);
return;
}
await publishImmediately();
},
onError: async (): Promise<void> => {
await publishImmediately();
},
};
};
Read the after hook carefully. For streaming responses, it does not publish. It wraps the response body with a new TransformStream.
The actual publication happens inside that TransformStream's flush(), which fires later, after the Lambda runtime has consumed every byte.
For non-streaming responses (e.g., a handler that uses generateObject instead of streamText), the data is already available, so we publish immediately. The isStreamResponse type guard decides which path to take.
The before hook clears the store at the start of each invocation.
The onError hook publishes whatever data exists if the handler throws before streaming starts. Partial data can still be valuable for debugging failures.
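For completeness, here is what a hypothetical non-streaming handler taking the publishImmediately path could look like (generateObject and the zod schema are illustrative, not part of step-3):

import { generateObject } from 'ai';
import { z } from 'zod';

// Hypothetical non-streaming handler: no ReadableStream body, so
// publishLog's after hook calls publishImmediately() right away.
const objectHandler = async (): Promise<{ body: string; statusCode: number }> => {
  const { object } = await generateObject({
    model, // the wrapped model; DataStoreMiddleware's wrapGenerate fills the store
    schema: z.object({ summary: z.string() }),
    prompt: 'Summarize the benefits of serverless architecture',
  });
  return { body: JSON.stringify(object), statusCode: 200 };
};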
The Complete Pipeline
When the Lambda runtime consumes the response body, data flows through a chain of TransformStreams:
streamText()
-> AI SDK stream (LanguageModelV3StreamPart)
-> LoggingMiddleware.wrapStream (flush: logs completion)
-> DataStoreMiddleware.wrapStream (flush: stores data in Map)
-> toTextStreamResponse()
-> ReadableStream<Uint8Array>
-> publishLog TransformStream (flush: reads Map, publishes)
-> Middy pipes to Lambda responseStream
-> API Gateway streams to client
Here is the same pipeline as a visual diagram:
flowchart LR
A["streamText()"] --> B["LoggingMiddleware\n(TransformStream)"]
B --> C["DataStoreMiddleware\n(TransformStream)"]
C --> D["toTextStreamResponse()"]
D --> E["publishLog\n(TransformStream)"]
E --> F["Lambda\nresponseStream"]
F --> G["API Gateway\n→ Client"]
B -. "flush: log completion" .-> B
C -. "flush: store data" .-> C
E -. "flush: publish log" .-> E
style B fill:#e1f5fe
style C fill:#e1f5fe
style E fill:#fff3e0
The three colored boxes are TransformStream instances. The blue ones are AI SDK middlewares operating on LanguageModelV3StreamPart chunks. The orange one is the Middy middleware operating on Uint8Array bytes after toTextStreamResponse() serialization.
The flush() callbacks execute in pipeline order. This is guaranteed by how pipeThrough chains work: the upstream TransformStream must close its readable side before the downstream TransformStream's writable side closes, which means upstream flush() completes before downstream flush() is invoked.
The sequence:
1. LoggingMiddleware.flush(): logs the LLM streaming result to CloudWatch.
2. DataStoreMiddleware.flush(): writes llmParam and llmResult into the in-memory store.
3. publishLog.flush(): reads from the store and publishes to the observability backend (SNS, Kinesis, CloudWatch, or whatever you need).
Step 2 sets the data. Step 3 reads it. This ordering is not a coincidence we hope holds. It is a structural guarantee of pipeThrough chains in the WHATWG spec.
Upstream flushes complete before downstream flushes begin.
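The ordering guarantee is easy to observe in isolation. A standalone sketch (plain Web Streams, no AWS or AI SDK; names are illustrative) with two chained taps always logs the upstream flush first:

// Standalone ordering demo: upstream flush() logs before downstream flush().
const upstream = new TransformStream<string, string>({
  transform: (chunk, controller) => controller.enqueue(chunk),
  flush: () => console.log('1. upstream flush (writes the store)'),
});
const downstream = new TransformStream<string, string>({
  transform: (chunk, controller) => controller.enqueue(chunk),
  flush: () => console.log('2. downstream flush (reads the store)'),
});

const source = new ReadableStream<string>({
  start(controller): void {
    controller.enqueue('chunk');
    controller.close();
  },
});

// Drain the chain; the logs always appear in pipeline order.
for await (const _chunk of source.pipeThrough(upstream).pipeThrough(downstream)) {
  // consume
}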
Here is how the handler wires everything together:
// lambda/index.ts (imports and the streamingService, onError, and logger helpers are omitted here)
const aiMiddlewares = [
LoggingMiddleware(logger),
DataStoreMiddleware(llmLogDataStore.set),
];
const streamHandler = async (
event: APIGatewayProxyEvent,
): Promise<HttpStreamResponse> => {
const body = JSON.parse(event.body ?? '{}');
const prompt: string = body.prompt ?? 'Summarize the benefits of serverless architecture';
const result = streamingService({ prompt }, onError, aiMiddlewares);
const response = result.toTextStreamResponse();
return {
body: response.body ?? '',
headers: Object.fromEntries(response.headers.entries()),
statusCode: response.status,
};
};
export const handler = middy<APIGatewayProxyEvent, HttpStreamResponse>({
streamifyResponse: true,
})
.use(httpCors({ origins: ['*'] }))
.use(publishLog(llmLogDataStore, logger))
.handler(streamHandler);
Nothing in this handler blocks on observability. The stream starts flowing to the client immediately. The TransformStreams execute their flush() callbacks transparently as part of the pipe chain, after every byte has been delivered.
Why is TransformStream flush() better than AI SDK onFinish?
AI SDK provides an onFinish callback in streamText(). You might wonder why we did not just use that.
The problem is that onFinish is fire-and-forget. The AI SDK calls your callback but does not await the returned promise. From the AI SDK source:
// Simplified from AI SDK internals
onFinish?.({ usage, object, ... }); // No await
This means:
- If your onFinish handler does async work (publishing to SNS, writing to a database), the Lambda runtime may freeze the execution context before that work completes.
- There is no backpressure. The stream closes and the response ends regardless of whether your callback finished.
- There is no error propagation. If onFinish throws, nobody catches it.
Our flush() approach is fundamentally different:
- flush() is part of the pipeline. It executes as a step in the pipe chain. The downstream consumer does not see the stream as "closed" until flush() resolves. The Lambda runtime will not freeze the context while flush() is awaiting an SNS publish.
- flush() only fires on success. If the stream aborts (client disconnect, timeout), flush() never runs. We do not publish partial, potentially misleading data.
- flush() ordering is deterministic. In a chain of pipeThrough calls, upstream flushes complete before downstream flushes begin. We rely on this to ensure the data store is populated before we read from it.
The difference is not subtle. onFinish is a notification. flush() is a pipeline stage. One is best-effort. The other is structurally guaranteed.
Recap
The pattern we built in this post can be summarized in one sentence: attach observability to the data pipeline using TransformStream flush(), instead of relying on middleware lifecycle hooks that fire at the wrong time.
Three properties make this work:
- WHATWG flush() guarantee: fires once, after all transform() promises resolve and the writable side closes successfully. This is not a Node.js quirk. It is a web standard implemented consistently across runtimes.
- Pipeline ordering: in a pipeThrough chain, upstream flush() completes before downstream flush() begins. This lets us write data in one TransformStream and read it in another, with guaranteed ordering.
- Error isolation: try/catch inside flush() ensures observability failures never break the user-facing stream. The client gets their complete response regardless of whether our publish succeeded.
The createStreamTransform utility is 24 lines of code. The publishLog middleware is under 90. Together, they solve a problem that has no clean solution in traditional middleware architectures.
What's Next
We have the pattern. We have the code. But how do we know it actually works? How do we verify that flush() fires in the right order, that errors are properly swallowed, that aborted streams skip publication?
In Part 4, we put it all together: a complete CDK stack you can deploy in 5 minutes, with tests that verify the TransformStream timing guarantees.
We will write unit tests that prove the flush ordering, integration tests that confirm end-to-end observability, and cover the edge cases: client disconnects, LLM timeouts, and publish failures.
References
- WHATWG Streams Standard: TransformStream: the specification that guarantees flush() behavior
- MDN: TransformStream: API reference
- MDN: ReadableStream.pipeThrough(): chaining TransformStreams
- Node.js Web Streams API: Node.js 22 implementation
- AI SDK Middleware: LanguageModelMiddleware with wrapStream
- AI SDK wrapLanguageModel: composing model with middlewares
- Cloudflare Workers TransformStream: validation of the "tap" pattern in production
- New Relic: Serverless response streaming with AI observability: similar generator proxy pattern
- Is onFinish fire-and-forget?: Vercel community confirmation