diff --git a/artifacts/upload.sh b/artifacts/upload.sh index c184384..866d035 100755 --- a/artifacts/upload.sh +++ b/artifacts/upload.sh @@ -8,7 +8,9 @@ rm $SCRIPT_DIR/*.zip zip -jr $SCRIPT_DIR/cloudwatch-custom-widget.zip $SCRIPT_DIR/../dist/lambda/cloudwatch-custom-widget/*.mjs zip -jr $SCRIPT_DIR/restatectl.zip $SCRIPT_DIR/../lib/lambda/restatectl zip -jr $SCRIPT_DIR/retirement-watcher.zip $SCRIPT_DIR/../dist/lambda/retirement-watcher/*.mjs +zip -jr $SCRIPT_DIR/task-sweeper.zip $SCRIPT_DIR/../dist/lambda/task-sweeper/*.mjs aws s3 mv $SCRIPT_DIR/cloudwatch-custom-widget.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/ aws s3 mv $SCRIPT_DIR/restatectl.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/ aws s3 mv $SCRIPT_DIR/retirement-watcher.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/ +aws s3 mv $SCRIPT_DIR/task-sweeper.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/ diff --git a/lib/artifacts.ts b/lib/artifacts.ts index 9c68424..6121cf4 100644 --- a/lib/artifacts.ts +++ b/lib/artifacts.ts @@ -25,6 +25,10 @@ export function getArtifacts( artifactsBucket, `${version}/assets/cloudwatch-custom-widget.zip`, ), + "task-sweeper.zip": cdk.aws_lambda.Code.fromBucketV2( + artifactsBucket, + `${version}/assets/task-sweeper.zip`, + ), }; } @@ -39,5 +43,8 @@ export function bundleArtifacts(): Record { "cloudwatch-custom-widget.zip": cdk.aws_lambda.Code.fromAsset( path.join(__dirname, "../dist/lambda/cloudwatch-custom-widget"), ), + "task-sweeper.zip": cdk.aws_lambda.Code.fromAsset( + path.join(__dirname, "../dist/lambda/task-sweeper"), + ), }; } diff --git a/lib/byoc.ts b/lib/byoc.ts index f0bf6f8..af1f0ad 100644 --- a/lib/byoc.ts +++ b/lib/byoc.ts @@ -559,6 +559,14 @@ export class RestateBYOC props.retirementWatcher, ); + const _statefulTaskSweeper = createStatefulTaskSweeper( + this, + ctPrefix, + artifacts["task-sweeper.zip"], + this.ecsCluster.clusterArn, + 
/**
 * Provisions the "task sweeper": a Lambda function plus a CloudFormation
 * custom resource whose service token is that function.
 *
 * The custom resource forwards `ClusterArn` and `TaskDefinitionArn` to the
 * function as resource properties, so the function is invoked for the custom
 * resource's lifecycle events with those two values available.
 *
 * @param scope construct under which the role, function and custom resource are created
 * @param clusterTaskPrefix ARN prefix used to scope `ecs:StopTask`; a trailing `*` is appended
 * @param code deployment package for the sweeper Lambda
 * @param clusterArn ECS cluster ARN passed to the custom resource as `ClusterArn`
 * @param statefulTaskDefinitionArn task definition ARN passed as `TaskDefinitionArn`
 * @returns the sweeper Lambda function
 */
function createStatefulTaskSweeper(
  scope: Construct,
  clusterTaskPrefix: string,
  code: cdk.aws_lambda.Code,
  clusterArn: string,
  statefulTaskDefinitionArn: string,
): cdk.aws_lambda.IFunction {
  const sweeperRole = new cdk.aws_iam.Role(scope, "task-sweeper-role", {
    assumedBy: new cdk.aws_iam.ServicePrincipal("lambda.amazonaws.com"),
  });

  // Statements are attached in this exact order so the synthesized policy
  // document keeps the same statement sequence.
  const grant = (sid: string, actions: string[], resources: string[]): void => {
    sweeperRole.addToPrincipalPolicy(
      new cdk.aws_iam.PolicyStatement({
        sid,
        effect: cdk.aws_iam.Effect.ALLOW,
        actions,
        resources,
      }),
    );
  };

  grant(
    "AWSLambdaBasicExecutionPermissions",
    ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
    ["*"],
  );
  grant("DiscoverTaskActions", ["ecs:ListTasks", "ecs:DescribeTasks"], ["*"]);
  // Stopping is the only mutating action, so it alone is restricted to the
  // cluster's task ARN prefix.
  grant("StopTaskActions", ["ecs:StopTask"], [`${clusterTaskPrefix}*`]);

  const sweeperFn = new cdk.aws_lambda.Function(scope, "task-sweeper-lambda", {
    code,
    handler: "index.handler",
    runtime: cdk.aws_lambda.Runtime.NODEJS_22_X,
    architecture: cdk.aws_lambda.Architecture.ARM_64,
    timeout: cdk.Duration.seconds(300),
    role: sweeperRole,
  });
  cdk.Tags.of(sweeperFn).add("Name", sweeperFn.node.path);

  // The custom resource references the cluster and task definition ARNs, which
  // is what ties its lifecycle to those resources.
  const sweeperProperties = {
    ClusterArn: clusterArn,
    TaskDefinitionArn: statefulTaskDefinitionArn,
  };
  new cdk.CustomResource(scope, "task-sweeper-on-delete-event-handler", {
    serviceToken: sweeperFn.functionArn,
    properties: sweeperProperties,
  });

  return sweeperFn;
}
import { CloudFormationCustomResourceEvent, Context } from "aws-lambda";
import {
  ECSClient,
  StopTaskCommand,
  DescribeTasksCommand,
  paginateListTasks,
  waitUntilTasksStopped,
} from "@aws-sdk/client-ecs";

const ecs = new ECSClient({});

/**
 * DescribeTasks accepts at most 100 task ARNs per call. ListTasks pages and
 * waiter batches are both capped at this size so every call stays in bounds.
 */
const TASK_BATCH_SIZE = 100;

/**
 * CloudFormation custom resource handler for the stateful task sweeper.
 *
 * On `Delete`, it stops every task in `ResourceProperties.ClusterArn` whose
 * task definition ARN equals `ResourceProperties.TaskDefinitionArn`, waits for
 * those tasks to stop, and reports `SUCCESS`. For any other request type it
 * reports `SUCCESS` without doing anything. Any error is logged and reported
 * to CloudFormation as `FAILED`.
 *
 * @param event the CloudFormation custom resource request
 * @param context Lambda invocation context (used for the log stream name)
 */
export async function handler(
  event: CloudFormationCustomResourceEvent,
  context: Context,
) {
  console.log("CloudFormation event:", JSON.stringify(event, null, 2));

  try {
    if (event.RequestType === "Delete") {
      const clusterArn = event.ResourceProperties.ClusterArn;
      const taskDefinitionArn = event.ResourceProperties.TaskDefinitionArn;

      console.log(
        `Cleaning up tasks for cluster: ${clusterArn}, task definition: ${taskDefinitionArn}`,
      );

      const taskArnsToStop = await findTaskArns(clusterArn, taskDefinitionArn);
      console.log(`Found ${taskArnsToStop.length} tasks to stop`);

      for (const taskArn of taskArnsToStop) {
        console.log(`Stopping task: ${taskArn}`);
        await ecs.send(
          new StopTaskCommand({
            cluster: clusterArn,
            task: taskArn,
            reason: "Stack deletion cleanup",
          }),
        );
      }

      if (taskArnsToStop.length > 0) {
        console.log(`Waiting for ${taskArnsToStop.length} tasks to stop...`);
        await waitForTasksToStop(clusterArn, taskArnsToStop);
        console.log("All tasks have stopped successfully");
      }
    }

    await sendResponse(event, context, "SUCCESS", {});
  } catch (error) {
    console.error("Error:", error);
    await sendResponse(event, context, "FAILED", { Error: String(error) });
  }
}

/**
 * Collects the ARNs of all listed tasks in the cluster that run the given
 * task definition.
 *
 * ListTasks is paginated; every page is followed so that clusters with more
 * tasks than fit on one page are swept completely. Each page is described
 * immediately, which keeps each DescribeTasks call within its 100-task limit.
 *
 * NOTE(review): matching is on the exact task definition ARN (including
 * revision), as before — tasks from other revisions are not selected.
 */
async function findTaskArns(
  clusterArn: string,
  taskDefinitionArn: string,
): Promise<string[]> {
  const matching: string[] = [];

  const pages = paginateListTasks(
    { client: ecs, pageSize: TASK_BATCH_SIZE },
    { cluster: clusterArn },
  );
  for await (const page of pages) {
    const pageArns = page.taskArns ?? [];
    if (pageArns.length === 0) {
      continue;
    }

    const described = await ecs.send(
      new DescribeTasksCommand({ cluster: clusterArn, tasks: pageArns }),
    );
    for (const task of described.tasks ?? []) {
      if (
        task.taskDefinitionArn === taskDefinitionArn &&
        task.taskArn !== undefined
      ) {
        matching.push(task.taskArn);
      }
    }
  }

  return matching;
}

/**
 * Waits until all given tasks are stopped. The waiter polls DescribeTasks, so
 * the ARNs are split into batches of at most 100; the batches are awaited
 * concurrently so the total wait stays bounded by a single waiter timeout.
 * Rejects if any waiter times out or fails.
 */
async function waitForTasksToStop(
  clusterArn: string,
  taskArns: readonly string[],
): Promise<void> {
  const batches: string[][] = [];
  for (let start = 0; start < taskArns.length; start += TASK_BATCH_SIZE) {
    batches.push(taskArns.slice(start, start + TASK_BATCH_SIZE));
  }

  await Promise.all(
    batches.map((batch) =>
      waitUntilTasksStopped(
        {
          client: ecs,
          maxWaitTime: 120,
          minDelay: 5,
          maxDelay: 15,
        },
        {
          cluster: clusterArn,
          tasks: batch,
        },
      ),
    ),
  );
}

/**
 * PUTs the custom resource result to the pre-signed `ResponseURL` supplied by
 * CloudFormation.
 *
 * Delivery is best-effort: transport errors and non-2xx replies are logged but
 * never thrown, so a failure here cannot mask the handler's original outcome.
 *
 * @param event the request being answered (supplies URL and correlation ids)
 * @param context Lambda context; its log stream name is included as the reason
 * @param responseStatus `"SUCCESS"` or `"FAILED"`
 * @param responseData value returned to CloudFormation as `Data`
 */
async function sendResponse(
  event: CloudFormationCustomResourceEvent,
  context: Context,
  responseStatus: string,
  responseData: unknown,
) {
  const responseBody = JSON.stringify({
    Status: responseStatus,
    Reason: `See CloudWatch Log Stream: ${context.logStreamName}`,
    PhysicalResourceId: event.LogicalResourceId,
    StackId: event.StackId,
    RequestId: event.RequestId,
    LogicalResourceId: event.LogicalResourceId,
    Data: responseData,
  });

  const url = event.ResponseURL;

  const requestOptions = {
    method: "PUT",
    headers: {
      // Deliberately empty, as in the original implementation.
      // NOTE(review): presumably so the request matches the pre-signed URL's
      // signature — confirm.
      "Content-Type": "",
      // Content-Length is a byte count; String#length counts UTF-16 code
      // units and is wrong as soon as the body contains non-ASCII characters
      // (e.g. in an error message).
      "Content-Length": Buffer.byteLength(responseBody, "utf8").toString(),
    },
    body: responseBody,
  };

  try {
    const response = await fetch(url, requestOptions);
    if (response.ok) {
      console.log("Response sent successfully:", response.status);
    } else {
      // fetch only rejects on transport failures; an HTTP error status must be
      // checked explicitly or a rejected response would be logged as success.
      console.error(
        "Error sending response: unexpected HTTP status",
        response.status,
        await response.text(),
      );
    }
  } catch (error) {
    console.error("Error sending response:", error);
  }
}
- Action: + - 'ecs:ListTasks' + - 'ecs:DescribeTasks' + Effect: Allow + Resource: '*' + Sid: DiscoverTaskActions + - Action: 'ecs:StopTask' + Effect: Allow + Resource: + 'Fn::Join': + - '' + - - 'Fn::Join': + - / + - - 'Fn::Join': + - ':' + - - 'Fn::Select': + - 0 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - defaultcluster07071299 + - Arn + - 'Fn::Select': + - 1 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - defaultcluster07071299 + - Arn + - 'Fn::Select': + - 2 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - defaultcluster07071299 + - Arn + - 'Fn::Select': + - 3 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - defaultcluster07071299 + - Arn + - 'Fn::Select': + - 4 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - defaultcluster07071299 + - Arn + - task + - 'Fn::Select': + - 1 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - defaultcluster07071299 + - Arn + - '' + - '*' + Sid: StopTaskActions + Version: '2012-10-17' + PolicyName: defaulttasksweeperroleDefaultPolicy0D10737B + Roles: + - Ref: defaulttasksweeperrole60C302D7 + defaulttasksweeperlambdaE1CECADA: + Type: 'AWS::Lambda::Function' + Properties: + Architectures: + - arm64 + Code: Any + Handler: index.handler + Role: + 'Fn::GetAtt': + - defaulttasksweeperrole60C302D7 + - Arn + Runtime: nodejs22.x + Tags: + - Key: Name + Value: RestateBYOC/default/task-sweeper-lambda + Timeout: 300 + DependsOn: + - defaulttasksweeperroleDefaultPolicy0D10737B + - defaulttasksweeperrole60C302D7 + defaulttasksweeperondeleteeventhandlerE209FA2B: + Type: 'AWS::CloudFormation::CustomResource' + Properties: + ServiceToken: + 'Fn::GetAtt': + - defaulttasksweeperlambdaE1CECADA + - Arn + ClusterArn: + 'Fn::GetAtt': + - defaultcluster07071299 + - Arn + TaskDefinitionArn: + Ref: defaultstatefuldefinition4CD55F35 + 
UpdateReplacePolicy: Delete + DeletionPolicy: Delete defaultcloudwatchcustomwidgetexecutionrole4ED639C0: Type: 'AWS::IAM::Role' Properties: @@ -3601,6 +3742,147 @@ exports[`BYOC With otel collector 1`] = ` - withotelcollectorretirementwatcherqueue9A16F3EA - Arn Id: Target0 + withotelcollectortasksweeperroleB873C8D3: + Type: 'AWS::IAM::Role' + Properties: + AssumeRolePolicyDocument: + Statement: + - Action: 'sts:AssumeRole' + Effect: Allow + Principal: + Service: lambda.amazonaws.com + Version: '2012-10-17' + withotelcollectortasksweeperroleDefaultPolicy2444EC81: + Type: 'AWS::IAM::Policy' + Properties: + PolicyDocument: + Statement: + - Action: + - 'logs:CreateLogGroup' + - 'logs:CreateLogStream' + - 'logs:PutLogEvents' + Effect: Allow + Resource: '*' + Sid: AWSLambdaBasicExecutionPermissions + - Action: + - 'ecs:ListTasks' + - 'ecs:DescribeTasks' + Effect: Allow + Resource: '*' + Sid: DiscoverTaskActions + - Action: 'ecs:StopTask' + Effect: Allow + Resource: + 'Fn::Join': + - '' + - - 'Fn::Join': + - / + - - 'Fn::Join': + - ':' + - - 'Fn::Select': + - 0 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - withotelcollectorcluster09679981 + - Arn + - 'Fn::Select': + - 1 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - withotelcollectorcluster09679981 + - Arn + - 'Fn::Select': + - 2 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - withotelcollectorcluster09679981 + - Arn + - 'Fn::Select': + - 3 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - withotelcollectorcluster09679981 + - Arn + - 'Fn::Select': + - 4 + - 'Fn::Split': + - ':' + - 'Fn::Select': + - 0 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - withotelcollectorcluster09679981 + - Arn + - task + - 'Fn::Select': + - 1 + - 'Fn::Split': + - / + - 'Fn::GetAtt': + - withotelcollectorcluster09679981 + - Arn + - '' + - '*' + Sid: StopTaskActions 
+ Version: '2012-10-17' + PolicyName: withotelcollectortasksweeperroleDefaultPolicy2444EC81 + Roles: + - Ref: withotelcollectortasksweeperroleB873C8D3 + withotelcollectortasksweeperlambdaF867C755: + Type: 'AWS::Lambda::Function' + Properties: + Architectures: + - arm64 + Code: Any + Handler: index.handler + Role: + 'Fn::GetAtt': + - withotelcollectortasksweeperroleB873C8D3 + - Arn + Runtime: nodejs22.x + Tags: + - Key: Name + Value: RestateBYOC/with-otel-collector/task-sweeper-lambda + Timeout: 300 + DependsOn: + - withotelcollectortasksweeperroleDefaultPolicy2444EC81 + - withotelcollectortasksweeperroleB873C8D3 + withotelcollectortasksweeperondeleteeventhandlerA980DB76: + Type: 'AWS::CloudFormation::CustomResource' + Properties: + ServiceToken: + 'Fn::GetAtt': + - withotelcollectortasksweeperlambdaF867C755 + - Arn + ClusterArn: + 'Fn::GetAtt': + - withotelcollectorcluster09679981 + - Arn + TaskDefinitionArn: + Ref: withotelcollectorstatefuldefinitionFB434419 + UpdateReplacePolicy: Delete + DeletionPolicy: Delete withotelcollectorcloudwatchcustomwidgetexecutionrole56230195: Type: 'AWS::IAM::Role' Properties: diff --git a/test/e2e/cluster-stack.ts b/test/e2e/cluster-stack.ts index 484cf7d..5afac67 100644 --- a/test/e2e/cluster-stack.ts +++ b/test/e2e/cluster-stack.ts @@ -145,36 +145,6 @@ export class RestateClusterStack extends cdk.Stack { }); new cdk.CfnOutput(this, "clusterName", { value: restate.clusterName }); - // TODO: some of this cleanup assistance should probably move into the BYOC construct - // - // Make sure to clean up if the controller shuts down early, or we won't be able to delete the cluster - const taskCleanupLambda = new cdk.aws_lambda_nodejs.NodejsFunction( - this, - "TaskCleanupLambda", - { - runtime: cdk.aws_lambda.Runtime.NODEJS_22_X, - entry: path.join(__dirname, "lambda/task-cleanup.ts"), - architecture: cdk.aws_lambda.Architecture.ARM_64, - timeout: cdk.Duration.minutes(5), - initialPolicy: [ - // TODO: tighten up policy - new 
cdk.aws_iam.PolicyStatement({ - actions: ["ecs:ListTasks", "ecs:StopTask", "ecs:DescribeTasks"], - resources: ["*"], - }), - ], - bundling: { - externalModules: ["@aws-sdk"], - }, - }, - ); - new cdk.CustomResource(this, "stateful-task-sweeper", { - serviceToken: taskCleanupLambda.functionArn, - properties: { - ClusterArn: restate.ecsCluster.clusterArn, - TaskDefinitionArn: restate.stateful.taskDefinition.taskDefinitionArn, - }, - }); // Retain bucket until the cluster is fully deleted restate.ecsCluster.node.addDependency(bucket); }