Skip to content

Commit ad7450d

Browse files
committed
initial commit
1 parent 6fbb654 commit ad7450d

File tree

6 files changed

+687
-4
lines changed

6 files changed

+687
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
## Unreleased
2+
* Add work item filtering support for `DurableTaskGrpcWorker` to enable worker-side filtering of orchestration and activity work items
23
* Add support for calls to HTTP endpoints ([#271](https://github.com/microsoft/durabletask-java/pull/271))
34
* Add getSuspendPostUri and getResumePostUri getters to HttpManagementPayload ([#264](https://github.com/microsoft/durabletask-java/pull/264))
45

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3939
private final DataConverter dataConverter;
4040
private final Duration maximumTimerInterval;
4141
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
42+
private final WorkItemFilter workItemFilter;
4243

4344
private final TaskHubSidecarServiceBlockingStub sidecarClient;
4445

45-
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
46+
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder, WorkItemFilter workItemFilter) {
4647
this.orchestrationFactories.putAll(builder.orchestrationFactories);
4748
this.activityFactories.putAll(builder.activityFactories);
4849

@@ -70,6 +71,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
7071
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
7172
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
7273
this.versioningOptions = builder.versioningOptions;
74+
this.workItemFilter = workItemFilter;
7375
}
7476

7577
/**
@@ -132,7 +134,7 @@ public void startAndBlock() {
132134
// TODO: How do we interrupt manually?
133135
while (true) {
134136
try {
135-
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
137+
GetWorkItemsRequest getWorkItemsRequest = buildGetWorkItemsRequest();
136138
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
137139
while (workItemStream.hasNext()) {
138140
WorkItem workItem = workItemStream.next();
@@ -408,4 +410,38 @@ else if (requestType == RequestCase.HEALTHPING)
408410
public void stop() {
409411
this.close();
410412
}
411-
}
413+
414+
/**
415+
* Returns the work item filter configured for this worker, or {@code null} if none.
416+
*/
417+
WorkItemFilter getWorkItemFilter() {
418+
return this.workItemFilter;
419+
}
420+
421+
private GetWorkItemsRequest buildGetWorkItemsRequest() {
422+
GetWorkItemsRequest.Builder builder = GetWorkItemsRequest.newBuilder();
423+
if (this.workItemFilter != null) {
424+
builder.setWorkItemFilters(toProtoWorkItemFilters(this.workItemFilter));
425+
}
426+
return builder.build();
427+
}
428+
429+
static WorkItemFilters toProtoWorkItemFilters(WorkItemFilter filter) {
430+
WorkItemFilters.Builder builder = WorkItemFilters.newBuilder();
431+
for (WorkItemFilter.OrchestrationFilter orch : filter.getOrchestrations()) {
432+
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.OrchestrationFilter.Builder orchBuilder =
433+
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.OrchestrationFilter.newBuilder()
434+
.setName(orch.getName());
435+
orchBuilder.addAllVersions(orch.getVersions());
436+
builder.addOrchestrations(orchBuilder.build());
437+
}
438+
for (WorkItemFilter.ActivityFilter activity : filter.getActivities()) {
439+
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.ActivityFilter.Builder actBuilder =
440+
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.ActivityFilter.newBuilder()
441+
.setName(activity.getName());
442+
actBuilder.addAllVersions(activity.getVersions());
443+
builder.addActivities(actBuilder.build());
444+
}
445+
return builder.build();
446+
}
447+
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
import io.grpc.Channel;
66

77
import java.time.Duration;
8+
import java.util.Collections;
89
import java.util.HashMap;
10+
import java.util.List;
911

1012
/**
1113
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.
@@ -18,6 +20,8 @@ public final class DurableTaskGrpcWorkerBuilder {
1820
DataConverter dataConverter;
1921
Duration maximumTimerInterval;
2022
DurableTaskGrpcWorkerVersioningOptions versioningOptions;
23+
private WorkItemFilter workItemFilter;
24+
private boolean autoGenerateWorkItemFilters;
2125

2226
/**
2327
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
@@ -125,11 +129,71 @@ public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersionin
125129
return this;
126130
}
127131

132+
/**
133+
* Sets explicit work item filters for this worker. When set, only work items matching the filters
134+
* will be dispatched to this worker by the backend.
135+
* <p>
136+
* Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker
137+
* only receives work items it can handle. However, if an orchestration calls a task type
138+
* (e.g., an activity or sub-orchestrator) that is not registered with any connected worker,
139+
* the call may hang indefinitely instead of failing with an error.
140+
*
141+
* @param workItemFilter the work item filter to use, or {@code null} to disable filtering
142+
* @return this builder object
143+
*/
144+
public DurableTaskGrpcWorkerBuilder useWorkItemFilters(WorkItemFilter workItemFilter) {
145+
this.workItemFilter = workItemFilter;
146+
this.autoGenerateWorkItemFilters = false;
147+
return this;
148+
}
149+
150+
/**
151+
* Enables automatic work item filtering by generating filters from the registered
152+
* orchestrations and activities. When enabled, the backend will only dispatch work items
153+
* for registered orchestrations and activities to this worker.
154+
* <p>
155+
* Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker
156+
* only receives work items it can handle. However, if an orchestration calls a task type
157+
* (e.g., an activity or sub-orchestrator) that is not registered with any connected worker,
158+
* the call may hang indefinitely instead of failing with an error.
159+
* <p>
160+
* Only use this method when all task types referenced by orchestrations are guaranteed to be
161+
* registered with at least one connected worker.
162+
*
163+
* @return this builder object
164+
*/
165+
public DurableTaskGrpcWorkerBuilder useWorkItemFilters() {
166+
this.autoGenerateWorkItemFilters = true;
167+
this.workItemFilter = null;
168+
return this;
169+
}
170+
128171
/**
129172
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
130173
* @return a new {@link DurableTaskGrpcWorker} object
131174
*/
132175
public DurableTaskGrpcWorker build() {
133-
return new DurableTaskGrpcWorker(this);
176+
WorkItemFilter resolvedFilter = this.autoGenerateWorkItemFilters
177+
? buildAutoWorkItemFilter()
178+
: this.workItemFilter;
179+
return new DurableTaskGrpcWorker(this, resolvedFilter);
180+
}
181+
182+
private WorkItemFilter buildAutoWorkItemFilter() {
183+
List<String> versions = Collections.emptyList();
184+
if (this.versioningOptions != null
185+
&& this.versioningOptions.getMatchStrategy() == DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.STRICT
186+
&& this.versioningOptions.getVersion() != null) {
187+
versions = Collections.singletonList(this.versioningOptions.getVersion());
188+
}
189+
190+
WorkItemFilter.Builder builder = WorkItemFilter.newBuilder();
191+
for (String name : this.orchestrationFactories.keySet()) {
192+
builder.addOrchestration(name, versions);
193+
}
194+
for (String name : this.activityFactories.keySet()) {
195+
builder.addActivity(name, versions);
196+
}
197+
return builder.build();
134198
}
135199
}
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask;
4+
5+
import java.util.ArrayList;
6+
import java.util.Collections;
7+
import java.util.List;
8+
9+
/**
10+
* Represents work item filters for a Durable Task worker. These filters are passed to the backend
11+
* and only work items matching the filters will be processed by the worker. If no filters are provided,
12+
* the worker will process all work items.
13+
* <p>
14+
* Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker
15+
* only receives work items it can handle. However, if an orchestration calls a task type
16+
* (e.g., an activity or sub-orchestrator) that is not registered with any connected worker,
17+
* the call may hang indefinitely instead of failing with an error.
18+
* <p>
19+
* Use {@link DurableTaskGrpcWorkerBuilder#useWorkItemFilters(WorkItemFilter)} to provide explicit filters,
20+
* or {@link DurableTaskGrpcWorkerBuilder#useWorkItemFilters()} to auto-generate filters from the
21+
* registered orchestrations and activities.
22+
*/
23+
public final class WorkItemFilter {
24+
25+
private final List<OrchestrationFilter> orchestrations;
26+
private final List<ActivityFilter> activities;
27+
28+
private WorkItemFilter(List<OrchestrationFilter> orchestrations, List<ActivityFilter> activities) {
29+
this.orchestrations = Collections.unmodifiableList(new ArrayList<OrchestrationFilter>(orchestrations));
30+
this.activities = Collections.unmodifiableList(new ArrayList<ActivityFilter>(activities));
31+
}
32+
33+
/**
34+
* Gets the orchestration filters.
35+
*
36+
* @return an unmodifiable list of orchestration filters
37+
*/
38+
public List<OrchestrationFilter> getOrchestrations() {
39+
return this.orchestrations;
40+
}
41+
42+
/**
43+
* Gets the activity filters.
44+
*
45+
* @return an unmodifiable list of activity filters
46+
*/
47+
public List<ActivityFilter> getActivities() {
48+
return this.activities;
49+
}
50+
51+
/**
52+
* Creates a new {@link Builder} for constructing {@link WorkItemFilter} instances.
53+
*
54+
* @return a new builder
55+
*/
56+
public static Builder newBuilder() {
57+
return new Builder();
58+
}
59+
60+
/**
61+
* Builder for constructing {@link WorkItemFilter} instances.
62+
*/
63+
public static final class Builder {
64+
private final List<OrchestrationFilter> orchestrations = new ArrayList<OrchestrationFilter>();
65+
private final List<ActivityFilter> activities = new ArrayList<ActivityFilter>();
66+
67+
Builder() {
68+
}
69+
70+
/**
71+
* Adds an orchestration filter with the specified name and no version constraint.
72+
*
73+
* @param name the orchestration name to filter on
74+
* @return this builder
75+
*/
76+
public Builder addOrchestration(String name) {
77+
if (name == null || name.isEmpty()) {
78+
throw new IllegalArgumentException("Orchestration filter name must not be null or empty.");
79+
}
80+
this.orchestrations.add(new OrchestrationFilter(name, Collections.<String>emptyList()));
81+
return this;
82+
}
83+
84+
/**
85+
* Adds an orchestration filter with the specified name and versions.
86+
*
87+
* @param name the orchestration name to filter on
88+
* @param versions the versions to filter on
89+
* @return this builder
90+
*/
91+
public Builder addOrchestration(String name, List<String> versions) {
92+
if (name == null || name.isEmpty()) {
93+
throw new IllegalArgumentException("Orchestration filter name must not be null or empty.");
94+
}
95+
List<String> versionsCopy = versions != null
96+
? Collections.unmodifiableList(new ArrayList<String>(versions))
97+
: Collections.<String>emptyList();
98+
this.orchestrations.add(new OrchestrationFilter(name, versionsCopy));
99+
return this;
100+
}
101+
102+
/**
103+
* Adds an activity filter with the specified name and no version constraint.
104+
*
105+
* @param name the activity name to filter on
106+
* @return this builder
107+
*/
108+
public Builder addActivity(String name) {
109+
if (name == null || name.isEmpty()) {
110+
throw new IllegalArgumentException("Activity filter name must not be null or empty.");
111+
}
112+
this.activities.add(new ActivityFilter(name, Collections.<String>emptyList()));
113+
return this;
114+
}
115+
116+
/**
117+
* Adds an activity filter with the specified name and versions.
118+
*
119+
* @param name the activity name to filter on
120+
* @param versions the versions to filter on
121+
* @return this builder
122+
*/
123+
public Builder addActivity(String name, List<String> versions) {
124+
if (name == null || name.isEmpty()) {
125+
throw new IllegalArgumentException("Activity filter name must not be null or empty.");
126+
}
127+
List<String> versionsCopy = versions != null
128+
? Collections.unmodifiableList(new ArrayList<String>(versions))
129+
: Collections.<String>emptyList();
130+
this.activities.add(new ActivityFilter(name, versionsCopy));
131+
return this;
132+
}
133+
134+
/**
135+
* Builds a new {@link WorkItemFilter} from the configured filters.
136+
*
137+
* @return a new {@link WorkItemFilter} instance
138+
*/
139+
public WorkItemFilter build() {
140+
return new WorkItemFilter(this.orchestrations, this.activities);
141+
}
142+
}
143+
144+
/**
145+
* Specifies an orchestration filter with a name and optional versions.
146+
*/
147+
public static final class OrchestrationFilter {
148+
private final String name;
149+
private final List<String> versions;
150+
151+
OrchestrationFilter(String name, List<String> versions) {
152+
this.name = name;
153+
this.versions = versions;
154+
}
155+
156+
/**
157+
* Gets the name of the orchestration to filter.
158+
*
159+
* @return the orchestration name
160+
*/
161+
public String getName() {
162+
return this.name;
163+
}
164+
165+
/**
166+
* Gets the versions of the orchestration to filter.
167+
*
168+
* @return an unmodifiable list of versions, or an empty list if no version constraint
169+
*/
170+
public List<String> getVersions() {
171+
return this.versions;
172+
}
173+
}
174+
175+
/**
176+
* Specifies an activity filter with a name and optional versions.
177+
*/
178+
public static final class ActivityFilter {
179+
private final String name;
180+
private final List<String> versions;
181+
182+
ActivityFilter(String name, List<String> versions) {
183+
this.name = name;
184+
this.versions = versions;
185+
}
186+
187+
/**
188+
* Gets the name of the activity to filter.
189+
*
190+
* @return the activity name
191+
*/
192+
public String getName() {
193+
return this.name;
194+
}
195+
196+
/**
197+
* Gets the versions of the activity to filter.
198+
*
199+
* @return an unmodifiable list of versions, or an empty list if no version constraint
200+
*/
201+
public List<String> getVersions() {
202+
return this.versions;
203+
}
204+
}
205+
}

0 commit comments

Comments
 (0)