Conversation

@mikedorfman (Contributor)

Summary: These changes introduce a new rate-limited consumer class in the Node/TypeScript code to control how many executions are submitted per second across multiple queues, helping to smooth out Step Function submission.

Addresses CUMULUS-4300: Implement Rate-Limited Lambda Dispatcher for SQS Queue (Max 5 Messages/Second)

Changes

  • Node/TypeScript:
    • Created a new ConsumerRateLimited class that submits executions at an even, configurable maximum rate defined by rateLimitPerSecond. To enforce this limit across all throttled queues, the class accepts a list of queue URLs instead of a single throttled queue URL. Unlike its non-rate-limited counterpart, and to simplify configuration, this new class does not limit the number of messages staged; that can now be controlled indirectly by raising or lowering the rate.
    • Added calls to the new ConsumerRateLimited class in sf-starter.js in the handleRateLimitedEvent function. This uses the incrementAndDispatch dispatcher.
  • Terraform:
    • Added a new Lambda named "sqs2sfThrottleRateLimited" that can be called with a list of queueURLs in an EventBridge scheduled rule.
    • Added sqs2sfThrottleRateLimited_lambda_function_arn outputs to both ingest and cumulus modules.
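
The pacing idea described above can be sketched as follows. This is a hypothetical simplification: only the rateLimitPerSecond name mirrors the PR; sleep, waitTimeMs, and dispatchAtRate are illustrative names, not the actual implementation.

```typescript
// Hypothetical sketch of the pacing in ConsumerRateLimited; only the
// rateLimitPerSecond name mirrors the PR, everything else is illustrative.
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Evenly space submissions: e.g. 5/second means one dispatch every 200 ms.
function waitTimeMs(rateLimitPerSecond: number): number {
  return 1000 / rateLimitPerSecond;
}

async function dispatchAtRate<T>(
  messages: T[],
  rateLimitPerSecond: number,
  dispatch: (message: T) => Promise<void>
): Promise<number> {
  let dispatched = 0;
  for (const message of messages) {
    // Sleep before each dispatch so the overall rate holds across queues.
    await sleep(waitTimeMs(rateLimitPerSecond));
    await dispatch(message);
    dispatched += 1;
  }
  return dispatched;
}
```

Because the delay is applied per message rather than per queue, a single limit covers every throttled queue the consumer is given.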

PR Checklist

  • Update CHANGELOG
  • Unit tests
  • Ad-hoc testing - Deploy changes and test manually
  • Integration tests

📝 Note:
For most pull requests, please Squash and merge to maintain a clean and readable commit history.

GitHub Actions and others added 13 commits November 19, 2025 12:35
Reworked rate limiting code in the Consumer
…n the lambda duration time, which is also configurable

Updated default behavior to not include a message limit but instead stage as many messages as possible within the time provided
Exposed sf_start_limit and sf_start_rate variables
Updated default rateLimitPerSecond max to match experimental values that correspond to a throughput of 5/second
Reverted previously-changed handleEvent function changes
Updated wait time to 5 seconds and time buffer to 1 second
Fixed a bug where we were using the wrong compare operator
for (const [message, queueUrl] of messagesWithQueueUrls) {
  const waitTime = 1000 / this.rateLimitPerSecond;
  log.debug(`Waiting for ${waitTime} ms`);
  await sleep(waitTime); // eslint-disable-line no-await-in-loop
Contributor

Suggested change
await sleep(waitTime); // eslint-disable-line no-await-in-loop
await sleep(waitTime);

@Jkovarik (Member) Dec 11, 2025

So - this is the correct line; the issue is @mikedorfman pulled in the file-level ignore wholesale from my suggestions/CI-run commit. That's not right; we should exempt individual blocks instead. My bad for not calling it out. 8380273

Contributor (Author)

Cool, sounds good - thanks for the clarification!

log.debug(`Waiting for ${waitTime} ms`);
await sleep(waitTime); // eslint-disable-line no-await-in-loop
if (await this.processMessage(message, fn, queueUrl)) {
  counter++;
@etcart (Contributor) Dec 11, 2025

Suggested change
counter++;
counter += 1;

Contributor (Author)

Updated in the next commit

Comment on lines 19 to 30
export class ConsumerRateLimited {
  private readonly deleteProcessedMessage: boolean;
  private readonly queueUrls: string[];
  private readonly timeRemainingFunc: () => number;
  private readonly visibilityTimeout: number;
  private readonly rateLimitPerSecond: number;
  private readonly timeBuffer: number;
  private readonly messageLimitPerFetch: number;
  private readonly waitTime: number;

  constructor({
    queueUrls,
Member

Nit: consider commenting all of the class params in the interface, using optional parameters for the params that are being set to static integers, and doing something like

e.g.

  /**
   * URLs of the SQS queues to poll.
   */
  queueUrls: string[];

and so on in the interface. Totally get the original consumer didn't have this.

Contributor (Author)

Updated in the next commit

Contributor (Author)

Oh, actually @Jkovarik, can you clarify what you mean by using optional parameters for the params that are being set to static integers? I was thinking you were saying:

private readonly timeBuffer?: number;
private readonly messageLimitPerFetch?: number;
private readonly waitTime?: number;

But I think I'm misinterpreting since they're always set to values so aren't optional.
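
One plausible reading of the suggestion, sketched below as a guess: the params are optional in the constructor's params interface, with defaults applied during destructuring, while the class fields stay non-optional because the constructor always assigns them. The 5000 ms / 1000 ms defaults follow the commit messages above; ConsumerSketch, the interface shape, and the messageLimitPerFetch default of 10 are illustrative.

```typescript
// Hypothetical illustration: optional in the params interface, required
// on the class because the constructor always supplies a value.
interface ConsumerSketchParams {
  /** URLs of the SQS queues to poll. */
  queueUrls: string[];
  /** Pause between empty fetches, in ms (default applied in constructor). */
  waitTime?: number;
  /** Safety margin before the lambda deadline, in ms. */
  timeBuffer?: number;
  /** Max messages requested per fetch (illustrative default). */
  messageLimitPerFetch?: number;
}

class ConsumerSketch {
  private readonly queueUrls: string[];
  private readonly waitTime: number; // not optional here
  private readonly timeBuffer: number;
  private readonly messageLimitPerFetch: number;

  constructor({
    queueUrls,
    waitTime = 5000,
    timeBuffer = 1000,
    messageLimitPerFetch = 10,
  }: ConsumerSketchParams) {
    this.queueUrls = queueUrls;
    this.waitTime = waitTime;
    this.timeBuffer = timeBuffer;
    this.messageLimitPerFetch = messageLimitPerFetch;
  }

  get settings() {
    return {
      queueCount: this.queueUrls.length,
      waitTime: this.waitTime,
      timeBuffer: this.timeBuffer,
      messageLimitPerFetch: this.messageLimitPerFetch,
    };
  }
}
```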

// we'll immediately start fetching the next batch while processing the
// current one

let messages = await Promise.all(
Member

Nit: We're repeating this 3x, consider abstracting to a helper/private method

Contributor (Author)

Creating a fetchMessagesFromAllQueues method in the next commit.
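
A possible shape for that helper, purely as a sketch: fetch from every queue in parallel and tag each message with its source queue URL so it can later be deleted from the right queue. receiveMessages and QueueMessage stand in for the real SQS calls and types.

```typescript
// Hypothetical sketch of a fetchMessagesFromAllQueues-style helper.
// receiveMessages is injected here to stand in for the real SQS client.
type QueueMessage = { body: string };

async function fetchMessagesFromAllQueues(
  queueUrls: string[],
  receiveMessages: (queueUrl: string) => Promise<QueueMessage[]>
): Promise<Array<[QueueMessage, string]>> {
  const batches = await Promise.all(
    queueUrls.map(async (queueUrl): Promise<Array<[QueueMessage, string]>> => {
      const messages = await receiveMessages(queueUrl);
      // Keep the queue URL with each message so later processing can
      // delete it from the correct queue.
      return messages.map((m): [QueueMessage, string] => [m, queueUrl]);
    })
  );
  return batches.flat();
}
```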

}

/**
* This is an SQS queue consumer.
Member

😍

* in the event object. It is a rate-limited version of the throttled consumer.
*
*
* @param {Object} event - lambda input message
Member

Something that's entirely non-obvious re: the state of the Core codebase: we have an ADR that the team/repo is trying to enable ts-check on any modified .js file and make it correct for the modified code. We should consider doing so for this docstring and avoid Object as a valid type except in specific exceptions.
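
A ts-check-friendly docstring along these lines might use a typedef instead of a bare Object param type. This is a hypothetical rewrite: the field names and handleRateLimitedEventSketch are illustrative, not the actual event shape in sf-starter.js.

```javascript
// Hypothetical ts-check-friendly rewrite of the docstring: a @typedef
// replaces the bare Object param type. Field names are illustrative.

/**
 * @typedef {object} RateLimitedEvent
 * @property {string[]} queueUrls - SQS queue URLs to consume from
 * @property {number} [rateLimitPerSecond] - max executions started per second
 */

/**
 * Handle a rate-limited dispatch event.
 *
 * @param {RateLimitedEvent} event - lambda input message
 * @returns {number} the number of queues that will be polled
 */
function handleRateLimitedEventSketch(event) {
  return event.queueUrls.length;
}
```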

Contributor (Author)

Ah, I see - that makes sense. Updated in the next commit.

Updated with additional PR feedback
*/
timeRemainingFunc: () => number;
/**
* The visibility timeout used when fetching messages from the SQS queues.
Member

Nit:

Suggested change
* The visibility timeout used when fetching messages from the SQS queues.
* The visibility timeout in milliseconds used when fetching messages from the SQS queues.

Contributor (Author)

Updated in the next commit

role    = var.lambda_processing_role_arn
runtime = "nodejs20.x"
# This timeout must match the cadence of the EventBridge rule that triggers it
timeout = 60
Member

Had a brief conversation offline with @mikedorfman re: this. I think there's a potential timing issue with this timeout being too close to the event loop in clearing semaphores, meaning the lambda could end without clearing a semaphore. We talked through it, and it's probably the right call to extend the timeout: make sure the event loop is configured so it stops dispatching based on the EB periodicity, but give the lambda a much longer timeout to allow things like semaphores to clear.

Noting here to track changes/etc.

* @returns {Promise} - AWS SF Start Execution response
*/
function dispatch(queueUrl, message) {
async function dispatch(queueUrl, message) {
Member

Just to clarify - this (and awaiting sfn().StartExecution) was done for readability?

Contributor (Author)

I'm trying to remember why I updated this - I believe it was my initial confusion about async functions. Given we await the dispatch function when it's called, does it make sense to update this to an async function? I don't think including or excluding this change impacts my update.

Member

Mostly just making sure I understood why the change was made/there wasn't some side-effect somewhere I was missing.

Functionally awaiting here and making this async for something else to await is more explicit versus having a non-async function that passes back an awaitable promise, so I think it's a net good change.
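
The trade-off can be shown in isolation (hypothetical names, not the PR's dispatch): both shapes are awaitable by the caller, but the async version is explicit at the definition site and keeps the function on the stack for errors thrown while awaiting.

```typescript
// Illustrative only: a plain function returning a promise vs. an async
// function. Callers can await either one identically.
function dispatchPlain(value: number): Promise<number> {
  return Promise.resolve(value * 2);
}

async function dispatchAsync(value: number): Promise<number> {
  // Awaiting here (rather than returning the raw promise) surfaces
  // rejections with this function in the stack trace.
  return await dispatchPlain(value);
}
```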

@@ -0,0 +1,161 @@
/* eslint-disable no-await-in-loop */
Member

This should be removed (I added it to get the local commit hooks to pass without cluttering pre-edit), apologies

Contributor (Author)

Done in next commit

Member

@mikedorfman I don't see this as removed - were you saying you have work in-flight you're going to push up?


test.beforeEach(() => {
  testConsumer = new ConsumerRateLimited({
    queueUrls: [fakeQueueName],
@Jkovarik (Member) Dec 12, 2025

We should probably test multiple queues.... although with the stubbing here that might be brittle... and it's stubbed like that in the OG code.

We have the option to use localstack instead and actually test consuming messages, however there's a lot of overhead there.

More generally: I think we either need to make these units more comprehensive or write an integration test to validate this is working.

Contributor (Author)

I'm in the process of adding a test that exercises the sf-starter handleRateLimitedEvent functionality; hopefully that can cover this case (and I might add a unit test with multiple queues anyway).

@Jkovarik (Member) left a comment

This looks really good! Just a couple of changes to consider/discuss other than minor nits/lint type stuff - the race/timeout condition concern and an integration or unit test update.

Oh - and CHANGELOG entry please!

Let me know what you think!

`No messages fetched, waiting ${this.waitTime} ms before retrying`
);
await sleep(this.waitTime);
messages = await this.fetchMessagesFromAllQueues();
Member

Nit: re-reading this, I think this could lead to some lag in low-message volume scenarios. The sleep isn't validating the time remaining before sleeping, so in theory you could pull messages you already know you can't process due to the while loop. The cost here is just the visibility timeout. On the fence about that being a nit.

Contributor (Author)

Ah, I see. Looking at it closer myself, I think we'd always end up with messages that were fetched but not processed, since we're calling fetchMessagesFromAllQueues while we're still processing the last batch. I think if we add a processMessages call outside the while loop, that would clean up the last batch pulled from fetchMessagesFromAllQueues.

I'll add a unit test to verify this and cover this scenario.
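
The fix being discussed can be sketched like this (hypothetical simplification: consumeAll, fetchBatch, and processBatch are illustrative names). Because the consumer prefetches the next batch while processing the current one, one batch can remain unprocessed when the while loop exits; the extra call afterwards drains it.

```typescript
// Hypothetical sketch: prefetch-while-processing loop plus a final
// drain of the last prefetched batch.
async function consumeAll(
  fetchBatch: () => Promise<string[]>,
  processBatch: (batch: string[]) => Promise<number>,
  timeRemaining: () => number,
  timeBuffer: number
): Promise<number> {
  let processed = 0;
  let batch = await fetchBatch();
  while (batch.length > 0 && timeRemaining() > timeBuffer) {
    const nextBatchPromise = fetchBatch(); // prefetch while processing
    processed += await processBatch(batch);
    batch = await nextBatchPromise;
  }
  // Drain the final prefetched batch so nothing is left unprocessed.
  if (batch.length > 0 && timeRemaining() > timeBuffer) {
    processed += await processBatch(batch);
  }
  return processed;
}
```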

@Jkovarik Jkovarik dismissed their stale review December 22, 2025 13:28

Dismissing as my initial concerns were addressed and the PR was updated and re-submitted for review. Integration tests should be reviewed.

@Jkovarik (Member)

@mikedorfman thanks for all your work on this! I dismissed my review as I believe you addressed my initial comments, but could not review the integration test updates on Friday.
