The Worker SDK makes it easy to write strongly typed Conductor workers with specific inputs and outputs.
Annotations for the worker methods:
- @WorkerTask - When applied to a method, converts it into a Conductor worker.
- @InputParam - Name of the input parameter to bind to from the task's input.
- @OutputParam - Name of the output key of the task's output.
Please note inputs and outputs to a task in Conductor are JSON documents.
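To make the mapping between annotation names and JSON keys concrete, here is a toy sketch of annotation-driven binding. It is not the SDK's implementation: the `InputParam` and `OutputParam` types below are local stand-ins declared only so the example compiles without the SDK, and `GreetingWorker` and `ToyBinder` are hypothetical names. The task input and output are modeled as plain maps, standing in for the JSON documents.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.LinkedHashMap;
import java.util.Map;

// Local stand-ins (assumption): a real worker uses the SDK's own annotations.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface InputParam { String value(); }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface OutputParam { String value(); }

/** Hypothetical worker class used only for this illustration. */
class GreetingWorker {
    public @OutputParam("greetings") String greet(@InputParam("name") String name) {
        return "Hello, " + name;
    }
}

/** Toy binder: reads the annotations and maps input keys to arguments. */
class ToyBinder {
    static Map<String, Object> invoke(Object worker, String methodName, Map<String, Object> input) {
        for (Method method : worker.getClass().getDeclaredMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            OutputParam out = method.getAnnotation(OutputParam.class);
            if (out == null) {
                throw new IllegalArgumentException("toy binder needs @OutputParam on " + methodName);
            }
            // Each argument is looked up in the input by its @InputParam name.
            Parameter[] params = method.getParameters();
            Object[] args = new Object[params.length];
            for (int i = 0; i < params.length; i++) {
                InputParam in = params[i].getAnnotation(InputParam.class);
                args[i] = (in == null) ? null : input.get(in.value());
            }
            try {
                // The return value is stored under the @OutputParam key.
                Map<String, Object> output = new LinkedHashMap<>();
                output.put(out.value(), method.invoke(worker, args));
                return output;
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("worker invocation failed", e);
            }
        }
        throw new IllegalArgumentException("no method named " + methodName);
    }
}
```

Calling `ToyBinder.invoke(new GreetingWorker(), "greet", Map.of("name", "conductor"))` yields a map with the single key `greetings`, which mirrors how an input document's keys feed the parameters and the return value lands under the output key.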
Examples
Create a worker named task1 that gets Task as input and produces TaskResult as output.
@WorkerTask("task1")
public TaskResult task1(Task task) {
    task.setStatus(Task.Status.COMPLETED);
    return new TaskResult(task);
}

Create a worker named task2 that takes name as a String input and produces the output "Hello, " + name.
@WorkerTask("task2")
public @OutputParam("greetings") String task2(@InputParam("name") String name) {
    return "Hello, " + name;
}

Example Task Input/Output
Input:
{
  "name": "conductor"
}
Output:
{
  "greetings": "Hello, conductor"
}

A worker that takes a complex Java type as input and produces a complex output:
@WorkerTask("get_insurance_quote")
public InsuranceQuote getInsuranceQuote(GetInsuranceQuote quoteInput) {
    InsuranceQuote quote = new InsuranceQuote();
    // Implementation
    return quote;
}

Example Task Input/Output
Input:
{
  "name": "personA",
  "zipCode": "10121",
  "amount": 1000000
}
Output:
{
  "name": "personA",
  "quotedPremium": 123.50,
  "quotedAmount": 1000000
}

Annotated Workers are managed by WorkflowExecutor
To start workers via classpath scanning, use either of the initWorkers methods in the WorkflowExecutor class.
WorkflowExecutor executor = new WorkflowExecutor("http://server/api/");
// List of packages (comma separated) to scan for annotated workers.
// Please note, the worker method(s) MUST be public and the class in which they are defined
// MUST have a no-args constructor
executor.initWorkers("com.company.package1,com.company.package2");
// You may also specify packages as separate variadic arguments rather than comma-separated
executor.initWorkers("com.company.package1", "com.company.package2");

To start workers from a specific class (or list of classes), use the initWorkersFromClasses method. This does not scan the entire classpath, only the classes you specify.
WorkflowExecutor executor = new WorkflowExecutor("http://server/api/");
// As above, note that the worker method(s) MUST be public and the class in which they are defined
// MUST have a no-args constructor
executor.initWorkersFromClasses(List.of(MyCoolWorker.class, SomeOtherWorker.class));

Finally, if your workers cannot be constructed with a no-args constructor, use the initWorkersFromInstances method, which lets you pass in instances of your worker classes.
WorkflowExecutor executor = new WorkflowExecutor("http://server/api/");
// As above, note that the worker method(s) MUST be public
executor.initWorkersFromInstances(List.of(
    myCoolWorkerInstance, someOtherWorkerInstance
));

Use the following code fragment to stop workers when the application shuts down.
executor.shutdown();

Workers implemented with the annotations are regular Java methods that can be unit tested with any testing framework.
Create a mock worker in a different package (e.g., test) and scan for these packages when loading up the workers for integration testing.
See Unit Testing Framework for more details on testing.
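Because an annotated worker is just a method, it can be exercised directly without a running Conductor server. The sketch below is framework-free so that it stands alone; `Task2Worker` and `Task2WorkerTest` are hypothetical names, and the SDK annotations appear only in a comment so the example compiles without the SDK on the classpath. With a framework such as JUnit, each check would become a test method instead.

```java
/** Worker under test; plain Java, no Conductor server involved. */
class Task2Worker {
    // In a real project this method would carry the SDK annotations
    // @WorkerTask("task2"), @OutputParam("greetings") and @InputParam("name").
    public String task2(String name) {
        return "Hello, " + name;
    }
}

/** Minimal hand-rolled test harness (hypothetical). */
class Task2WorkerTest {
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // Calls the worker method directly, exactly as any other Java method.
    static void greetsByName() {
        String actual = new Task2Worker().task2("conductor");
        check("Hello, conductor".equals(actual), "unexpected greeting: " + actual);
    }

    public static void main(String[] args) {
        greetsByName();
        System.out.println("all worker tests passed");
    }
}
```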
In a typical production environment, you will have multiple workers across different machines/VMs/pods polling for the same task. As with all Conductor workers, the following best practices apply:
- Workers should be stateless and should not keep any state in the process they run in.
- Ideally, workers should be idempotent.
- Each worker should follow the Single Responsibility Principle and do exactly one thing.
- Workers should not embed any workflow logic, i.e., scheduling another worker, sending a message, etc. Conductor has features to do this, making it possible to decouple your workflow logic from the worker implementation.
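One way to read the first two guidelines is sketched below: the worker keeps no per-task state in its own fields, and a side effect is keyed by a stable identifier so that a retried task does not repeat it. Everything here is an illustrative assumption: `ChargeStore`, `InMemoryChargeStore`, `ChargeWorker`, the task name `charge_card`, and the idea that the task input carries an idempotency key. A real store would be a database or cache shared by all worker instances, not an in-memory map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical external store shared by every worker instance. */
interface ChargeStore {
    /** Stores the receipt if the key is new and returns the receipt held under the key. */
    String recordIfAbsent(String idempotencyKey, String receipt);

    int size();
}

/** In-memory stand-in, used here only so the sketch can run on its own. */
class InMemoryChargeStore implements ChargeStore {
    private final Map<String, String> receipts = new ConcurrentHashMap<>();

    @Override
    public String recordIfAbsent(String idempotencyKey, String receipt) {
        String previous = receipts.putIfAbsent(idempotencyKey, receipt);
        return previous != null ? previous : receipt;
    }

    @Override
    public int size() {
        return receipts.size();
    }
}

class ChargeWorker {
    // Injected dependency; the worker itself holds no per-task state.
    private final ChargeStore store;

    ChargeWorker(ChargeStore store) {
        this.store = store;
    }

    // Would carry @WorkerTask("charge_card") in a real project (hypothetical task name).
    public String charge(String idempotencyKey, long amountCents) {
        String receipt = "receipt-" + idempotencyKey + "-" + amountCents;
        // A repeated call with the same key returns the first receipt
        // instead of recording a second charge.
        return store.recordIfAbsent(idempotencyKey, receipt);
    }
}
```

Since `ChargeWorker` has no no-args constructor, a worker shaped like this would be registered through initWorkersFromInstances rather than through classpath scanning.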