Building long-running usage reports on AWS

In this post, I will discuss how to generate a report that can take a long time to produce, and I will compare two different implementations:

  • Implementation A: AWS Step Functions (Standard workflows)
  • Implementation B: AWS Lambda Durable Functions

The real challenge is not the HTTP request itself, but rather what happens after the request is accepted.

When a workflow involves long-running operations, I must be able to:

  • stop compute while waiting
  • resume execution later
  • preserve state safely
  • handle retries and failures without re-running everything

Lambda Durable Functions exist because teams needed a way to pause a Lambda execution while external work (for example, an LLM generating a response) is still in progress, without paying for idle compute and without losing execution state.

However, this is not a new problem.

AWS Step Functions have supported this model for years using:

  • explicit state machines
  • task tokens and callbacks

In this article, I explore the same requirement implemented with both approaches to make the trade-offs concrete rather than theoretical.

Note:
The code samples are meant to illustrate orchestration patterns and tradeoffs. Some helper functions are shortened to keep the focus on the main architecture.

The flow

The process is the same whether you use Step Functions or Lambda Durable Functions; the difference is where the state machine lives.

  1. POST /reports with { tenantId, period, dimensions, metrics, prompt, styles[] }

  2. CreateOrGetReport Lambda

    • validates & normalises metricsSpec
    • computes metricsKey
    • writes job state to DynamoDB (idempotent)
    • starts orchestration (if NEW)
  3. Orchestrator starts Athena query

    • registers a wait (event-driven)
    • waits for Athena completion via EventBridge
  4. Orchestrator stores metrics.json in S3

    • this happens ONCE per metricsKey
  5. For each requested narrative style:

    • orchestrator sends ONE SQS message (metricsKey + prompt + style + resume token)
    • orchestrator waits for completion (no compute running)
  6. LLM worker

    • consumes SQS message
    • calls OpenAI
    • stores narrative-${style}.json in S3
    • signals orchestration to resume
  7. Orchestrator

    • marks job DONE in DynamoDB
    • returns pointers to artifacts

1) Request contract

The client sends a prompt that describes what to aggregate, but the backend handles the actual execution.

Example request:

{
  "tenantId": "acme",
  "period": "2026-01",
  "prompt": "Generate my usage report for last month and explain anomalies",
  "dimensions": ["endpoint"],
  "metrics": ["totalRequests", "topEndpoints", "anomalies"],
  "styles": [
    { "id": "executive", "prompt": "Write an executive summary." },
    { "id": "technical", "prompt": "Write a technical explanation." },
    { "id": "anomalies", "prompt": "Explain anomalies and likely causes." }
  ]
}

2) CreateOrGetReport Lambda

This Lambda is behind API Gateway and handles three things:

1) Normalise the user’s request into a deterministic metricsSpec

2) Compute a stable dedupe key (metricsKey)

3) Upsert a job item in DynamoDB with a conditional write (idempotent)

The DynamoDB table could look like:

  • PK: TENANT#{tenantId}
  • SK: REPORT#{period}#{metricsKey} (or just REPORT#{metricsKey} if period is already inside the key)
  • Attributes: status, reportS3Prefix, createdAt, updatedAt, etc.

CreateOrGetReport (handler)

import { APIGatewayProxyHandlerV2 } from "aws-lambda";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand, GetCommand } from "@aws-sdk/lib-dynamodb";
import { SFNClient, StartExecutionCommand } from "@aws-sdk/client-sfn";
import crypto from "crypto";

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const sfn = new SFNClient({});

const JOBS_TABLE = process.env.JOBS_TABLE!;
const ORCHESTRATOR_ARN = process.env.ORCHESTRATOR_ARN!;

const ALLOWED_DIMENSIONS = ["endpoint", "region", "statusCode"] as const;
const ALLOWED_METRICS = ["totalRequests", "topEndpoints", "anomalies"] as const;

type AllowedDimension = (typeof ALLOWED_DIMENSIONS)[number];
type AllowedMetric = (typeof ALLOWED_METRICS)[number];

type Style = { id: string; prompt: string };

type ReportRequest = {
  tenantId: string;
  period: string; // YYYY-MM
  prompt: string;
  dimensions: AllowedDimension[];
  metrics: AllowedMetric[];
  styles: Style[];
};

type MetricsSpec = {
  tenantId: string;
  period: string;
  dimensions: AllowedDimension[];
  metrics: AllowedMetric[];
};

function canonicalJson(obj: unknown): string {
  // Deterministic JSON (recursively sorted keys). For production, use a tested library.
  const sortKeys = (v: any): any =>
    Array.isArray(v) ? v.map(sortKeys)
      : v && typeof v === "object"
        ? Object.fromEntries(Object.keys(v).sort().map((k) => [k, sortKeys(v[k])]))
        : v;
  return JSON.stringify(sortKeys(obj));
}

function sha256(s: string): string {
  return crypto.createHash("sha256").update(s).digest("hex");
}

function assertAllowed<T extends readonly string[]>(allowed: T, values: string[], field: string): asserts values is T[number][] {
  for (const v of values) {
    if (!(allowed as readonly string[]).includes(v)) {
      throw new Error(`Invalid ${field}: ${v}`);
    }
  }
}

function normalizeMetricsSpec(input: ReportRequest): MetricsSpec {
  if (!input.tenantId || !input.period) throw new Error("tenantId/period required");
  if (!Array.isArray(input.dimensions) || !Array.isArray(input.metrics)) throw new Error("dimensions/metrics required");

  assertAllowed(ALLOWED_DIMENSIONS, input.dimensions as any, "dimension");
  assertAllowed(ALLOWED_METRICS, input.metrics as any, "metric");

  const dimensions = Array.from(new Set(input.dimensions)).sort() as AllowedDimension[];
  const metrics = Array.from(new Set(input.metrics)).sort() as AllowedMetric[];

  return {
    tenantId: input.tenantId,
    period: input.period,
    dimensions,
    metrics
  };
}

export const handler: APIGatewayProxyHandlerV2 = async (event) => {
  const body = event.body ? JSON.parse(event.body) : {};
  const req = body as ReportRequest;

  const metricsSpec = normalizeMetricsSpec(req);
  const metricsKey = sha256(canonicalJson(metricsSpec));

  const pk = `TENANT#${req.tenantId}`;
  const sk = `REPORT#${req.period}#${metricsKey}`;

  const now = new Date().toISOString();
  const reportS3Prefix = `reports/${req.tenantId}/${req.period}/${metricsKey}/`;

  // Attempt to create job (idempotent) using conditional write.
  try {
    await ddb.send(new PutCommand({
      TableName: JOBS_TABLE,
      Item: {
        pk, sk,
        tenantId: req.tenantId,
        period: req.period,
        metricsKey,
        metricsSpec,
        status: "IN_PROGRESS",
        reportS3Prefix,
        styles: req.styles,
        prompt: req.prompt,
        createdAt: now,
        updatedAt: now
      },
      ConditionExpression: "attribute_not_exists(pk) AND attribute_not_exists(sk)"
    }));

    // NEW job → start orchestration
    await sfn.send(new StartExecutionCommand({
      stateMachineArn: ORCHESTRATOR_ARN,
      input: JSON.stringify({
        pk, sk,
        tenantId: req.tenantId,
        period: req.period,
        metricsKey,
        metricsSpec,
        prompt: req.prompt,
        styles: req.styles,
        reportS3Prefix
      })
    }));

    return {
      statusCode: 202,
      body: JSON.stringify({ status: "IN_PROGRESS", metricsKey, reportS3Prefix })
    };
  } catch (err: any) {
    // Only a ConditionalCheckFailedException means the job already exists; rethrow anything else.
    if (err?.name !== "ConditionalCheckFailedException") throw err;
    const existing = await ddb.send(new GetCommand({
      TableName: JOBS_TABLE,
      Key: { pk, sk }
    }));

    const item = existing.Item as any;
    const status = item?.status ?? "UNKNOWN";

    // If DONE, return pointers; if IN_PROGRESS, return same report key.
    return {
      statusCode: status === "DONE" ? 200 : 202,
      body: JSON.stringify({
        status,
        metricsKey,
        reportS3Prefix: item?.reportS3Prefix,
        artifacts: item?.artifacts
      })
    };
  }
};

3) Athena aggregation

Assume the events are stored in Parquet format and partitioned like this:

s3://usage/tenant_id=acme/month=2026-01/...

A query could look like:

WITH summary AS (
  SELECT COUNT(*) AS total_requests
  FROM usage_parquet
  WHERE tenant_id = :tenantId
    AND month = :period
),
endpoints AS (
  SELECT endpoint, COUNT(*) AS requests
  FROM usage_parquet
  WHERE tenant_id = :tenantId
    AND month = :period
  GROUP BY endpoint
  ORDER BY requests DESC
  LIMIT 10
)
SELECT
  summary.total_requests,
  ARRAY_AGG(endpoints.endpoint) AS top_endpoints
FROM summary CROSS JOIN endpoints
GROUP BY summary.total_requests;

Athena completion events

Athena can emit query state change events to EventBridge, and I will use these events to resume the orchestration.
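
To wire this up, I create an EventBridge rule that routes Athena query state change events to the resume Lambda (the Step Functions version is shown in A2). Below is a minimal sketch using the AWS SDK; the rule name and target id are illustrative and would normally live in your IaC, and the detail fields should be double-checked against the Athena docs (the resume handler parses them defensively anyway).

import { EventBridgeClient, PutRuleCommand, PutTargetsCommand } from "@aws-sdk/client-eventbridge";

const events = new EventBridgeClient({});

// Creates a rule that forwards Athena query state changes to the resume handler Lambda.
export async function createAthenaCompletionRule(handlerLambdaArn: string) {
  const pattern = {
    source: ["aws.athena"],
    "detail-type": ["Athena Query State Change"],
    detail: { currentState: ["SUCCEEDED", "FAILED", "CANCELLED"] }
  };

  await events.send(new PutRuleCommand({
    Name: "athena-query-state-change",
    EventPattern: JSON.stringify(pattern)
  }));

  // EventBridge also needs permission to invoke the Lambda (lambda:AddPermission).
  await events.send(new PutTargetsCommand({
    Rule: "athena-query-state-change",
    Targets: [{ Id: "athena-resume-handler", Arn: handlerLambdaArn }]
  }));
}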

4) Metrics materialisation

Athena produces a result once for each metricsKey and stores it as:

  • metrics.json (S3)
  • status + pointers (DynamoDB)

Example metrics JSON:

{
  "tenantId": "acme",
  "period": "2026-01",
  "totalRequests": 18234567,
  "topEndpoints": ["POST /ads", "GET /reports"],
  "anomalies": [{ "day": "2026-01-12", "delta": "+320%" }]
}

S3 layout:

s3://my-report-bucket/
  reports/acme/2026-01/<metricsKey>/
    metrics.json
    narrative-executive.json
    narrative-technical.json
    narrative-anomalies.json

5) Queueing narrative generation

OpenAI can throttle or reject requests:

  • rate limits (429)
  • transient errors (5xx)
  • sensitivity to bursts (e.g., month-end report spikes)

So we need to control this stage explicitly, using a callback pattern in both implementations:

  • Step Functions → task token pattern (worker calls SendTaskSuccess/Failure)
  • Durable Lambda → callback pattern (worker calls SendDurableExecutionCallbackSuccess/Failure)

Implementation A: Step Functions

We’ll use:

  • Step Functions Standard workflow
  • Event-driven resume for Athena via EventBridge + a resume Lambda
  • SQS .waitForTaskToken for narrative workers

A1) Step Functions state machine

{
  "Comment": "Usage report orchestration (Athena + queued narratives)",
  "StartAt": "StartAthena",
  "States": {
    "StartAthena": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution",
      "Parameters": {
        "QueryString.$": "$.athena.queryString",
        "QueryExecutionContext": { "Database.$": "$.athena.database" },
        "ResultConfiguration": { "OutputLocation.$": "$.athena.outputLocation" }
      },
      "ResultPath": "$.athenaStart",
      "Retry": [
        {
          "ErrorEquals": ["Athena.TooManyRequestsException", "Athena.InternalServerException"],
          "IntervalSeconds": 2,
          "BackoffRate": 2,
          "MaxAttempts": 6
        }
      ],
      "Next": "RegisterAndWaitForAthena"
    },

    "RegisterAndWaitForAthena": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "${RegisterAthenaWaitLambdaArn}",
        "Payload": {
          "taskToken.$": "$$.Task.Token",
          "queryExecutionId.$": "$.athenaStart.QueryExecutionId",
          "pk.$": "$.pk",
          "sk.$": "$.sk"
        }
      },
      "TimeoutSeconds": 86400,
      "ResultPath": "$.athenaResult",
      "Next": "PutMetricsToS3"
    },

    "PutMetricsToS3": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${WriteMetricsLambdaArn}",
        "Payload": {
          "reportS3Prefix.$": "$.reportS3Prefix",
          "metrics.$": "$.athenaResult.metrics"
        }
      },
      "ResultPath": "$.metricsWrite",
      "Next": "NarrativesMap"
    },

    "NarrativesMap": {
      "Type": "Map",
      "ItemsPath": "$.styles",
      "MaxConcurrency": 10,
      "Parameters": {
        "tenantId.$": "$.tenantId",
        "period.$": "$.period",
        "metricsKey.$": "$.metricsKey",
        "reportS3Prefix.$": "$.reportS3Prefix",
        "prompt.$": "$.prompt",
        "style.$": "$$.Map.Item.Value"
      },
      "Iterator": {
        "StartAt": "QueueNarrativeAndWait",
        "States": {
          "QueueNarrativeAndWait": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
            "Parameters": {
              "QueueUrl": "${NarrativeQueueUrl}",
              "MessageBody": {
                "taskToken.$": "$$.Task.Token",
                "tenantId.$": "$.tenantId",
                "period.$": "$.period",
                "metricsKey.$": "$.metricsKey",
                "reportS3Prefix.$": "$.reportS3Prefix",
                "prompt.$": "$.prompt",
                "styleId.$": "$.style.id",
                "stylePrompt.$": "$.style.prompt"
              }
            },
            "TimeoutSeconds": 3600,
            "Retry": [
              {
                "ErrorEquals": ["States.Timeout", "States.TaskFailed"],
                "IntervalSeconds": 2,
                "BackoffRate": 2,
                "MaxAttempts": 3
              }
            ],
            "End": true
          }
        }
      },
      "ResultPath": "$.narratives",
      "Next": "MarkDone"
    },

    "MarkDone": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${MarkDoneLambdaArn}",
        "Payload": {
          "pk.$": "$.pk",
          "sk.$": "$.sk",
          "reportS3Prefix.$": "$.reportS3Prefix"
        }
      },
      "End": true
    }
  }
}

The workflow above works as follows:

  • StartAthena kicks off the Athena query.
  • The workflow then pauses on a task token while Athena runs.
  • When Athena completes, an EventBridge-triggered Lambda calls SendTaskSuccess with the token.
  • For each style, the workflow sends an SQS message containing a task token and waits.
  • The worker does the OpenAI call, writes the narrative, and resumes the workflow using the task token.

A2) Athena resume handler

We need two Lambdas:

1) RegisterAthenaWait Lambda (called by Step Functions with .waitForTaskToken)

Stores taskToken keyed by queryExecutionId.

2) AthenaEventBridgeHandler Lambda (triggered by EventBridge)

Looks up the token by queryExecutionId and calls SendTaskSuccess/Failure.

The DynamoDB table to track the wait tokens looks like:

  • PK: ATHENA#<queryExecutionId>
  • Attributes: taskToken, pk, sk, createdAt

1) RegisterAthenaWait Lambda

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const TOKENS_TABLE = process.env.TOKENS_TABLE!;

export const handler = async (event: any) => {
  const { taskToken, queryExecutionId, pk, sk } = event;

  await ddb.send(new PutCommand({
    TableName: TOKENS_TABLE,
    Item: {
      pk: `ATHENA#${queryExecutionId}`,
      sk: "WAIT",
      taskToken,
      jobPk: pk,
      jobSk: sk,
      createdAt: new Date().toISOString()
    }
  }));

  return {};
};

2) EventBridge handler (Athena completion → SendTaskSuccess)

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, GetCommand, DeleteCommand } from "@aws-sdk/lib-dynamodb";
import { SFNClient, SendTaskSuccessCommand, SendTaskFailureCommand } from "@aws-sdk/client-sfn";
import { AthenaClient, GetQueryResultsCommand } from "@aws-sdk/client-athena";

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const sfn = new SFNClient({});
const athena = new AthenaClient({});

const TOKENS_TABLE = process.env.TOKENS_TABLE!;

export const handler = async (event: any) => {
  // Athena EventBridge event shape varies by detail-type; keep parsing defensive.
  const queryExecutionId = event?.detail?.queryExecutionId ?? event?.detail?.QueryExecutionId;
  const state = event?.detail?.currentState ?? event?.detail?.state ?? event?.detail?.CurrentState;

  if (!queryExecutionId) return;

  const tokenItem = await ddb.send(new GetCommand({
    TableName: TOKENS_TABLE,
    Key: { pk: `ATHENA#${queryExecutionId}`, sk: "WAIT" }
  }));
  if (!tokenItem.Item) return;

  const taskToken = (tokenItem.Item as any).taskToken as string;

  try {
    if (state === "SUCCEEDED") {
      // Fetch results or locate them in S3. For demo: fetch a small result set.
      const results = await athena.send(new GetQueryResultsCommand({
        QueryExecutionId: queryExecutionId,
        MaxResults: 1000
      }));

      const metrics = parseMetrics(results); // implement mapping to your metrics.json

      await sfn.send(new SendTaskSuccessCommand({
        taskToken,
        output: JSON.stringify({ metrics, queryExecutionId })
      }));
    } else {
      await sfn.send(new SendTaskFailureCommand({
        taskToken,
        error: "AthenaQueryFailed",
        cause: JSON.stringify({ queryExecutionId, state })
      }));
    }
  } finally {
    // Best-effort cleanup so we don't reuse tokens accidentally.
    await ddb.send(new DeleteCommand({
      TableName: TOKENS_TABLE,
      Key: { pk: `ATHENA#${queryExecutionId}`, sk: "WAIT" }
    }));
  }
};

function parseMetrics(_results: any) {
  ...
}

A3) SQS worker (Step Functions)

The worker must do four things:
1) idempotency guard (SQS duplicates happen)
2) call OpenAI (using your subscription)
3) store narrative-${style}.json to S3
4) resume Step Functions using SendTaskSuccess/Failure

The DynamoDB table for idempotency looks like:

  • PK: NARRATIVE#<metricsKey>#<styleId>
  • SK: v1 (or a version string if you change format)
  • Attributes: s3Key, createdAt

Worker code

import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand, GetCommand } from "@aws-sdk/lib-dynamodb";
import { SFNClient, SendTaskSuccessCommand, SendTaskFailureCommand } from "@aws-sdk/client-sfn";
import { SSMClient, GetParameterCommand } from "@aws-sdk/client-ssm";

const s3 = new S3Client({});
const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const sfn = new SFNClient({});
const ssm = new SSMClient({});

const REPORT_BUCKET = process.env.REPORT_BUCKET!;
const NARR_TABLE = process.env.NARR_TABLE!;
const OPENAI_API_KEY_PARAM = process.env.OPENAI_API_KEY_PARAM!;

async function getOpenAiKey(): Promise<string> {
  const p = await ssm.send(new GetParameterCommand({
    Name: OPENAI_API_KEY_PARAM,
    WithDecryption: true
  }));
  const v = p.Parameter?.Value;
  if (!v) throw new Error("Missing OpenAI API key");
  return v;
}

export const handler = async (event: any) => {
  const key = await getOpenAiKey();

  for (const record of event.Records ?? []) {
    const msg = JSON.parse(record.body);

    const {
      taskToken,
      metricsKey,
      reportS3Prefix,
      prompt,
      styleId,
      stylePrompt
    } = msg;

    const narrPk = `NARRATIVE#${metricsKey}#${styleId}`;
    const narrSk = "v1";
    const narrativeKey = `${reportS3Prefix}narrative-${styleId}.json`;

    try {
      // 1) Idempotency guard (exactly-once effect)
      // Attempt to create the narrative record. If it exists, skip generation.
      try {
        await ddb.send(new PutCommand({
          TableName: NARR_TABLE,
          Item: { pk: narrPk, sk: narrSk, s3Key: narrativeKey, createdAt: new Date().toISOString() },
          ConditionExpression: "attribute_not_exists(pk) AND attribute_not_exists(sk)"
        }));
      } catch (e: any) {
        if (e?.name !== "ConditionalCheckFailedException") throw e;
        // Already exists: load and return success without re-running OpenAI
        const existing = await ddb.send(new GetCommand({
          TableName: NARR_TABLE,
          Key: { pk: narrPk, sk: narrSk }
        }));

        await sfn.send(new SendTaskSuccessCommand({
          taskToken,
          output: JSON.stringify({
            status: "ALREADY_DONE",
            narrativeKey: (existing.Item as any)?.s3Key
          })
        }));
        continue;
      }

      // 2) Load metrics.json (or pass metrics inline if small).
      // Here we assume it’s in S3: `${reportS3Prefix}metrics.json`
      // Read it and include it in the prompt.

      // 3) Call OpenAI (your subscription)
      const text = await callOpenAi({
        apiKey: key,
        userPrompt: prompt,
        stylePrompt
      });

      // 4) Store narrative artifact to S3
      await s3.send(new PutObjectCommand({
        Bucket: REPORT_BUCKET,
        Key: narrativeKey,
        ContentType: "application/json",
        Body: JSON.stringify({
          styleId,
          generatedAt: new Date().toISOString(),
          text
        })
      }));

      // 5) Resume Step Functions
      await sfn.send(new SendTaskSuccessCommand({
        taskToken,
        output: JSON.stringify({ status: "DONE", narrativeKey })
      }));
    } catch (err: any) {
      await sfn.send(new SendTaskFailureCommand({
        taskToken,
        error: "NarrativeFailed",
        cause: JSON.stringify({ message: err?.message ?? String(err), styleId, metricsKey })
      }));
    }
  }
};

async function callOpenAi(args: { apiKey: string; userPrompt: string; stylePrompt: string }): Promise<string> {
  const res = await fetch("https://api.openai.com/v1/responses", {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${args.apiKey}`,
      "Content-Type": "application/json"
    },
    body: JSON.stringify({
      model: "gpt-5-mini",
      input: [
        { role: "system", content: "You write concise, accurate usage report narratives for SaaS metrics." },
        { role: "user", content: `${args.stylePrompt}\n\nUser request:\n${args.userPrompt}\n\nUse the metrics.json content as the source of truth.` }
      ]
    })
  });

  if (!res.ok) {
    const t = await res.text();
    throw new Error(`OpenAI error ${res.status}: ${t}`);
  }
  const json: any = await res.json();
  return json.output_text ?? JSON.stringify(json);
}
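
Step 2 in the worker above is intentionally abbreviated. Here is a minimal sketch of a loadMetrics helper (a hypothetical name) that reads metrics.json from S3 so it can be embedded in the prompt; it assumes the metrics file is small enough to hold in memory and mirrors the worker's bucket configuration.

import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";

const metricsS3 = new S3Client({});
const METRICS_BUCKET = process.env.REPORT_BUCKET!;

// Reads `${reportS3Prefix}metrics.json` and parses it, so the worker can pass it
// to the LLM as the source of truth for the narrative.
async function loadMetrics(reportS3Prefix: string): Promise<unknown> {
  const obj = await metricsS3.send(new GetObjectCommand({
    Bucket: METRICS_BUCKET,
    Key: `${reportS3Prefix}metrics.json`
  }));
  const body = await obj.Body?.transformToString();
  if (!body) throw new Error("metrics.json is missing or empty");
  return JSON.parse(body);
}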

Notes

  • The idempotency guard runs before the OpenAI call. That’s the whole point: prevent double spend when SQS redelivers a message.

Implementation B: Lambda Durable Functions

B1) Durable orchestrator

Key idea: for event-driven integration, we use callbacks:

  • create a callback id
  • send it to an external system (SQS worker/EventBridge handler)
  • resume execution when that external system completes the callback

Orchestrator

import { DurableContext } from "@aws/durable-execution-sdk-js";
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { AthenaClient, StartQueryExecutionCommand } from "@aws-sdk/client-athena";
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";

const sqs = new SQSClient({});
const athena = new AthenaClient({});
const s3 = new S3Client({});

const NARRATIVE_QUEUE_URL = process.env.NARRATIVE_QUEUE_URL!;
const REPORT_BUCKET = process.env.REPORT_BUCKET!;
const TOKENS_TABLE = process.env.TOKENS_TABLE!; // callbackId mappings, implemented in submitters

type Style = { id: string; prompt: string };

type Input = {
  tenantId: string;
  period: string;
  metricsKey: string;
  metricsSpec: any;
  prompt: string;
  styles: Style[];
  reportS3Prefix: string;
};

export const handler = async (event: any, context: any) => {
  const dctx: DurableContext = (context as any).durable;

  const input = event as Input;

  // Step 1: start Athena query (checkpoint + retry)
  const queryExecutionId = await dctx.step(
    "start-athena",
    async () => {
      const out = await athena.send(new StartQueryExecutionCommand({
        QueryString: buildSql(input.metricsSpec),
        QueryExecutionContext: { Database: process.env.ATHENA_DB },
        ResultConfiguration: { OutputLocation: process.env.ATHENA_OUTPUT! }
      }));
      if (!out.QueryExecutionId) throw new Error("Missing QueryExecutionId");
      return out.QueryExecutionId;
    },
    {
      retry: {
        maxAttempts: 6,
        backoffRate: 2,
        initialDelayMs: 2000
      }
    }
  );

  // Step 2: wait for Athena completion via callback
  const athenaResult = await dctx.waitForCallback(
    "athena-done",
    async (callbackId) => {
      // Store callbackId keyed by queryExecutionId so EventBridge handler can complete it.
      // (Implementation shown in B2)
      await registerCallback("ATHENA", queryExecutionId, callbackId);
    },
    { timeout: { hours: 24 } }
  );

  // Step 3: store metrics.json once
  await dctx.step("write-metrics", async () => {
    await s3.send(new PutObjectCommand({
      Bucket: REPORT_BUCKET,
      Key: `${input.reportS3Prefix}metrics.json`,
      ContentType: "application/json",
      Body: JSON.stringify(athenaResult)
    }));
  });

  // Step 4: for each style → send SQS msg + wait for callback
  for (const style of input.styles) {
    await dctx.waitForCallback(
      `llm-${style.id}`,
      async (callbackId) => {
        // Enqueue work (one message per style)
        await sqs.send(new SendMessageCommand({
          QueueUrl: NARRATIVE_QUEUE_URL,
          MessageBody: JSON.stringify({
            callbackId,               // Durable resume token
            metricsKey: input.metricsKey,
            reportS3Prefix: input.reportS3Prefix,
            prompt: input.prompt,
            styleId: style.id,
            stylePrompt: style.prompt
          }),
          MessageAttributes: {
            styleId: { DataType: "String", StringValue: style.id },
            metricsKey: { DataType: "String", StringValue: input.metricsKey }
          }
        }));

        // Store mapping so the worker can be idempotent and handlers can correlate if needed.
        await registerCallback("LLM", `${input.metricsKey}#${style.id}`, callbackId);
      },
      { timeout: { hours: 1 } }
    );
  }

  // Step 5: done
  return {
    status: "DONE",
    reportS3Prefix: input.reportS3Prefix
  };
};

function buildSql(_spec: any): string {
  // Deterministic SQL builder from metricsSpec.
  return "SELECT ....";
}

async function registerCallback(kind: "ATHENA" | "LLM", key: string, callbackId: string) {
  // Intentionally abbreviated for article brevity: use the same DynamoDB PutItem pattern as the Step Functions token table.
  ...
}
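
registerCallback is abbreviated above. One possible implementation, mirroring the Step Functions token table from A2 (same TOKENS_TABLE layout, but storing a callbackId instead of a taskToken):

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";

const callbackDdb = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Stores the callbackId under a correlation key (e.g. ATHENA#<queryExecutionId> or
// LLM#<metricsKey>#<styleId>) so the EventBridge handler can look it up, and so
// LLM work can be correlated/deduplicated if needed.
async function registerCallback(kind: "ATHENA" | "LLM", key: string, callbackId: string) {
  await callbackDdb.send(new PutCommand({
    TableName: process.env.TOKENS_TABLE!,
    Item: {
      pk: `${kind}#${key}`,
      sk: "WAIT",
      callbackId,
      createdAt: new Date().toISOString()
    }
  }));
}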

B2) Athena resume handler (Durable)

The Athena resume handler is intentionally almost identical to the Step Functions version; the only difference is the API used to resume execution.

Instead of calling SendTaskSuccess / SendTaskFailure, the handler calls:

  • SendDurableExecutionCallbackSuccess
  • SendDurableExecutionCallbackFailure

The event source, correlation logic, and failure semantics remain unchanged.

B3) SQS worker (Durable)

The SQS worker logic is identical to the Step Functions version.

The only difference is how the worker resumes the paused orchestration:

  • Step Functions: SendTaskSuccess / SendTaskFailure
  • Durable Functions: SendDurableExecutionCallbackSuccess / SendDurableExecutionCallbackFailure

All other concerns—idempotency, retries, OpenAI rate limits, and S3 materialisation—remain unchanged.

Cost comparison

Below is a cost comparison that isolates the differences between the two orchestration options. Everything else (Athena scan costs, S3 storage, DynamoDB read/write, OpenAI tokens, etc.) is essentially the same for both designs.

Let:

  • S = number of narrative styles requested in one report request (e.g., S=3: executive/technical/anomalies)

For Step Functions Standard, we have roughly:

  • StartAthena → 1 transition
  • RegisterAndWaitForAthena → 1 transition
  • PutMetricsToS3 → 1 transition
  • NarrativesMap overhead → 1 transition
  • Inside Map: per style, QueueNarrativeAndWait → ~1 transition each (plus Map bookkeeping)
  • MarkDone → 1 transition

We could say:

Transitions ≈ 5 + S

For S = 3:

  • transitions ≈ 8
  • cost per report ≈ 8 * 0.000025 = $0.000200

Volumes:

  • 1,000 reports → $0.20
  • 10,000 reports → $2.00
  • 100,000 reports → $20.00

For Lambda Durable Functions, we have:

  • Start Athena: 1 step
  • Wait for Athena callback: 1 callback wait
  • Write metrics: 1 step
  • For each style: 1 callback wait (the submitter + wait)
  • (Plus orchestration lifecycle operations)

We could say:

Durable operations ≈ 4 + S

For S = 3:

  • durable ops ≈ 7, or roughly 10 once orchestration lifecycle operations are included
  • ops cost per report ≈ 10 * ($8 / 1,000,000) = $0.000080

Volumes:

  • 1,000 reports → $0.08
  • 10,000 reports → $0.80
  • 100,000 reports → $8.00

But Durable Functions also add charges for:

  • Data written (payload checkpoints)
  • Data retained (GB-month) if you keep a durable history

These depend heavily on:

  • checkpoint payload sizes
  • how long callbacks wait (hours vs days)
  • retention window
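
To make the arithmetic above easy to rerun with your own volumes, here is a small calculator using the unit prices assumed in this section ($0.025 per 1,000 Standard state transitions, and $8 per million durable operations as implied by the 8 / 1,000,000 figure). It deliberately ignores the payload and retention costs just listed, and the lifecycleOps default is an assumption chosen to match the ~10 operations used earlier.

// Rough orchestration-only cost model for both approaches (not a full bill).
const SFN_STANDARD_PER_TRANSITION = 0.025 / 1000; // USD per state transition
const DURABLE_PER_OPERATION = 8 / 1_000_000;      // USD per durable operation (assumed)

function stepFunctionsCost(styles: number, reports: number): number {
  // StartAthena + wait + write metrics + Map overhead + MarkDone, plus one per style
  const transitions = 5 + styles;
  return transitions * SFN_STANDARD_PER_TRANSITION * reports;
}

function durableCost(styles: number, reports: number, lifecycleOps = 3): number {
  // Steps and callback waits (4 + S) plus assumed lifecycle overhead
  const ops = 4 + styles + lifecycleOps;
  return ops * DURABLE_PER_OPERATION * reports;
}

console.log(stepFunctionsCost(3, 100_000)); // ≈ 20 (USD)
console.log(durableCost(3, 100_000));       // ≈ 8 (USD)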

What is actually different?

  • Step Functions

    • explicit workflow definition (ASL)
    • built-in execution history & visual debugging
    • managed service integrations, clear retry policies at the workflow level
    • task token callback pattern for SQS
  • Durable Functions

    • code-first orchestration
    • durable checkpoints + deterministic replay
    • callback IDs completed via Lambda APIs (SendDurableExecutionCallbackSuccess/Failure)

Conclusion

This is not about Step Functions and Durable Functions competing against each other. They are simply two ways to build the same thing, and the hard problems do not disappear by changing the orchestration tool.

Lambda Durable Functions are not revolutionary (in my view). What actually differs between the two approaches is where the complexity lives.

  • Step Functions externalise orchestration into a managed service.
  • Durable Functions move the workflow into the codebase.

From a cost perspective, orchestration tends to land in a similar range for both approaches.

If a team has little or no experience with ASL and prefers to keep orchestration logic close to application code, Lambda Durable Functions can be a very good fit.
