diff --git a/docs.json b/docs.json index 5e06d314..1be02c92 100644 --- a/docs.json +++ b/docs.json @@ -1097,7 +1097,8 @@ "qstash/sdks/ts/examples/logs", "qstash/sdks/ts/examples/messages", "qstash/sdks/ts/examples/receiver", - "qstash/sdks/ts/examples/queues" + "qstash/sdks/ts/examples/queues", + "qstash/sdks/ts/examples/flow-control" ] } ] @@ -1119,7 +1120,8 @@ "qstash/sdks/py/examples/messages", "qstash/sdks/py/examples/keys", "qstash/sdks/py/examples/receiver", - "qstash/sdks/py/examples/queues" + "qstash/sdks/py/examples/queues", + "qstash/sdks/py/examples/flow-control" ] } ] @@ -1346,7 +1348,8 @@ "group": "Flow Control", "pages": [ "workflow/rest/flow-control/get", - "workflow/rest/flow-control/list" + "workflow/rest/flow-control/list", + "workflow/rest/flow-control/global-parallelism" ] }, { diff --git a/qstash/features/flowcontrol.mdx b/qstash/features/flowcontrol.mdx index 3d252861..a0f0ecbc 100644 --- a/qstash/features/flowcontrol.mdx +++ b/qstash/features/flowcontrol.mdx @@ -101,7 +101,115 @@ curl -XPOST -H 'Authorization: Bearer XXX' \ ``` -## Monitor +## Management API + +You can inspect flow control keys programmatically using the `flowControl` namespace on the client. + +### Get a single flow control key + +Returns the current state and metrics for one flow control key. + + +```typescript TypeScript +import { Client } from "@upstash/qstash"; + +const client = new Client({ token: "" }); + +const info = await client.flowControl.get("USER_GIVEN_KEY"); +console.log(info); +// { +// flowControlKey: "USER_GIVEN_KEY", +// waitListSize: 5, +// parallelismMax: 10, +// parallelismCount: 3, +// rateMax: 100, +// rateCount: 42, +// ratePeriod: 60, +// ratePeriodStart: 1708000000 +// } +``` + +```python Python +from qstash import QStash + +client = QStash("") + +info = client.flow_control.get("USER_GIVEN_KEY") +print(info) +# FlowControlInfo( +# key="USER_GIVEN_KEY", +# wait_list_size=5, +# parallelism_max=10, +# parallelism_count=3, +# rate_max=100, +# rate_count=42, +# rate_period=60, +# rate_period_start=1708000000 +# ) +``` + +```bash cURL +curl -X GET https://qstash.upstash.io/v2/flowControl/USER_GIVEN_KEY \ + -H "Authorization: Bearer " +``` + + +The response fields are: + +| Field | Description | +|---|---| +| `flowControlKey` | The flow control key name | +| `waitListSize` | Number of messages currently waiting in the queue | +| `parallelismMax` | Configured maximum concurrent messages (if set) | +| `parallelismCount` | Number of messages currently running in parallel | +| `rateMax` | Configured maximum messages per rate period (if set) | +| `rateCount` | Number of messages dispatched in the current rate period | +| `ratePeriod` | Rate period length in seconds | +| `ratePeriodStart` | Unix timestamp when the current rate period started | + +### Get global parallelism + +Returns the global parallelism usage across all flow control keys. + + +```typescript TypeScript +import { Client } from "@upstash/qstash"; + +const client = new Client({ token: "" }); + +const info = await client.flowControl.getGlobalParallelism(); +console.log(info); +// { +// parallelismMax: 500, +// parallelismCount: 42 +// } +``` + +```python Python +from qstash import QStash + +client = QStash("") + +info = client.flow_control.get_global_parallelism() +print(info) +# GlobalParallelismInfo( +# parallelism_max=500, +# parallelism_count=42 +# ) +``` + +```bash cURL +curl -X GET https://qstash.upstash.io/v2/globalParallelism \ + -H "Authorization: Bearer " +``` + + +| Field | Description | +|---|---| +| `parallelismMax` | The maximum global parallelism | +| `parallelismCount` | The current number of active requests globally | + +## Monitor You can monitor wait list size of your flow control key's from the console `FlowControl` tab. @@ -109,6 +217,7 @@ You can monitor wait list size of your flow control key's from the console `Flow -Also you can get the same info using the REST API. +Also you can get the same info using the REST API. - [List All Flow Control Keys](/qstash/api/flow-control/list). - [Single Flow Control Key](/qstash/api/flow-control/get). +- [Global Parallelism](/qstash/api/flow-control/global-parallelism). diff --git a/qstash/openapi.yaml b/qstash/openapi.yaml index 91543d7e..50a3db35 100644 --- a/qstash/openapi.yaml +++ b/qstash/openapi.yaml @@ -291,9 +291,27 @@ components: flowControlKey: type: string description: The flow control key name - waitlistSize: + waitListSize: type: integer description: The number of messages waiting due to flow control configuration. + parallelismMax: + type: integer + description: The configured maximum number of messages allowed to run concurrently, if parallelism is set. + parallelismCount: + type: integer + description: The current number of messages running in parallel. + rateMax: + type: integer + description: The configured maximum number of messages allowed per rate period, if rate limiting is set. + rateCount: + type: integer + description: The number of messages dispatched in the current rate period. + ratePeriod: + type: integer + description: The length of the rate period in seconds. + ratePeriodStart: + type: integer + description: Unix timestamp (seconds) when the current rate period started. DLQMessage: type: object @@ -2398,8 +2416,10 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FlowControlKey' - + type: array + items: + $ref: '#/components/schemas/FlowControlKey' + /v2/flowControl/{flowControlKey}: get: summary: Get Flow Control Key @@ -2426,6 +2446,27 @@ paths: application/json: schema: $ref: '#/components/schemas/Error' + + /v2/globalParallelism: + get: + summary: Get Global Parallelism + description: Returns the current global parallelism usage across all flow control keys + tags: + - Flow Control + responses: + '200': + description: Global parallelism info retrieved successfully + content: + application/json: + schema: + type: object + properties: + parallelismMax: + type: integer + description: The configured maximum global parallelism + parallelismCount: + type: integer + description: The current number of messages running globally in parallel /v2/keys: get: diff --git a/qstash/overall/changelog.mdx b/qstash/overall/changelog.mdx index a46031cb..c0c56fd2 100644 --- a/qstash/overall/changelog.mdx +++ b/qstash/overall/changelog.mdx @@ -6,6 +6,17 @@ title: Changelog We have moved the roadmap and the changelog to [Github Discussions](https://github.com/orgs/upstash/discussions) starting from October 2025.Now you can follow `In Progress` features. You can see that your `Feature Requests` are recorded. You can vote for them and comment your specific use-cases to shape the feature to your needs. + +- **TypeScript SDK (`qstash-js`)** and **Python SDK (`qstash-py`):** + - Added flow control management API: `list()`, `get(key)`, and `reset(key)` methods on `client.flowControl` (TypeScript) and `client.flow_control` (Python). + - The `get` and `list` responses now include rich metrics: `waitListSize`, `parallelismMax`, `parallelismCount`, `rateMax`, `rateCount`, `ratePeriod`, and `ratePeriodStart`. + - See the [Flow Control](/qstash/features/flowcontrol#management-api) docs for code examples. +- **QStash Server:** + - `GET /v2/flowControl` now supports an optional `search` query parameter to filter keys by name. + - New `POST /v2/flowControl/:flowControlKey/reset` endpoint resets parallelism and rate counters for a key. + - `GET /v2/globalParallelism` now returns `{ parallelismMax, parallelismCount }` instead of the old `{ waitListSize }` shape. + + - **TypeScript SDK (`qstash-js`):** - `Label` feature is added. This will enable our users to label their publishes so that diff --git a/qstash/sdks/py/examples/flow-control.mdx b/qstash/sdks/py/examples/flow-control.mdx new file mode 100644 index 00000000..aba5d9c4 --- /dev/null +++ b/qstash/sdks/py/examples/flow-control.mdx @@ -0,0 +1,33 @@ +--- +title: Flow Control +--- + +#### Get a single flow control key + +```python +from qstash import QStash + +client = QStash("") + +info = client.flow_control.get("USER_GIVEN_KEY") +print(info.key) +print(info.wait_list_size) +print(info.parallelism_max) +print(info.parallelism_count) +print(info.rate_max) +print(info.rate_count) +print(info.rate_period) +print(info.rate_period_start) +``` + +#### Get global parallelism + +```python +from qstash import QStash + +client = QStash("") + +info = client.flow_control.get_global_parallelism() +print(info.parallelism_max) +print(info.parallelism_count) +``` diff --git a/qstash/sdks/ts/examples/flow-control.mdx b/qstash/sdks/ts/examples/flow-control.mdx new file mode 100644 index 00000000..5a957c79 --- /dev/null +++ b/qstash/sdks/ts/examples/flow-control.mdx @@ -0,0 +1,33 @@ +--- +title: Flow Control +--- + +#### Get a single flow control key + +```typescript +import { Client } from "@upstash/qstash"; + +const client = new Client({ token: "" }); + +const info = await client.flowControl.get("USER_GIVEN_KEY"); +console.log(info.flowControlKey); +console.log(info.waitListSize); +console.log(info.parallelismMax); +console.log(info.parallelismCount); +console.log(info.rateMax); +console.log(info.rateCount); +console.log(info.ratePeriod); +console.log(info.ratePeriodStart); +``` + +#### Get global parallelism + +```typescript +import { Client } from "@upstash/qstash"; + +const client = new Client({ token: "" }); + +const info = await client.flowControl.getGlobalParallelism(); +console.log(info.parallelismMax); +console.log(info.parallelismCount); +``` diff --git a/workflow/features/flow-control/monitor.mdx b/workflow/features/flow-control/monitor.mdx index 1820de21..38145a44 100644 --- a/workflow/features/flow-control/monitor.mdx +++ b/workflow/features/flow-control/monitor.mdx @@ -8,7 +8,7 @@ You can monitor wait list size of your flow control key's from the console `Flow -Also you can get the same info using the REST API. +Also you can get the same info using the REST API. - [List All Flow Control Keys](/workflow/rest/flow-control/list). - [Single Flow Control Key](/workflow/rest/flow-control/get). - +- [Global Parallelism](/workflow/rest/flow-control/global-parallelism). diff --git a/workflow/rest/flow-control/get.mdx b/workflow/rest/flow-control/get.mdx index a9257bc9..6afa279a 100644 --- a/workflow/rest/flow-control/get.mdx +++ b/workflow/rest/flow-control/get.mdx @@ -14,15 +14,54 @@ authMethod: "bearer" ## Response - The key of of the flow control. + The key of the flow control. The number of messages in the wait list that waits for `parallelism`/`rate` set in the flow control. + + The configured maximum number of messages allowed to run concurrently, if parallelism is set. + + + + The current number of messages running in parallel. + + + + The configured maximum number of messages allowed per rate period, if rate limiting is set. + + + + The number of messages dispatched in the current rate period. + + + + The length of the rate period in seconds. + + + + Unix timestamp (seconds) when the current rate period started. + + ```sh curl -X GET https://qstash.upstash.io/v2/flowControl/YOUR_FLOW_CONTROL_KEY -H "Authorization: Bearer " ``` + + +```json +{ + "flowControlKey": "YOUR_FLOW_CONTROL_KEY", + "waitListSize": 5, + "parallelismMax": 10, + "parallelismCount": 3, + "rateMax": 100, + "rateCount": 42, + "ratePeriod": 60, + "ratePeriodStart": 1708000000 +} +``` + diff --git a/workflow/rest/flow-control/global-parallelism.mdx b/workflow/rest/flow-control/global-parallelism.mdx new file mode 100644 index 00000000..6ae2e0f5 --- /dev/null +++ b/workflow/rest/flow-control/global-parallelism.mdx @@ -0,0 +1,31 @@ +--- +title: "Get Global Parallelism" +description: "Get global parallelism usage" +api: "GET https://qstash.upstash.io/v2/globalParallelism" +authMethod: "bearer" +--- + +## Response + + + The configured maximum global parallelism. + + + + The current number of messages running globally in parallel. + + + +```sh +curl -X GET https://qstash.upstash.io/v2/globalParallelism -H "Authorization: Bearer " +``` + + + +```json +{ + "parallelismMax": 500, + "parallelismCount": 42 +} +``` +