Skip to content

Feature: Real-Time Event API L3 Construct (AppSync Events) #359

@hoegertn

Description

@hoegertn

Summary

Add an opinionated L3 construct for AWS AppSync Events API that follows the cdk-serverless code generation approach. Users define channel namespaces, payload schemas, and authorization rules in a definition file and get typed event handlers (onPublish, onSubscribe), typed server-side publishing utilities, and managed infrastructure — all generated automatically using AppSync's managed WebSocket infrastructure for connection lifecycle, fan-out, and scaling.

Problem

Real-time features (notifications, live updates, collaborative editing, dashboards) are increasingly expected in modern applications. AppSync Events API provides managed serverless WebSocket infrastructure with channel-based pub/sub, but wiring it up still requires:

  • Creating the AppSync Event API with proper authorization configuration
  • Defining channel namespaces and their auth rules
  • Writing onPublish and onSubscribe event handlers with untyped payloads
  • Building server-side publishing logic (HTTP POST to the AppSync Events endpoint)
  • Configuring multi-auth (e.g., Cognito for users, IAM for service-to-service)
  • Setting up custom domains
  • No type-safe channel payloads — everything is raw JSON

AppSync Events eliminates the need to manage connections, DynamoDB connection stores, or @connections POST-back logic that API Gateway WebSocket APIs require. The managed fan-out to subscribers, automatic connection lifecycle handling, and native channel semantics make it the right foundation for a cdk-serverless real-time construct.

Proposed Solution

Definition File

# realtime/live-updates.yaml
apiName: LiveUpdates
namespaces:
  orders:
    description: Real-time order status updates
    authorization:
      publish: iam       # only backend services can publish
      subscribe: cognito # authenticated users can subscribe
    channels:
      statusUpdate:
        publishSchema:
          type: object
          properties:
            orderId:
              type: string
            status:
              type: string
              enum: [pending, processing, shipped, delivered, cancelled]
            estimatedDelivery:
              type: string
              format: date-time
            carrier:
              type: string
          required: [orderId, status]
        subscribeSchema:
          # What the client receives (can differ from publish if onPublish transforms)
          type: object
          properties:
            orderId:
              type: string
            status:
              type: string
            estimatedDelivery:
              type: string
            carrier:
              type: string
            updatedAt:
              type: number
          required: [orderId, status, updatedAt]
        onPublish: true    # generate a handler to validate/transform
        onSubscribe: true  # generate a handler for authorization

  chat:
    description: Real-time chat messages
    authorization:
      publish: cognito
      subscribe: cognito
    channels:
      message:
        publishSchema:
          type: object
          properties:
            roomId:
              type: string
            text:
              type: string
            replyTo:
              type: string
          required: [roomId, text]
        subscribeSchema:
          type: object
          properties:
            roomId:
              type: string
            text:
              type: string
            replyTo:
              type: string
            userId:
              type: string
            displayName:
              type: string
            sentAt:
              type: number
          required: [roomId, text, userId, sentAt]
        onPublish: true
        onSubscribe: true

  system:
    description: System-wide broadcast notifications
    authorization:
      publish: iam
      subscribe: cognito
    channels:
      announcement:
        publishSchema:
          type: object
          properties:
            title:
              type: string
            message:
              type: string
            severity:
              type: string
              enum: [info, warning, critical]
          required: [title, message, severity]
        onPublish: false   # no transformation needed
        onSubscribe: false # all authenticated users can subscribe

Projen Integration

import { RealtimeApi } from 'cdk-serverless/projen';

new RealtimeApi(project, {
  apiName: 'LiveUpdates',
  definitionFile: 'realtime/live-updates.yaml',
});

Running projen generates:

  • Typed payload interfaces per channel (e.g. OrderStatusUpdatePublishPayload, ChatMessageSubscribePayload)
  • Typed onPublish handler signatures (e.g. OrderStatusUpdatePublishHandler)
  • Typed onSubscribe handler signatures (e.g. ChatMessageSubscribeHandler)
  • A typed EventPublisher class for server-side publishing with per-channel methods
  • The L3 CDK construct

CDK Construct Usage

import { LiveUpdatesRealtimeApi } from './generated/realtime.liveupdates.generated';

const realtimeApi = new LiveUpdatesRealtimeApi(this, 'RealtimeApi', {
  stageName: props.stageName,
  domainName: props.domainName,
  realtimeHostname: 'rt', // rt.example.com
  authentication, // Cognito user pool integration
  additionalEnv: {
    DOMAIN_NAME: props.domainName,
  },
});

// Access the API endpoint for wiring into other constructs
const httpEndpoint = realtimeApi.httpEndpoint;
const realtimeEndpoint = realtimeApi.realtimeEndpoint;

The construct automatically:

  • Creates the AppSync Event API with channel namespace configuration
  • Configures authorization modes (Cognito, IAM, API Key, OIDC — multi-auth per namespace)
  • Creates Lambda functions for onPublish and onSubscribe handlers where defined
  • Connects event handlers to the appropriate channel namespaces
  • Configures custom domain (CNAME + ACM certificate) consistent with RestApi/GraphQlApi pattern
  • Sets up CloudWatch logging with configurable log level
  • Grants appsync:EventPublish permissions to Lambdas using the generated publisher
  • Integrates with the existing monitoring infrastructure

onPublish Handler DX

// Validate and enrich events before they reach subscribers
export const handler: OrderStatusUpdatePublishHandler = async (event, ctx) => {
  const { orderId, status, estimatedDelivery, carrier } = event.payload;
  
  // Validate: does the order exist?
  const order = await ctx.datastore.get({ PK: `ORDER#${orderId}` });
  if (!order) {
    return { error: 'Order not found' };
  }
  
  // Enrich: add timestamp, then AppSync handles fan-out
  return {
    payload: {
      orderId,
      status,
      estimatedDelivery,
      carrier,
      updatedAt: Date.now(),
    },
  };
};

onSubscribe Handler DX

// Control who can subscribe to which channels
export const handler: ChatMessageSubscribeHandler = async (event, ctx) => {
  const { roomId } = event.channelPath; // parsed from /chat/message/{roomId}
  const userId = ctx.identity.sub;
  
  // Check room membership
  const membership = await ctx.datastore.get({
    PK: `ROOM#${roomId}`,
    SK: `MEMBER#${userId}`,
  });
  
  if (!membership) {
    return { reject: true };
  }
  
  return { allow: true };
};

Server-Side Publishing (from other Lambdas)

import { LiveUpdatesPublisher } from './generated/realtime.liveupdates-publisher.generated';

// In an EventBridge handler, REST API handler, or any Lambda
const publisher = new LiveUpdatesPublisher();

// Type-safe — schema enforces payload shape, auto-completion works
await publisher.publish('orders', 'statusUpdate', {
  orderId: '123',
  status: 'shipped',
  estimatedDelivery: '2026-03-25T14:00:00Z',
  carrier: 'DHL',
  updatedAt: Date.now(),
});

// Broadcast system announcement — no channel path needed
await publisher.publish('system', 'announcement', {
  title: 'Scheduled Maintenance',
  message: 'System will be unavailable from 02:00-04:00 CET',
  severity: 'warning',
});

Connecting EDA → Real-Time Push

// In your stack — wire EventBridge events to real-time push
// EventBridge rule → Lambda → AppSync Events publish
const bus = new OrderEventsEventBus(this, 'Bus', { /* ... */ });

// The generated handler for OrderStatusChanged can publish to the real-time API
// by importing the LiveUpdatesPublisher in the handler code.
// Optionally, a convenience wiring method:
realtimeApi.connectEventBus(bus, {
  event: 'OrderStatusChanged',
  namespace: 'orders',
  channel: 'statusUpdate',
  // Transform EventBridge detail to AppSync Events payload
  transform: (detail) => ({
    orderId: detail.orderId,
    status: detail.newStatus,
    updatedAt: Date.now(),
  }),
});

Integration Points

  • EventBus construct: EventBridge rule → Lambda → AppSync Events publish, enabling backend event → real-time client push pipeline
  • Authentication: Cognito user pools wire directly into AppSync Events authorization modes
  • RestApi / GraphQlApi: Mutations or API calls trigger real-time pushes through the shared publisher utility
  • SingleTableDatastore: DynamoDB Streams → Lambda → publish to channel for real-time data sync; onPublish/onSubscribe handlers get a pre-configured datastore client
  • TopicPublisher construct: SNS subscriber Lambda publishes to AppSync Events for client push
  • S3EventProcessor construct: File processing completion → real-time progress update to client

Client Integration Notes

While client-side code generation is out of scope for this issue, the generated types and endpoint configuration should make client integration straightforward. The construct should output:

  • HTTP endpoint URL (for server-side publishing)
  • Real-time (WebSocket) endpoint URL (for client subscriptions)
  • API key (if API Key auth is used)
  • Channel namespace paths

These outputs enable frontend frameworks (Amplify, custom WebSocket clients) to connect with minimal configuration.

Test Utility Extension

Extend the existing IntegTestUtil with real-time testing capabilities:

const test = new IntegTestUtil({ /* existing config + */ realtimeOptions: {
  httpEndpoint: 'https://rt.example.com',
  realtimeEndpoint: 'wss://rt.example.com/event/realtime',
}});

// Subscribe and assert
const subscription = await test.subscribe('orders/statusUpdate', authenticatedUser);
await test.publish('orders', 'statusUpdate', { orderId: '123', status: 'shipped', updatedAt: Date.now() });
const received = await subscription.waitForMessage(5000);
expect(received.orderId).toBe('123');
await subscription.close();

// Test subscription rejection
await expect(
  test.subscribe('chat/message/room-999', unauthorizedUser)
).rejects.toThrow();

Out of Scope

  • Client-side code generation (React hooks, Amplify integration) → separate concern
  • AppSync GraphQL subscriptions → the existing GraphQlApi construct covers this
  • Custom WebSocket protocol handling → AppSync Events manages the protocol
  • Presence / typing indicators → application-level logic built on top of channels
  • Message history / persistence → use SingleTableDatastore separately; AppSync Events is fire-and-forget

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions