Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
428 changes: 354 additions & 74 deletions poc-to-prod/bedrock-batch-orchestrator/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { BedrockBatchOrchestratorStack } from '../lib/bedrock-batch-orchestrator

const app = new cdk.App();
new BedrockBatchOrchestratorStack(app, 'BedrockBatchOrchestratorStack', {
bedrockBatchInferenceMaxConcurrency: app.node.tryGetContext('bedrockBatchInferenceMaxConcurrency')!, // required in cdk.json
maxSubmittedAndInProgressJobs: app.node.tryGetContext('maxSubmittedAndInProgressJobs')!, // required in cdk.json
bedrockBatchInferenceTimeoutHours: app.node.tryGetContext('bedrockBatchInferenceTimeoutHours'),
notificationEmails: app.node.tryGetContext('notificationEmails'),
});
4 changes: 4 additions & 0 deletions poc-to-prod/bedrock-batch-orchestrator/cdk.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
]
},
"context": {
"_comment_maxSubmittedAndInProgressJobs": "Maximum number of concurrent batch inference jobs (default: 20)",
"maxSubmittedAndInProgressJobs": 20,
"_comment_bedrockBatchInferenceTimeoutHours": "Timeout in hours for batch inference jobs (default: 24)",
"bedrockBatchInferenceTimeoutHours": 24,
"_comment_notificationEmails": "Optional: Array of email addresses to receive pipeline completion notifications. Example: [\"user1@example.com\", \"user2@example.com\"]",
"notificationEmails": [],
"@aws-cdk/aws-lambda:recognizeLayerVersion": true,
"@aws-cdk/core:checkSecretUsage": true,
"@aws-cdk/core:target-partitions": [
Expand Down
6 changes: 5 additions & 1 deletion poc-to-prod/bedrock-batch-orchestrator/lambda/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
FROM --platform=linux/amd64 public.ecr.aws/lambda/python:3.11

# Install build dependencies for numpy and other packages that need compilation
RUN yum install -y gcc gcc-c++ make python3-devel && yum clean all

COPY requirements.txt .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
RUN pip3 install --upgrade pip && \
pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

COPY . ${LAMBDA_TASK_ROOT}

Expand Down
76 changes: 74 additions & 2 deletions poc-to-prod/bedrock-batch-orchestrator/lambda/custom_types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TypedDict, List, Optional, Dict, Literal
from typing import TypedDict, List, Optional, Dict, Literal, Union


"""
Expand All @@ -9,14 +9,50 @@
"""


# ============================================================================
# PROMPT CONFIGURATION TYPES
# ============================================================================

class PromptConfigSingle(TypedDict):
"""Single prompt applied to all input records"""
mode: Literal['single']
prompt_id: str


class PromptConfigMapped(TypedDict):
"""Prompt ID read from a CSV column for each record"""
mode: Literal['mapped']
column_name: str
image_column: Optional[str] # Column containing image paths (for multimodal)


class PromptConfigExpanded(TypedDict):
"""Multiple prompts applied per input record based on category mapping"""
mode: Literal['expanded']
category_column: str # Column to read category value from
image_column: Optional[str] # Column containing image paths (for multimodal)
expansion_mapping: Dict[str, str] # category_value -> expansion_rule_name


# Union type for all prompt configuration modes
PromptConfig = Union[PromptConfigSingle, PromptConfigMapped, PromptConfigExpanded]


# ============================================================================
# JOB INPUT TYPES
# ============================================================================

class JobInput(TypedDict):
"""Input to the step function - event structure for preprocess.py handler"""
s3_uri: Optional[str]
dataset_id: Optional[str]
split: Optional[str]
job_name_prefix: str
model_id: str
prompt_id: Optional[str]
prompt_id: Optional[str] # For backward compatibility
prompt_config: Optional[PromptConfig] # New unified prompt configuration
input_type: Optional[Literal['text', 'image']] # Type of input data
image_column: Optional[str] # Column name for image paths in CSV (default: 'image_path')
max_num_jobs: Optional[int]
max_records_per_job: Optional[int]

Expand Down Expand Up @@ -68,4 +104,40 @@ class CompletedJobsList(TypedDict):
completed_jobs: List[TaskItem]


# ============================================================================
# PIPELINE CONFIGURATION TYPES
# ============================================================================

class PipelineStage(TypedDict):
"""Configuration for a single stage in a multi-stage pipeline"""
stage_name: str
model_id: str
input_s3_uri: Optional[str] # Explicit input S3 URI (if not using previous output)
input_type: Optional[Literal['text', 'image']] # Type of input data
job_name_prefix: str
prompt_config: PromptConfig
use_previous_output: Optional[bool] # If True, use output from previous stage as input
column_mappings: Optional[Dict[str, str]] # Rename/transform columns from previous stage


class PipelineConfig(TypedDict):
"""Configuration for a multi-stage batch inference pipeline"""
pipeline_name: str
presigned_url_expiry_days: Optional[int] # Expiry for presigned URLs in notifications (default: 7)
stages: List[PipelineStage]


# ============================================================================
# VALIDATION TYPES
# ============================================================================

class ValidationResult(TypedDict):
"""Result of pipeline configuration validation"""
valid: bool
errors: List[str] # List of validation error messages
warnings: List[str] # List of validation warning messages
estimated_records: Optional[int] # Estimated total records to process
estimated_cost_usd: Optional[float] # Estimated cost in USD



Loading