
Feature: S3 Event Processing L3 Construct with Typed Handlers #358

@hoegertn

Description

Summary

Add an opinionated L3 construct for S3 event-triggered Lambda processing that follows the cdk-serverless code generation approach. Users define bucket event configurations and processing schemas in a definition file and get typed handlers with pre-configured S3 clients, configurable processing modes (direct, buffered, EventBridge), DLQ handling, and concurrency control — all generated automatically.

Problem

S3 → Lambda is one of the most common serverless patterns (file uploads, ETL pipelines, media processing, document ingestion), but setting it up properly requires dealing with:

  • S3 event notification configuration (event types, prefix/suffix filters)
  • Choosing between S3 native notifications, SQS buffering, or EventBridge for S3
  • IAM permissions for Lambda to read objects, manage DLQ, etc.
  • Concurrency management to avoid throttling downstream services
  • DLQ/retry for failed processing (S3 native notifications have no built-in retry)
  • The raw S3Event type is cumbersome — extracting bucket, key, size requires navigating nested records
  • No pre-configured S3 client for reading the triggering object

Each of these decisions is well-understood and has a clear best practice, but developers still make mistakes (e.g., forgetting to URL-decode the object key, not buffering through SQS for high-throughput buckets, missing DLQ on the event source).
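The key-decoding pitfall in particular is easy to reproduce: S3 event notifications deliver object keys URL-encoded, with spaces encoded as `+`, so a raw key must be decoded before it can be used in a GetObject call. A minimal sketch (helper name is illustrative):

```typescript
// S3 event notifications URL-encode object keys and encode spaces as '+'.
// Decode before using the key in S3 API calls.
export function decodeS3Key(rawKey: string): string {
  return decodeURIComponent(rawKey.replace(/\+/g, ' '));
}
```

The generated typed record would apply this decoding automatically, so handlers never see the raw encoded key.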

Proposed Solution

Definition File

# s3-processors/documents.yaml
processorName: DocumentIngestion
processors:
  IngestUpload:
    events: [s3:ObjectCreated:Put, s3:ObjectCreated:CompleteMultipartUpload]
    prefix: uploads/
    suffix: .pdf
    mode: buffered  # S3 → SQS → Lambda (recommended for production)
    config:
      batchSize: 5
      maxConcurrency: 10
      lambdaTimeout: 300  # seconds — PDF processing can be slow

  ProcessThumbnail:
    events: [s3:ObjectCreated:Put]
    prefix: images/
    suffix: .jpg
    mode: direct  # S3 → Lambda (fine for low-throughput)
    config:
      lambdaTimeout: 30
      memorySize: 1024  # image processing needs more memory

  AuditDeletion:
    events: [s3:ObjectRemoved:Delete]
    prefix: ""  # all objects
    mode: evented  # S3 → EventBridge → Rule → Lambda
    config:
      lambdaTimeout: 10
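The YAML above implies a definition schema roughly like the following TypeScript interfaces. This is a sketch for illustration — field names mirror the YAML, not the actual generated types:

```typescript
// Hypothetical shape of the definition file; mirrors the YAML example above.
export type ProcessingMode = 'direct' | 'buffered' | 'evented';

export interface ProcessorDefinition {
  events: string[];          // e.g. 's3:ObjectCreated:Put'
  prefix?: string;           // '' matches all objects
  suffix?: string;
  mode: ProcessingMode;
  config?: {
    batchSize?: number;      // buffered mode only
    maxConcurrency?: number; // buffered mode only
    lambdaTimeout?: number;  // seconds
    memorySize?: number;     // MB
  };
}

export interface ProcessorFileSchema {
  processorName: string;
  processors: Record<string, ProcessorDefinition>;
}
```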

Processing Modes

| Mode | Flow | Best For |
| --- | --- | --- |
| `direct` | S3 → Lambda | Low-throughput, simple processing, dev/staging |
| `buffered` | S3 → SQS → Lambda | Production workloads, high throughput, needs DLQ and retry |
| `evented` | S3 → EventBridge → Rule → Lambda | Events that need to be routed to multiple targets or integrated with an existing EventBridge bus |

Projen Integration

import { S3EventProcessor } from 'cdk-serverless/projen';

new S3EventProcessor(project, {
  processorName: 'DocumentIngestion',
  definitionFile: 's3-processors/documents.yaml',
});

Running projen generates:

  • Typed handler signatures per processor (e.g. IngestUploadHandler, ProcessThumbnailHandler)
  • A typed S3Record interface with convenient accessors (decoded key, content type, size, etag)
  • A pre-configured handler context with S3 client helpers
  • The L3 CDK construct

CDK Construct Usage

import { DocumentIngestionS3Processor } from './generated/s3.documentingestion.generated';

const processor = new DocumentIngestionS3Processor(this, 'Processor', {
  // Create a new bucket or reference an existing one
  bucketName: `customer-documents-${props.stageName}`,
  // Or: existingBucket: myBucket,
  
  singleTableDatastore, // optional: write processing metadata
  eventBus: orderEventsBus, // optional: emit events after processing
  
  // Bucket-level settings
  versioning: true,
  lifecycleRules: [
    { prefix: 'uploads/', expiration: Duration.days(90) },
  ],
});

// Access the bucket for granting permissions to other constructs
const bucket = processor.bucket;

The construct automatically:

  • Creates or references the S3 bucket with encryption (SSE-S3 or KMS)
  • Enables EventBridge notifications on the bucket (if any processor uses evented mode)
  • Creates Lambda functions for each processor with appropriate timeout and memory
  • direct mode: Configures S3 event notifications → Lambda with proper permissions
  • buffered mode: Creates SQS queue (with DLQ), configures S3 → SQS notification, sets up Lambda event source mapping with ReportBatchItemFailures, calculates visibility timeout from Lambda timeout
  • evented mode: Creates EventBridge rule matching the S3 event pattern (bucket, prefix, suffix, event type) → Lambda target with DLQ
  • Grants s3:GetObject (and s3:HeadObject) on the relevant prefix to each handler's role
  • Sets up CloudWatch alarms: processing errors, DLQ depth (buffered mode), duration P99 approaching timeout
  • Integrates with the existing monitoring infrastructure
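For the evented mode, the generated EventBridge rule might match an event pattern along these lines, shown here for the AuditDeletion example (bucket name is illustrative; S3 emits EventBridge events with source `aws.s3`):

```typescript
// Hedged sketch of an EventBridge event pattern for the AuditDeletion
// processor — values are illustrative, not generated output.
export const auditDeletionPattern = {
  source: ['aws.s3'],
  'detail-type': ['Object Deleted'],
  detail: {
    bucket: { name: ['customer-documents-prod'] },
    object: { key: [{ prefix: '' }] }, // '' matches all objects
  },
};
```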

Handler DX

// Typed handler — per-record processing with convenient context
export const handler: IngestUploadHandler = async (record, ctx) => {
  // record is typed with convenient accessors
  const { bucket, key, size, eTag, contentType } = record;
  // key is already URL-decoded — no need to handle %20 etc.
  
  // Pre-configured S3 client — reads from the triggering bucket
  const body = await ctx.s3.getObjectBody(bucket, key);
  const metadata = await ctx.s3.headObject(bucket, key);
  
  // Process the document...
  const extractedText = await processPdf(body);
  
  // Write metadata to SingleTableDatastore (if configured)
  await ctx.datastore.put({
    PK: `DOC#${key}`,
    SK: 'METADATA',
    status: 'processed',
    size,
    extractedTextLength: extractedText.length,
    processedAt: new Date().toISOString(),
  });
  
  // Emit event via EventBus (if configured)
  await ctx.events.emit('DocumentProcessed', {
    key,
    size,
    contentType,
    extractedTextLength: extractedText.length,
  });
};

Handler DX — Batch Processing (buffered mode)

// For buffered mode — process a batch of S3 records
export const handler: IngestUploadBatchHandler = async (records, ctx) => {
  const results = await Promise.allSettled(
    records.map(async (record) => {
      const body = await ctx.s3.getObjectBody(record.bucket, record.key);
      return processPdf(body);
    })
  );
  
  // Framework builds SQS batchItemFailures response
  return ctx.reportFailures(results);
};
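One plausible implementation of `ctx.reportFailures` maps rejected entries back to their SQS message IDs so that only failed records are retried (the `ReportBatchItemFailures` response format). Names here are assumptions, not the library's code:

```typescript
// Sketch: build an SQS partial-batch-failure response from settled results.
// Assumes messageIds[i] corresponds to results[i].
interface SQSBatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

export function reportFailures(
  messageIds: string[],
  results: PromiseSettledResult<unknown>[],
): SQSBatchResponse {
  return {
    batchItemFailures: results
      .map((r, i) => (r.status === 'rejected' ? { itemIdentifier: messageIds[i] } : null))
      .filter((x): x is { itemIdentifier: string } => x !== null),
  };
}
```

Records that resolved successfully are omitted from the response, so SQS deletes them; rejected ones become visible again and are retried (or dead-lettered after `maxReceiveCount`).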

Opinionated Defaults

| Setting | Default | Rationale |
| --- | --- | --- |
| Processing mode | `buffered` | Production-safe default with DLQ and retry |
| SQS visibility timeout | 6 × Lambda timeout | AWS recommendation |
| DLQ `maxReceiveCount` | 3 | Retry 3 times before dead-lettering |
| Batch size (buffered) | 5 | Balance throughput against per-record processing time |
| Lambda timeout | 60 s | Generous default for file processing |
| Lambda memory | 256 MB | Sufficient for most text processing; override for images/video |
| Bucket encryption | SSE-S3 | Encryption at rest at no extra cost |
| Object key decoding | automatic | URL-decode keys in the typed record |

All defaults are overridable via the definition file or construct props.
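The merge could work roughly like this — a sketch where the `DEFAULTS` object mirrors the table and the helper name is illustrative:

```typescript
// Defaults from the table above; overrides come from the definition file
// or construct props.
const DEFAULTS = {
  mode: 'buffered' as const,
  batchSize: 5,
  maxReceiveCount: 3,
  lambdaTimeout: 60, // seconds
  memorySize: 256,   // MB
};

export function resolveConfig(overrides: Partial<typeof DEFAULTS> = {}) {
  const merged = { ...DEFAULTS, ...overrides };
  // Visibility timeout is derived (6 × Lambda timeout), never set directly.
  return { ...merged, visibilityTimeout: 6 * merged.lambdaTimeout };
}
```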

Integration Points

  • EventBus construct: Emit events after processing (e.g. DocumentProcessed, ThumbnailGenerated)
  • QueueProcessor construct: The buffered mode internally uses the same SQS patterns, consistent DX
  • SingleTableDatastore: Write processing metadata, track file status
  • Authentication: Scope bucket access per Cognito identity (e.g. uploads/{identity_id}/)
  • RestApi: Pre-signed URL generation for uploads, processing status queries

Out of Scope

  • S3 static website hosting / CloudFront distribution → different pattern entirely
  • S3 Batch Operations → different service, job-based processing
  • S3 Object Lambda → specialized access point use case
  • Multi-bucket processing pipelines → users compose individual processor constructs
  • S3 Replication → infrastructure concern, not application logic

Labels: enhancement (New feature or request)