Skip to content

Feature: SQS Queue Consumer L3 Construct with Typed Message Handlers #356

@hoegertn

Description

@hoegertn

Summary

Add an opinionated L3 construct for SQS → Lambda processing that follows the cdk-serverless code generation approach. Users define message schemas in a definition file and get typed handlers, DLQ configuration, partial batch failure handling, and sensible defaults — all generated automatically.

Problem

SQS-triggered Lambda is one of the most common serverless patterns for decoupling and buffering workloads, but setting it up properly involves a surprising amount of detail:

  • Calculating visibility timeout relative to Lambda timeout (the AWS docs recommend 6x)
  • Configuring a dead-letter queue with appropriate maxReceiveCount
  • Enabling ReportBatchItemFailures for partial batch failure handling
  • Setting batch size and batching window for throughput tuning
  • FIFO queue configuration (deduplication, message group ID)
  • IAM permissions between SQS, Lambda, and DLQ
  • No typed message payloads — handlers work with raw SQSEvent records

cdk-serverless users shouldn't have to think about any of this for the common case.

Proposed Solution

Definition File

# queues/order-processing.yaml
processorName: OrderProcessing
queues:
  ProcessOrders:
    schema:
      type: object
      properties:
        orderId:
          type: string
        customerId:
          type: string
        items:
          type: array
          items:
            type: object
            properties:
              productId:
                type: string
              quantity:
                type: integer
      required: [orderId, customerId, items]
    config:
      batchSize: 10
      maxBatchingWindow: 30  # seconds
      maxConcurrency: 5      # reserved concurrent executions
      fifo: false

  SendNotifications:
    schema:
      type: object
      properties:
        userId:
          type: string
        channel:
          type: string
          enum: [email, sms, push]
        templateId:
          type: string
        data:
          type: object
      required: [userId, channel, templateId]
    config:
      batchSize: 1           # process one notification at a time
      fifo: true
      deduplication: contentBased

Projen Integration

import { QueueProcessor } from 'cdk-serverless/projen';

new QueueProcessor(project, {
  processorName: 'OrderProcessing',
  definitionFile: 'queues/order-processing.yaml',
});

Running projen generates:

  • Typed message interfaces (e.g. ProcessOrdersMessage, SendNotificationsMessage)
  • Typed handler signatures for each queue (batch and per-record)
  • A typed QueueSender class for enqueuing messages
  • The L3 CDK construct

CDK Construct Usage

import { OrderProcessingQueueProcessor } from './generated/queue.orderprocessing.generated';

const processor = new OrderProcessingQueueProcessor(this, 'Processor', {
  singleTableDatastore, // optional
  additionalEnv: {
    NOTIFICATION_SERVICE_URL: props.notificationUrl,
  },
});

The construct automatically:

  • Creates the SQS queue (standard or FIFO based on config)
  • Creates a DLQ with maxReceiveCount: 3 (configurable)
  • Sets visibility timeout to 6 × Lambda timeout
  • Creates the Lambda function with the event source mapping
  • Enables ReportBatchItemFailures on the event source mapping
  • Configures maxConcurrency on the event source mapping if specified
  • Sets up CloudWatch alarms on DLQ depth (non-empty) and age of oldest message
  • Integrates with the existing monitoring infrastructure

Handler DX — Per-Record Processing

// Process one message at a time — framework handles batch iteration and partial failures
export const handler: ProcessOrdersRecordHandler = async (message) => {
  const { orderId, customerId, items } = message; // typed from schema
  
  // Process order...
  // If this throws, only this record is reported as failed (partial batch failure)
};

Handler DX — Full Batch Processing

// For advanced use cases where you need control over the full batch
export const handler: ProcessOrdersBatchHandler = async (messages, ctx) => {
  const results = await Promise.allSettled(
    messages.map(msg => processOrder(msg))
  );
  
  // Return failed message IDs — framework builds the batchItemFailures response
  return ctx.reportFailures(results);
};

Sending Messages

import { OrderProcessingSender } from './generated/queue.orderprocessing-sender.generated';

const sender = new OrderProcessingSender();

// Type-safe — schema enforces payload, auto-completion works
await sender.send('ProcessOrders', {
  orderId: '123',
  customerId: 'cust-456',
  items: [{ productId: 'prod-789', quantity: 2 }],
});

// FIFO queue — messageGroupId required
await sender.send('SendNotifications', {
  userId: 'user-123',
  channel: 'email',
  templateId: 'order-confirmation',
  data: { orderNumber: '123' },
}, { messageGroupId: 'user-123' });

Opinionated Defaults

Setting Default Rationale
Visibility Timeout 6 × Lambda timeout AWS recommendation
DLQ maxReceiveCount 3 Retry 3 times before dead-lettering
Batch Size 10 Lambda SQS default
Max Batching Window 0 (disabled) Low-latency by default
ReportBatchItemFailures enabled Always use partial batch failures
Reserved Concurrency none Opt-in to avoid over-throttling
Encryption SQS-managed SSE Encryption at rest by default

All defaults are overridable via the definition file or construct props.

Integration Points

  • EventBus construct: EventBridge rule → SQS queue for buffered event processing
  • SingleTableDatastore: Handlers get a pre-configured datastore client via context
  • SNS construct (if implemented): SNS → SQS subscription for fan-out with buffering
  • S3EventProcessor (if implemented): Uses SQS buffer mode internally
  • RestApi / GraphQlApi: API handlers use the generated sender to enqueue work

Out of Scope

  • SQS as an event source for Step Functions → Step Functions already exists in the toolkit
  • SQS → Lambda → SQS chaining patterns → users compose individual constructs
  • Amazon MQ / MSK consumer patterns → different services, different scope
  • FIFO exactly-once processing guarantees beyond what SQS natively provides

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions