Pablo Albaladejo

Deferred Observability for Streaming Lambda with TransformStream flush()

In Part 2, we diagnosed a fundamental timing problem: Middy's after hook fires before the stream body is consumed by the Lambda runtime. The LLM data we need to publish simply does not exist yet when the hook runs. No amount of clever async orchestration can fix this. The data is not late; it has not been produced.

That diagnosis led us to a core insight: observability must follow data, not lifecycle. HTTP middleware frameworks model request handling as a sequence of phases: before, handler, after. That model works when the response is a finished value. It breaks when the response is a stream that has not been read yet.

So we stop relying on lifecycle hooks for publication. Instead, we attach observability to the data pipeline itself.

The mechanism is a TransformStream whose flush() callback fires at exactly the right moment: after every byte has flowed through, right before the stream signals completion to the consumer. This is not an implementation detail we stumbled into. It is a guarantee defined in the WHATWG Streams Standard.

TL;DR: Use TransformStream.flush() to defer observability work until after a stream completes. The WHATWG Streams spec guarantees flush() fires only after all chunks have passed through. We build two layers: AI SDK middlewares (wrapStream) collect data during streaming, and a Middy middleware wraps the HTTP response with a final TransformStream that publishes everything on flush(). An in-memory store bridges the two middleware systems (we call this the Dual Middleware Bridge pattern).

How does TransformStream flush() guarantee deferred execution?

The Web Streams API ships a TransformStream class that sits between a readable and a writable side. You give it a transformer object with up to three callbacks:

  • transform(chunk, controller): called for every chunk that enters the writable side. You process the chunk (or pass it through unchanged) and enqueue it to the readable side via controller.enqueue().
  • flush(controller): called once, after the writable side closes successfully and every transform() promise has resolved.
  • cancel(reason): called if the readable side is cancelled by the consumer.

The flush() contract comes directly from the WHATWG Streams Standard:

> flush() is called only after all calls to transform() have completed successfully and the writable side has been closed.
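To make the contract concrete, here is a minimal, self-contained sketch (not from the companion repo) showing transform() firing per chunk and flush() firing exactly once at the end:

```ts
// Minimal sketch: flush() runs once, after the last chunk has passed through transform().
const source = new ReadableStream<string>({
  start(controller) {
    controller.enqueue('hello');
    controller.enqueue('world');
    controller.close();
  },
});

const tapped = source.pipeThrough(
  new TransformStream<string, string>({
    transform(chunk, controller) {
      console.log('transform:', chunk); // called for every chunk
      controller.enqueue(chunk);        // pass through unchanged
    },
    flush() {
      console.log('flush: every chunk has been processed'); // called once, at the end
    },
  }),
);

// Draining the readable side drives the pipeline; flush() runs before
// the final read resolves with { done: true }.
const reader = tapped.getReader();
while (!(await reader.read()).done) { /* consume */ }
```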

When does flush() not fire?

| Scenario | flush() called? |
| --- | --- |
| All chunks processed, stream closed normally | Yes |
| transform() rejects (throws) | No |
| Writable side is aborted | No |
| Readable side is cancelled (e.g., client disconnect) | No; cancel() fires instead |

This is exactly the semantic we need. flush() gives us a guaranteed callback that fires once, after all data has passed through, and only if the stream completed successfully.

If the client disconnects and the stream aborts, flush() never fires, which is correct because partial LLM output is not worth publishing.
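For completeness, here is a small sketch of the disconnect row from the table above. It assumes a runtime (and TypeScript lib) recent enough to support the transformer cancel() callback, which is a newer addition to the spec:

```ts
// Sketch: cancelling the readable side skips flush() and fires cancel() instead.
const tap = new TransformStream<string, string>({
  transform: (chunk, controller) => controller.enqueue(chunk),
  flush: () => console.log('flush'),                  // never runs in this scenario
  cancel: (reason) => console.log('cancel:', reason), // runs instead
});

const source = new ReadableStream<string>({
  start(controller) {
    controller.enqueue('partial output');
    // intentionally never closed: the model is still "streaming"
  },
});

const reader = source.pipeThrough(tap).getReader();
await reader.read();                        // receive the first chunk
await reader.cancel('client disconnected'); // consumer walks away; flush() is skipped
```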

The createStreamTransform Utility

We start with a small, generic utility. All the code in this post comes from step-3 of the companion repo.

// lambda/utils/create-stream-transform.ts

export const createStreamTransform = <T>(
  stream: ReadableStream<T>,
  onFlush: (chunks: Array<T>) => void | Promise<void>,
): ReadableStream<T> => {
  const chunks: Array<T> = [];

  return stream.pipeThrough(
    new TransformStream<T, T>({
      transform(chunk: T, controller): void {
        chunks.push(chunk);
        controller.enqueue(chunk);
      },

      async flush(): Promise<void> {
        try {
          await onFlush(chunks);
        } catch {
          // Swallow errors: observability failures must never
          // break the stream pipeline reaching the client.
        }
      },
    }),
  );
};

This is a tap: it observes the stream without modifying it. Every chunk passes through to the next stage unchanged, while being accumulated in an internal array. When the stream completes, onFlush receives the full array of chunks.

The try/catch in flush() is critical. If onFlush throws (say, an SNS publish fails or a CloudWatch API call times out), the error is swallowed.

If we let it propagate, the WHATWG spec says the readable side enters an error state, which would break the stream for the client. The rule is simple: observability failures must never become user-facing failures.

Note the generic type parameter <T>. This utility works with LanguageModelV3StreamPart (AI SDK's internal stream type) and Uint8Array (the serialized HTTP response bytes) alike. We will use it at both levels.
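As a taste of the byte-level usage, here is a hypothetical snippet (responseBody is assumed, not from the repo) that taps a serialized HTTP stream and logs its size on completion:

```ts
import { createStreamTransform } from './utils/create-stream-transform';

// Hypothetical usage: tap an HTTP byte stream and log its size once it completes.
declare const responseBody: ReadableStream<Uint8Array>; // assumed to exist

const tappedBody = createStreamTransform<Uint8Array>(responseBody, (chunks) => {
  const totalBytes = chunks.reduce((sum, chunk) => sum + chunk.byteLength, 0);
  console.log(`stream completed: ${chunks.length} chunks, ${totalBytes} bytes`);
});
```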

Two Middleware Systems

Our architecture involves two independent middleware systems that need to cooperate:

  1. AI SDK middleware (LanguageModelMiddleware with wrapStream): operates on the raw LLM stream. Each chunk is a LanguageModelV3StreamPart: a typed union of token deltas, usage metadata, finish reasons, and so on.

  2. Middy middleware: operates on the HTTP response after toTextStreamResponse() has serialized the AI SDK stream into a ReadableStream<Uint8Array>. At this level, chunks are raw bytes.

These two systems run at different layers and at different times. The AI SDK middlewares wrap the model before streamText() is called. The Middy middleware wraps the response body after the handler returns.

They need a bridge.

The bridge is an in-memory store: a simple Map that the AI SDK middleware writes to (during its flush()) and the Middy middleware reads from (during its own flush()):

// lambda/utils/log-data-store.ts

export type LlmLogData =
  | {
      llmParam: Record<PropertyKey, unknown>;
      llmResult: Record<PropertyKey, unknown>;
    }
  | undefined;

export type LlmLogDataStore = {
  clear: () => void;
  get: () => LlmLogData;
  set: (data: LlmLogData) => void;
};

export const createLlmLogDataStore = (): LlmLogDataStore => {
  const storage = new Map<string, LlmLogData>();
  const KEY = 'llmLogData';

  return {
    clear: (): void => { storage.clear(); },
    get: (): LlmLogData => storage.get(KEY),
    set: (data: LlmLogData): void => { storage.set(KEY, data); },
  };
};

export const llmLogDataStore = createLlmLogDataStore();

A Map is safe here because Lambda is single-threaded. One request at a time, one reader and one writer, no race conditions. The before hook clears the store at the start of each invocation.

AI SDK Middlewares

We build two AI SDK middlewares, both using createStreamTransform.

LoggingMiddleware

Logs model response metadata (chunk count, model ID) when the stream completes:

// lambda/ai-middleware/logging-middleware.ts

import type { LanguageModelMiddleware } from 'ai';
import { createStreamTransform } from '../utils/create-stream-transform';

type Logger = {
  info: (...args: Array<unknown>) => void;
};

export const LoggingMiddleware = (logger: Logger): LanguageModelMiddleware => ({
  specificationVersion: 'v3',

  wrapGenerate: async ({ doGenerate }) => {
    const result = await doGenerate();
    logger.info('LLM call completed', {
      usage: result.usage,
      modelId: result.response?.modelId,
    });
    return result;
  },

  wrapStream: async ({ doStream }) => {
    const result = await doStream();

    return {
      ...result,
      stream: createStreamTransform(result.stream, (streamParts) => {
        logger.info('LLM streaming call completed', {
          chunkCount: streamParts.length,
        });
      }),
    };
  },
});

DataStoreMiddleware

Captures both the input parameters and the streaming result, then writes them to the shared store:

// lambda/ai-middleware/data-store-middleware.ts

import type { LanguageModelMiddleware } from 'ai';
import { createStreamTransform } from '../utils/create-stream-transform';
import type { LlmLogData } from '../utils/log-data-store';

export const DataStoreMiddleware = (
  setData: (data: LlmLogData) => void,
): LanguageModelMiddleware => ({
  specificationVersion: 'v3',

  wrapGenerate: async ({ doGenerate, params }) => {
    const result = await doGenerate();
    setData({
      llmParam: params as Record<PropertyKey, unknown>,
      llmResult: result as Record<PropertyKey, unknown>,
    });
    return result;
  },

  wrapStream: async ({ doStream, params }) => {
    const result = await doStream();

    return {
      ...result,
      stream: createStreamTransform(result.stream, (streamParts) => {
        setData({
          llmParam: params as Record<PropertyKey, unknown>,
          llmResult: {
            request: result.request,
            response: result.response,
            streamParts,
          },
        });
      }),
    };
  },
});

Both middlewares follow the same shape. In wrapGenerate, the data is available synchronously after doGenerate() resolves, so no TransformStream is needed.

In wrapStream, data only becomes available after the entire stream is consumed, so we use createStreamTransform to defer our work to flush().

The middlewares are composed onto the model via wrapLanguageModel():

const aiMiddlewares = [
  LoggingMiddleware(logger),
  DataStoreMiddleware(llmLogDataStore.set),
];

const model = wrapLanguageModel({
  model: baseModel,
  middleware: aiMiddlewares,
});

How do you publish observability data after a stream completes?

This is the Middy middleware that actually solves the timing problem from Part 2. It is a dual-path middleware: it handles both streaming and non-streaming responses.

// lambda/middleware/publish-log.ts

import type middy from '@middy/core';
import type { LlmLogDataStore } from '../utils/log-data-store';

type StreamResponse = {
  body: ReadableStream<Uint8Array>;
  headers: Record<string, string>;
  statusCode: number;
};

type Logger = {
  info: (...args: Array<unknown>) => void;
  error: (...args: Array<unknown>) => void;
};

const isStreamResponse = (response: unknown): response is StreamResponse =>
  typeof response === 'object' &&
  response !== null &&
  'body' in response &&
  (response as StreamResponse).body instanceof ReadableStream;

export const publishLog = (
  llmLogDataStore: LlmLogDataStore,
  logger: Logger,
): middy.MiddlewareObj => {
  const publishImmediately = async (): Promise<void> => {
    const data = llmLogDataStore.get();
    if (!data) return;

    try {
      logger.info('LLM log data (non-streaming)', { data });
    } catch (error) {
      logger.error('Failed to publish LLM log data', { error });
    } finally {
      llmLogDataStore.clear();
    }
  };

  const publishAfterStreamConsumption = (request: middy.Request): void => {
    const response = request.response as StreamResponse;

    const publishTransform = new TransformStream<Uint8Array, Uint8Array>({
      transform(chunk: Uint8Array, controller): void {
        controller.enqueue(chunk);
      },

      async flush(): Promise<void> {
        try {
          const data = llmLogDataStore.get();
          if (!data) return;

          logger.info('LLM log data (streaming)', { data });
        } catch (error) {
          logger.error('Failed to publish LLM log data', { error });
        } finally {
          llmLogDataStore.clear();
        }
      },
    });

    (request.response as StreamResponse).body =
      response.body.pipeThrough(publishTransform);
  };

  return {
    before: (): void => {
      llmLogDataStore.clear();
    },

    after: async (request: middy.Request): Promise<void> => {
      if (isStreamResponse(request.response)) {
        publishAfterStreamConsumption(request);
        return;
      }

      await publishImmediately();
    },

    onError: async (): Promise<void> => {
      await publishImmediately();
    },
  };
};

Read the after hook carefully. For streaming responses, it does not publish. It wraps the response body with a new TransformStream.

The actual publication happens inside that TransformStream's flush(), which fires later, after the Lambda runtime has consumed every byte.

For non-streaming responses (e.g., a handler that uses generateObject instead of streamText), the data is already available, so we publish immediately. The isStreamResponse type guard decides which path to take.

The before hook clears the store at the start of each invocation.

The onError hook publishes whatever data exists if the handler throws before streaming starts. Partial data can still be valuable for debugging failures.

The Complete Pipeline

When the Lambda runtime consumes the response body, data flows through a chain of TransformStreams:

streamText()
  -> AI SDK stream (LanguageModelV3StreamPart)
    -> LoggingMiddleware.wrapStream    (flush: logs completion)
    -> DataStoreMiddleware.wrapStream  (flush: stores data in Map)
  -> toTextStreamResponse()
    -> ReadableStream<Uint8Array>
      -> publishLog TransformStream    (flush: reads Map, publishes)
        -> Middy pipes to Lambda responseStream
          -> API Gateway streams to client

Here is the same pipeline as a visual diagram:

flowchart LR
    A["streamText()"] --> B["LoggingMiddleware\n(TransformStream)"]
    B --> C["DataStoreMiddleware\n(TransformStream)"]
    C --> D["toTextStreamResponse()"]
    D --> E["publishLog\n(TransformStream)"]
    E --> F["Lambda\nresponseStream"]
    F --> G["API Gateway\n→ Client"]

    B -. "flush: log completion" .-> B
    C -. "flush: store data" .-> C
    E -. "flush: publish log" .-> E

    style B fill:#e1f5fe
    style C fill:#e1f5fe
    style E fill:#fff3e0

The three colored boxes are TransformStream instances. The blue ones are AI SDK middlewares operating on LanguageModelV3StreamPart chunks. The orange one is the Middy middleware operating on Uint8Array bytes after toTextStreamResponse() serialization.

The flush() callbacks execute in pipeline order. This is guaranteed by how pipeThrough chains work: the upstream TransformStream must close its readable side before the downstream TransformStream's writable side closes, which means upstream flush() completes before downstream flush() is invoked.

The sequence:

  1. LoggingMiddleware.flush(): logs the LLM streaming result to CloudWatch.
  2. DataStoreMiddleware.flush(): writes llmParam and llmResult into the in-memory store.
  3. publishLog.flush(): reads from the store and publishes to the observability backend (SNS, Kinesis, CloudWatch, or whatever you need).

Step 2 sets the data. Step 3 reads it. This ordering is not a coincidence we hope holds. It is a structural guarantee of pipeThrough chains in the WHATWG spec.

Upstream flushes complete before downstream flushes begin.
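A minimal, repo-independent sketch of that ordering guarantee:

```ts
// Sketch: in a pipeThrough chain, upstream flush() completes before downstream flush() begins.
const order: Array<string> = [];

const makeStage = (name: string): TransformStream<string, string> =>
  new TransformStream({
    transform: (chunk, controller) => controller.enqueue(chunk),
    flush: () => { order.push(`${name} flush`); },
  });

const source = new ReadableStream<string>({
  start(controller) {
    controller.enqueue('chunk');
    controller.close();
  },
});

await source
  .pipeThrough(makeStage('upstream'))
  .pipeThrough(makeStage('downstream'))
  .pipeTo(new WritableStream()); // discard the output; we only care about flush order

console.log(order); // ['upstream flush', 'downstream flush']
```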

Here is how the handler wires everything together:

// lambda/index.ts

const aiMiddlewares = [
  LoggingMiddleware(logger),
  DataStoreMiddleware(llmLogDataStore.set),
];

const streamHandler = async (
  event: APIGatewayProxyEvent,
): Promise<HttpStreamResponse> => {
  const body = JSON.parse(event.body ?? '{}');
  const prompt: string = body.prompt ?? 'Summarize the benefits of serverless architecture';

  const result = streamingService({ prompt }, onError, aiMiddlewares);
  const response = result.toTextStreamResponse();

  return {
    body: response.body ?? '',
    headers: Object.fromEntries(response.headers.entries()),
    statusCode: response.status,
  };
};

export const handler = middy<APIGatewayProxyEvent, HttpStreamResponse>({
  streamifyResponse: true,
})
  .use(httpCors({ origins: ['*'] }))
  .use(publishLog(llmLogDataStore, logger))
  .handler(streamHandler);

Nothing in this handler blocks on observability. The stream starts flowing to the client immediately. The TransformStreams execute their flush() callbacks transparently as part of the pipe chain, after every byte has been delivered.

Why is TransformStream flush() better than AI SDK onFinish?

AI SDK provides an onFinish callback in streamText(). You might wonder why we did not just use that.

The problem is that onFinish is fire-and-forget. The AI SDK calls your callback but does not await the returned promise. From the AI SDK source:

// Simplified from AI SDK internals
onFinish?.({ usage, object, ... }); // No await

This means:

  • If your onFinish handler does async work (publishing to SNS, writing to a database), the Lambda runtime may freeze the execution context before that work completes.
  • There is no backpressure. The stream closes and the response ends regardless of whether your callback finished.
  • There is no error propagation. If onFinish throws, nobody catches it.

Our flush() approach is fundamentally different:

  • flush() is part of the pipeline. It executes as a step in the pipe chain. The downstream consumer does not see the stream as "closed" until flush() resolves. The Lambda runtime will not freeze the context while flush() is awaiting an SNS publish.
  • flush() only fires on success. If the stream aborts (client disconnect, timeout), flush() never runs. We do not publish partial, potentially misleading data.
  • flush() ordering is deterministic. In a chain of pipeThrough calls, upstream flushes complete before downstream flushes begin. We rely on this to ensure the data store is populated before we read from it.

The difference is not subtle. onFinish is a notification. flush() is a pipeline stage. One is best-effort. The other is structurally guaranteed.
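Here is a sketch of that structural difference; publishToObservabilityBackend and responseBody are hypothetical stand-ins, not APIs from the repo:

```ts
// Sketch: the final read only resolves { done: true } after flush() settles,
// so the async publish finishes before the response is considered complete.
declare const responseBody: ReadableStream<Uint8Array>;          // assumed
declare function publishToObservabilityBackend(): Promise<void>; // hypothetical

const tap = new TransformStream<Uint8Array, Uint8Array>({
  transform: (chunk, controller) => controller.enqueue(chunk),
  async flush() {
    await publishToObservabilityBackend(); // awaited as a pipeline stage, not fire-and-forget
  },
});

const reader = responseBody.pipeThrough(tap).getReader();
while (!(await reader.read()).done) { /* deliver bytes to the client */ }
// Reaching this line means flush(), and therefore the publish, has completed.
```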

Recap

The pattern we built in this post can be summarized in one sentence: attach observability to the data pipeline using TransformStream flush(), instead of relying on middleware lifecycle hooks that fire at the wrong time.

Three properties make this work:

  1. WHATWG flush() guarantee: fires once, after all transform() promises resolve and the writable side closes successfully. This is not a Node.js quirk. It is a web standard implemented consistently across runtimes.

  2. Pipeline ordering: in a pipeThrough chain, upstream flush() completes before downstream flush() begins. This lets us write data in one TransformStream and read it in another, with guaranteed ordering.

  3. Error isolation: try/catch inside flush() ensures observability failures never break the user-facing stream. The client gets their complete response regardless of whether our publish succeeded.

The createStreamTransform utility is 24 lines of code. The publishLog middleware is under 90. Together, they solve a problem that has no clean solution in traditional middleware architectures.

What's Next

We have the pattern. We have the code. But how do we know it actually works? How do we verify that flush() fires in the right order, that errors are properly swallowed, that aborted streams skip publication?

In Part 4, we put it all together: a complete CDK stack you can deploy in 5 minutes, with tests that verify the TransformStream timing guarantees.

We will write unit tests that prove the flush ordering, integration tests that confirm end-to-end observability, and cover the edge cases: client disconnects, LLM timeouts, and publish failures.
