Skip to content

Commit 9ffa2c2

Browse files
committed
Add stateful task sweeper to clean up on stack delete
1 parent b94646a commit 9ffa2c2

File tree

6 files changed

+474
-30
lines changed

6 files changed

+474
-30
lines changed

artifacts/upload.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ rm $SCRIPT_DIR/*.zip
88
zip -jr $SCRIPT_DIR/cloudwatch-custom-widget.zip $SCRIPT_DIR/../dist/lambda/cloudwatch-custom-widget/*.mjs
99
zip -jr $SCRIPT_DIR/restatectl.zip $SCRIPT_DIR/../lib/lambda/restatectl
1010
zip -jr $SCRIPT_DIR/retirement-watcher.zip $SCRIPT_DIR/../dist/lambda/retirement-watcher/*.mjs
11+
zip -jr $SCRIPT_DIR/task-sweeper.zip $SCRIPT_DIR/../dist/lambda/task-sweeper/*.mjs
1112

1213
aws s3 mv $SCRIPT_DIR/cloudwatch-custom-widget.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/
1314
aws s3 mv $SCRIPT_DIR/restatectl.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/
1415
aws s3 mv $SCRIPT_DIR/retirement-watcher.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/
16+
aws s3 mv $SCRIPT_DIR/task-sweeper.zip s3://restate-byoc-artifacts-public-eu-central-1/${VERSION}/assets/

lib/artifacts.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ export function getArtifacts(
2525
artifactsBucket,
2626
`${version}/assets/cloudwatch-custom-widget.zip`,
2727
),
28+
"task-sweeper.zip": cdk.aws_lambda.Code.fromBucketV2(
29+
artifactsBucket,
30+
`${version}/assets/task-sweeper.zip`,
31+
),
2832
};
2933
}
3034

@@ -39,5 +43,8 @@ export function bundleArtifacts(): Record<string, cdk.aws_lambda.Code> {
3943
"cloudwatch-custom-widget.zip": cdk.aws_lambda.Code.fromAsset(
4044
path.join(__dirname, "../dist/lambda/cloudwatch-custom-widget"),
4145
),
46+
"task-sweeper.zip": cdk.aws_lambda.Code.fromAsset(
47+
path.join(__dirname, "../dist/lambda/task-sweeper"),
48+
),
4249
};
4350
}

lib/byoc.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,14 @@ export class RestateBYOC
559559
props.retirementWatcher,
560560
);
561561

562+
const _statefulTaskSweeper = createStatefulTaskSweeper(
563+
this,
564+
ctPrefix,
565+
artifacts["task-sweeper.zip"],
566+
this.ecsCluster.clusterArn,
567+
statefulDefinition.taskDefinitionArn,
568+
);
569+
562570
const monitoring = createMonitoring(
563571
this,
564572
PACKAGE_INFO.version,
@@ -1528,6 +1536,59 @@ function createRetirementWatcher(
15281536
return { fn, queue, rule };
15291537
}
15301538

1539+
/**
 * Provisions the "task sweeper": a Lambda-backed CloudFormation custom
 * resource whose handler stops ECS tasks of the stateful task definition
 * when the custom resource (and therefore the stack) is deleted.
 *
 * @param scope construct under which the role, function and custom resource are created
 * @param clusterTaskPrefix ARN prefix of the tasks in the cluster; the task-level
 *   IAM actions are granted on `${clusterTaskPrefix}*`
 * @param code Lambda deployment package containing `index.handler`
 * @param clusterArn ARN of the ECS cluster, passed to the handler as `ClusterArn`
 * @param statefulTaskDefinitionArn ARN of the stateful task definition, passed to
 *   the handler as `TaskDefinitionArn`
 * @returns the sweeper Lambda function
 */
function createStatefulTaskSweeper(
  scope: Construct,
  clusterTaskPrefix: string,
  code: cdk.aws_lambda.Code,
  clusterArn: string,
  statefulTaskDefinitionArn: string,
): cdk.aws_lambda.IFunction {
  const role = new cdk.aws_iam.Role(scope, "task-sweeper-role", {
    assumedBy: new cdk.aws_iam.ServicePrincipal("lambda.amazonaws.com"),
  });

  role.addToPrincipalPolicy(
    new cdk.aws_iam.PolicyStatement({
      actions: [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
      ],
      resources: ["*"],
      effect: cdk.aws_iam.Effect.ALLOW,
      sid: "AWSLambdaBasicExecutionPermissions",
    }),
  );
  // Task-level actions can be restricted to the task ARNs of this cluster.
  role.addToPrincipalPolicy(
    new cdk.aws_iam.PolicyStatement({
      actions: ["ecs:StopTask", "ecs:DescribeTasks"],
      resources: [`${clusterTaskPrefix}*`],
      effect: cdk.aws_iam.Effect.ALLOW,
      sid: "TaskActions",
    }),
  );
  // Per the IAM service authorization reference for Amazon ECS, ecs:ListTasks
  // is not authorized against task ARNs, so a task-ARN-scoped grant would leave
  // the sweeper's first call denied. Grant it on "*" and scope it to this
  // cluster with the ecs:cluster condition key instead.
  role.addToPrincipalPolicy(
    new cdk.aws_iam.PolicyStatement({
      actions: ["ecs:ListTasks"],
      resources: ["*"],
      conditions: {
        ArnEquals: { "ecs:cluster": clusterArn },
      },
      effect: cdk.aws_iam.Effect.ALLOW,
      sid: "ListTasksInCluster",
    }),
  );

  const fn = new cdk.aws_lambda.Function(scope, "task-sweeper-lambda", {
    role,
    runtime: cdk.aws_lambda.Runtime.NODEJS_22_X,
    architecture: cdk.aws_lambda.Architecture.ARM_64,
    handler: "index.handler",
    code,
    // The handler stops tasks and then waits for them to reach STOPPED. A
    // short timeout risks the function being killed before it reports back to
    // CloudFormation, which leaves the stack operation hanging; use the Lambda
    // maximum so the wait has room to complete.
    timeout: cdk.Duration.minutes(15),
  });
  cdk.Tags.of(fn).add("Name", fn.node.path);

  // The custom resource references the function, the cluster and the task
  // definition, so CloudFormation deletes it (invoking the sweeper) before
  // any of them are removed.
  new cdk.CustomResource(scope, "task-sweeper-on-delete-event-handler", {
    serviceToken: fn.functionArn,
    properties: {
      ClusterArn: clusterArn,
      TaskDefinitionArn: statefulTaskDefinitionArn,
    },
  });

  return fn;
}
1591+
15311592
function clusterTaskPrefix(clusterArn: string): string {
15321593
// arn:aws:ecs:eu-central-1:211125428070:cluster/cluster-name
15331594
const clusterSlashParts = cdk.Fn.split("/", clusterArn, 2);

lib/lambda/task-sweeper/index.mts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import { CloudFormationCustomResourceEvent, Context } from "aws-lambda";
2+
import {
3+
ECSClient,
4+
ListTasksCommand,
5+
StopTaskCommand,
6+
DescribeTasksCommand,
7+
waitUntilTasksStopped,
8+
} from "@aws-sdk/client-ecs";
9+
10+
// ECS client created once at module load with an empty (default) configuration;
// every ECS call in this module goes through it.
const ecs = new ECSClient({});
11+
12+
/** Maximum number of task ARNs accepted by a single ECS DescribeTasks call. */
const SWEEPER_DESCRIBE_BATCH_SIZE = 100;
/** Upper bound, in seconds, on how long to wait for tasks to stop. */
const SWEEPER_MAX_WAIT_SECONDS = 3000;
/** Polling back-off bounds, in seconds, for the tasks-stopped waiter. */
const SWEEPER_WAIT_MIN_DELAY_SECONDS = 5;
const SWEEPER_WAIT_MAX_DELAY_SECONDS = 15;
/** Seconds kept in reserve before the Lambda deadline to answer CloudFormation. */
const SWEEPER_RESPONSE_RESERVE_SECONDS = 10;

/**
 * CloudFormation custom resource handler for the task sweeper.
 *
 * On `Delete` it stops every task in `ResourceProperties.ClusterArn` whose task
 * definition ARN equals `ResourceProperties.TaskDefinitionArn` and waits for
 * those tasks to stop. `Create` and `Update` requests perform no work. In all
 * cases a SUCCESS or FAILED response is sent back to CloudFormation.
 *
 * @param event the custom resource request from CloudFormation
 * @param context Lambda invocation context; its remaining time bounds the wait
 */
export async function handler(
  event: CloudFormationCustomResourceEvent,
  context: Context,
) {
  console.log("CloudFormation event:", JSON.stringify(event, null, 2));

  try {
    if (event.RequestType === "Delete") {
      const clusterArn: string = event.ResourceProperties.ClusterArn;
      const taskDefinitionArn: string =
        event.ResourceProperties.TaskDefinitionArn;

      console.log(
        `Cleaning up tasks for cluster: ${clusterArn}, task definition: ${taskDefinitionArn}`,
      );

      const taskArnsToStop = await findTaskArnsForDefinition(
        clusterArn,
        taskDefinitionArn,
      );

      console.log(`Found ${taskArnsToStop.length} tasks to stop`);

      for (const taskArn of taskArnsToStop) {
        console.log(`Stopping task: ${taskArn}`);
        await ecs.send(
          new StopTaskCommand({
            cluster: clusterArn,
            task: taskArn,
            reason: "Stack deletion cleanup",
          }),
        );
      }

      if (taskArnsToStop.length > 0) {
        console.log(`Waiting for ${taskArnsToStop.length} tasks to stop...`);
        await waitForTasksStopped(clusterArn, taskArnsToStop, context);
        console.log("All tasks have stopped successfully");
      }
    }

    await sendResponse(event, context, "SUCCESS", {});
  } catch (error) {
    console.error("Error:", error);
    await sendResponse(event, context, "FAILED", { Error: String(error) });
  }
}

/**
 * Lists every task in the cluster, following `nextToken` pagination, and
 * returns the ARNs of those whose task definition ARN matches.
 *
 * Each ListTasks page is described immediately; a page holds at most 100 ARNs
 * by default, which is also the DescribeTasks per-call limit.
 */
async function findTaskArnsForDefinition(
  clusterArn: string,
  taskDefinitionArn: string,
): Promise<string[]> {
  const matching: string[] = [];
  let nextToken: string | undefined;

  do {
    const page = await ecs.send(
      new ListTasksCommand({ cluster: clusterArn, nextToken }),
    );
    nextToken = page.nextToken;

    const pageArns = page.taskArns ?? [];
    if (pageArns.length === 0) {
      continue;
    }

    const described = await ecs.send(
      new DescribeTasksCommand({ cluster: clusterArn, tasks: pageArns }),
    );
    for (const task of described.tasks ?? []) {
      if (task.taskDefinitionArn === taskDefinitionArn && task.taskArn) {
        matching.push(task.taskArn);
      }
    }
  } while (nextToken);

  return matching;
}

/**
 * Waits until all given tasks are stopped, in batches that respect the
 * DescribeTasks limit.
 *
 * The wait is capped by the time left before the Lambda deadline (minus a
 * reserve for sending the CloudFormation response) so that a slow shutdown
 * surfaces as an error the caller can report, rather than the function being
 * killed without ever answering CloudFormation.
 *
 * @throws if the remaining time is too short to wait, or if the waiter fails
 */
async function waitForTasksStopped(
  clusterArn: string,
  taskArns: readonly string[],
  context: Context,
): Promise<void> {
  for (
    let start = 0;
    start < taskArns.length;
    start += SWEEPER_DESCRIBE_BATCH_SIZE
  ) {
    const remainingSeconds =
      Math.floor(context.getRemainingTimeInMillis() / 1000) -
      SWEEPER_RESPONSE_RESERVE_SECONDS;
    const maxWaitTime = Math.min(SWEEPER_MAX_WAIT_SECONDS, remainingSeconds);

    // The waiter rejects configurations where maxWaitTime is not greater than
    // minDelay; report that case as a timeout of our own.
    if (maxWaitTime <= SWEEPER_WAIT_MIN_DELAY_SECONDS) {
      throw new Error(
        "Not enough time left before the Lambda deadline to wait for tasks to stop",
      );
    }

    await waitUntilTasksStopped(
      {
        client: ecs,
        maxWaitTime,
        minDelay: SWEEPER_WAIT_MIN_DELAY_SECONDS,
        maxDelay: SWEEPER_WAIT_MAX_DELAY_SECONDS,
      },
      {
        cluster: clusterArn,
        tasks: taskArns.slice(start, start + SWEEPER_DESCRIBE_BATCH_SIZE),
      },
    );
  }
}
94+
95+
/**
 * Reports the outcome of a custom resource request to CloudFormation by
 * PUTting a JSON document to the pre-signed `event.ResponseURL`.
 *
 * Delivery is best-effort: transport errors and non-2xx replies are logged
 * and not rethrown.
 *
 * @param event the custom resource request being answered
 * @param context Lambda invocation context; its log stream name is included
 *   in the response `Reason`
 * @param responseStatus the status string to report (the handler passes
 *   "SUCCESS" or "FAILED")
 * @param responseData value sent as the response `Data` field
 */
async function sendResponse(
  event: CloudFormationCustomResourceEvent,
  context: Context,
  responseStatus: string,
  responseData: unknown,
) {
  const responseBody = JSON.stringify({
    Status: responseStatus,
    Reason: `See CloudWatch Log Stream: ${context.logStreamName}`,
    PhysicalResourceId: event.LogicalResourceId,
    StackId: event.StackId,
    RequestId: event.RequestId,
    LogicalResourceId: event.LogicalResourceId,
    Data: responseData,
  });

  const url = event.ResponseURL;

  // Content-Length counts bytes, not UTF-16 code units: the body may contain
  // non-ASCII characters (e.g. inside a stringified error), in which case
  // `responseBody.length` would under-report the size.
  const bodyByteLength = new TextEncoder().encode(responseBody).length;

  const requestOptions = {
    method: "PUT",
    headers: {
      // Deliberately empty, as in the original request.
      "Content-Type": "",
      "Content-Length": bodyByteLength.toString(),
    },
    body: responseBody,
  };

  try {
    const response = await fetch(url, requestOptions);
    if (response.ok) {
      console.log("Response sent successfully:", response.status);
    } else {
      // fetch only rejects on transport failures; a rejected upload (for
      // example an expired pre-signed URL) arrives here as a non-2xx status.
      console.error(
        "Error sending response: unexpected HTTP status",
        response.status,
      );
    }
  } catch (error) {
    console.error("Error sending response:", error);
  }
}

0 commit comments

Comments
 (0)