Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions artifacts/upload.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ rm $SCRIPT_DIR/*.zip
zip -jr $SCRIPT_DIR/cloudwatch-custom-widget.zip $SCRIPT_DIR/../dist/lambda/cloudwatch-custom-widget/*.mjs
zip -jr $SCRIPT_DIR/restatectl.zip $SCRIPT_DIR/../lib/lambda/restatectl
zip -jr $SCRIPT_DIR/retirement-watcher.zip $SCRIPT_DIR/../dist/lambda/retirement-watcher/*.mjs
zip -jr $SCRIPT_DIR/task-sweeper.zip $SCRIPT_DIR/../dist/lambda/task-sweeper/*.mjs

aws s3 mv $SCRIPT_DIR/cloudwatch-custom-widget.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/
aws s3 mv $SCRIPT_DIR/restatectl.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/
aws s3 mv $SCRIPT_DIR/retirement-watcher.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/
aws s3 mv $SCRIPT_DIR/task-sweeper.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/
7 changes: 7 additions & 0 deletions lib/artifacts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ export function getArtifacts(
artifactsBucket,
`${version}/assets/cloudwatch-custom-widget.zip`,
),
"task-sweeper.zip": cdk.aws_lambda.Code.fromBucketV2(
artifactsBucket,
`${version}/assets/task-sweeper.zip`,
),
};
}

Expand All @@ -39,5 +43,8 @@ export function bundleArtifacts(): Record<string, cdk.aws_lambda.Code> {
"cloudwatch-custom-widget.zip": cdk.aws_lambda.Code.fromAsset(
path.join(__dirname, "../dist/lambda/cloudwatch-custom-widget"),
),
"task-sweeper.zip": cdk.aws_lambda.Code.fromAsset(
path.join(__dirname, "../dist/lambda/task-sweeper"),
),
};
}
69 changes: 69 additions & 0 deletions lib/byoc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,14 @@ export class RestateBYOC
props.retirementWatcher,
);

const _statefulTaskSweeper = createStatefulTaskSweeper(
this,
ctPrefix,
artifacts["task-sweeper.zip"],
this.ecsCluster.clusterArn,
statefulDefinition.taskDefinitionArn,
);

const monitoring = createMonitoring(
this,
PACKAGE_INFO.version,
Expand Down Expand Up @@ -1528,6 +1536,67 @@ function createRetirementWatcher(
return { fn, queue, rule };
}

function createStatefulTaskSweeper(
scope: Construct,
clusterTaskPrefix: string,
code: cdk.aws_lambda.Code,
clusterArn: string,
statefulTaskDefinitionArn: string,
): cdk.aws_lambda.IFunction {
const role = new cdk.aws_iam.Role(scope, "task-sweeper-role", {
assumedBy: new cdk.aws_iam.ServicePrincipal("lambda.amazonaws.com"),
});

role.addToPrincipalPolicy(
new cdk.aws_iam.PolicyStatement({
actions: [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
],
resources: ["*"],
effect: cdk.aws_iam.Effect.ALLOW,
sid: "AWSLambdaBasicExecutionPermissions",
}),
);
role.addToPrincipalPolicy(
new cdk.aws_iam.PolicyStatement({
actions: ["ecs:ListTasks", "ecs:DescribeTasks"],
resources: ["*"],
effect: cdk.aws_iam.Effect.ALLOW,
sid: "DiscoverTaskActions",
}),
);
role.addToPrincipalPolicy(
new cdk.aws_iam.PolicyStatement({
actions: ["ecs:StopTask"],
resources: [`${clusterTaskPrefix}*`],
effect: cdk.aws_iam.Effect.ALLOW,
sid: "StopTaskActions",
}),
);

const fn = new cdk.aws_lambda.Function(scope, "task-sweeper-lambda", {
role,
runtime: cdk.aws_lambda.Runtime.NODEJS_22_X,
architecture: cdk.aws_lambda.Architecture.ARM_64,
handler: "index.handler",
code,
timeout: cdk.Duration.seconds(300),
});
cdk.Tags.of(fn).add("Name", fn.node.path);

new cdk.CustomResource(scope, "task-sweeper-on-delete-event-handler", {
serviceToken: fn.functionArn,
properties: {
ClusterArn: clusterArn,
TaskDefinitionArn: statefulTaskDefinitionArn,
},
});

return fn;
}

function clusterTaskPrefix(clusterArn: string): string {
// arn:aws:ecs:eu-central-1:211125428070:cluster/cluster-name
const clusterSlashParts = cdk.Fn.split("/", clusterArn, 2);
Expand Down
128 changes: 128 additions & 0 deletions lib/lambda/task-sweeper/index.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { CloudFormationCustomResourceEvent, Context } from "aws-lambda";
import {
ECSClient,
ListTasksCommand,
StopTaskCommand,
DescribeTasksCommand,
waitUntilTasksStopped,
} from "@aws-sdk/client-ecs";

const ecs = new ECSClient({});

export async function handler(
event: CloudFormationCustomResourceEvent,
context: Context,
) {
console.log("CloudFormation event:", JSON.stringify(event, null, 2));

try {
if (event.RequestType === "Delete") {
const clusterArn = event.ResourceProperties.ClusterArn;
const taskDefinitionArn = event.ResourceProperties.TaskDefinitionArn;

console.log(
`Cleaning up tasks for cluster: ${clusterArn}, task definition: ${taskDefinitionArn}`,
);

const listResponse = await ecs.send(
new ListTasksCommand({
cluster: clusterArn,
}),
);

if (listResponse.taskArns && listResponse.taskArns.length > 0) {
const describeResponse = await ecs.send(
new DescribeTasksCommand({
cluster: clusterArn,
tasks: listResponse.taskArns,
}),
);

const tasksToStop =
describeResponse.tasks?.filter(
(task) => task.taskDefinitionArn === taskDefinitionArn,
) || [];

console.log(`Found ${tasksToStop.length} tasks to stop`);

for (const task of tasksToStop) {
if (task.taskArn) {
console.log(`Stopping task: ${task.taskArn}`);
await ecs.send(
new StopTaskCommand({
cluster: clusterArn,
task: task.taskArn,
reason: "Stack deletion cleanup",
}),
);
}
}

const taskArnsToWaitFor = tasksToStop
.map((task) => task.taskArn)
.filter((arn): arn is string => arn !== undefined);

if (taskArnsToWaitFor.length > 0) {
console.log(
`Waiting for ${taskArnsToWaitFor.length} tasks to stop...`,
);

await waitUntilTasksStopped(
{
client: ecs,
maxWaitTime: 120,
minDelay: 5,
maxDelay: 15,
},
{
cluster: clusterArn,
tasks: taskArnsToWaitFor,
},
);

console.log("All tasks have stopped successfully");
}
}
}

await sendResponse(event, context, "SUCCESS", {});
} catch (error) {
console.error("Error:", error);
await sendResponse(event, context, "FAILED", { Error: String(error) });
}
}

async function sendResponse(
event: CloudFormationCustomResourceEvent,
context: Context,
responseStatus: string,
responseData: unknown,
) {
const responseBody = JSON.stringify({
Status: responseStatus,
Reason: `See CloudWatch Log Stream: ${context.logStreamName}`,
PhysicalResourceId: event.LogicalResourceId,
StackId: event.StackId,
RequestId: event.RequestId,
LogicalResourceId: event.LogicalResourceId,
Data: responseData,
});

const url = event.ResponseURL;

const requestOptions = {
method: "PUT",
headers: {
"Content-Type": "",
"Content-Length": responseBody.length.toString(),
},
body: responseBody,
};

try {
const response = await fetch(url, requestOptions);
console.log("Response sent successfully:", response.status);
} catch (error) {
console.error("Error sending response:", error);
}
}
Loading