Skip to content

Commit 008988d

Browse files
halspangCopilot
andcommitted
feat: add Durable Task Scheduler (DTS) backend
Add a new backend/scheduler package that implements the Backend interface for connecting to the Durable Task Scheduler cloud service over gRPC. Key components: - SchedulerOptions: configuration with endpoint, task hub, auth type - Connection string parsing matching the .NET SDK format (Endpoint=...;TaskHub=...;Authentication=...) - gRPC channel with TLS auto-detection and metadata interceptors (taskhub, workerid headers injected on every request) - Stub Backend interface implementation (only used for connection) - Worker identity generation (hostname,pid,uuid) matching existing backends The DTS backend is a remote service, so work item dispatch is handled through the gRPC streaming protocol rather than local polling. Includes unit tests for connection string parsing and backend construction. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 812d373 commit 008988d

12 files changed

Lines changed: 979 additions & 53 deletions

File tree

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package durabletaskscheduler
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
)
7+
8+
const (
9+
// DefaultResourceID is the default Azure resource ID used for token authentication with DTS.
10+
DefaultResourceID = "https://durabletask.io"
11+
12+
// DefaultAuthType is the default authentication type.
13+
DefaultAuthType = AuthTypeDefaultAzure
14+
)
15+
16+
// AuthType represents the authentication type for connecting to DTS.
17+
type AuthType string
18+
19+
const (
20+
// AuthTypeNone disables authentication (for local development).
21+
AuthTypeNone AuthType = "none"
22+
23+
// AuthTypeDefaultAzure uses DefaultAzureCredential from the Azure Identity SDK.
24+
AuthTypeDefaultAzure AuthType = "defaultazure"
25+
)
26+
27+
// Options configures the connection to a Durable Task Scheduler (DTS) endpoint.
28+
type Options struct {
29+
// EndpointAddress is the DTS endpoint URL (e.g., "https://myscheduler.westus2.durabletask.io").
30+
EndpointAddress string
31+
32+
// TaskHubName is the name of the task hub resource.
33+
TaskHubName string
34+
35+
// AuthType specifies how to authenticate with the DTS service.
36+
AuthType AuthType
37+
38+
// ResourceID is the Azure resource ID used for token scoping. Defaults to "https://durabletask.io".
39+
ResourceID string
40+
}
41+
42+
// NewOptions creates a new Options with the given endpoint and task hub name.
43+
func NewOptions(endpointAddress, taskHubName string) *Options {
44+
return &Options{
45+
EndpointAddress: endpointAddress,
46+
TaskHubName: taskHubName,
47+
AuthType: DefaultAuthType,
48+
ResourceID: DefaultResourceID,
49+
}
50+
}
51+
52+
// NewOptionsFromConnectionString parses a connection string into Options.
53+
//
54+
// The connection string format is:
55+
//
56+
// Endpoint=https://{scheduler-name}.{region}.durabletask.io;TaskHub={taskhub-name};Authentication={auth-type}
57+
//
58+
// Required keys: Endpoint, TaskHub.
59+
// Optional keys: Authentication (defaults to "defaultazure").
60+
func NewOptionsFromConnectionString(connectionString string) (*Options, error) {
61+
opts := &Options{
62+
AuthType: DefaultAuthType,
63+
ResourceID: DefaultResourceID,
64+
}
65+
66+
parts := strings.Split(connectionString, ";")
67+
for _, part := range parts {
68+
part = strings.TrimSpace(part)
69+
if part == "" {
70+
continue
71+
}
72+
73+
kv := strings.SplitN(part, "=", 2)
74+
if len(kv) != 2 {
75+
return nil, fmt.Errorf("invalid connection string segment: %q", part)
76+
}
77+
78+
key := strings.TrimSpace(kv[0])
79+
value := strings.TrimSpace(kv[1])
80+
81+
switch strings.ToLower(key) {
82+
case "endpoint":
83+
opts.EndpointAddress = value
84+
case "taskhub":
85+
opts.TaskHubName = value
86+
case "authentication":
87+
opts.AuthType = AuthType(strings.ToLower(value))
88+
default:
89+
// Ignore unknown keys for forward compatibility
90+
}
91+
}
92+
93+
if opts.EndpointAddress == "" {
94+
return nil, fmt.Errorf("connection string is missing required 'Endpoint' key")
95+
}
96+
if opts.TaskHubName == "" {
97+
return nil, fmt.Errorf("connection string is missing required 'TaskHub' key")
98+
}
99+
100+
return opts, nil
101+
}

0 commit comments

Comments
 (0)