Skip to content

Latest commit

 

History

History
524 lines (434 loc) · 10.9 KB

File metadata and controls

524 lines (434 loc) · 10.9 KB

TaskManager API Reference

The TaskManager is responsible for initializing and managing the runners that poll the different task queues and execute the tasks they receive.

Constructor

new TaskManager(client: Client, workers: Array<ConductorWorker>, config: TaskManagerConfig = {})

Creates a new TaskManager.

Parameters:

  • client (Client): An instance of Client.
  • workers (Array<ConductorWorker>): An array of ConductorWorker instances.
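  • config (TaskManagerConfig, optional): Configuration for the TaskManager. Defaults to an empty object ({}); see TaskManagerConfig under Type Definitions for the available fields.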

Example:

import { type ConductorWorker, TaskManager } from "@io-orkes/conductor-javascript";

// `client` must be an existing Client instance; creating it is outside the
// scope of this example.

// Annotating the array gives `task` its Task type and keeps `status` as the
// literal "COMPLETED" instead of widening it to `string`, which would not be
// assignable to the result type that `execute` must return.
const workers: Array<ConductorWorker> = [
  {
    taskDefName: "email_task",
    execute: async (task) => {
      // Task execution logic
      return {
        status: "COMPLETED",
        outputData: { sent: true },
      };
    },
  },
];

// The third argument is optional; every field of TaskManagerConfig may be omitted.
const taskManager = new TaskManager(client, workers, {
  options: {
    concurrency: 5,
    pollInterval: 100,
  },
  maxRetries: 3,
});

Properties

isPolling: boolean

Indicates whether the TaskManager is currently polling for tasks: true while polling, false otherwise.


Methods

updatePollingOptionForWorker(workerTaskDefName: string, options: Partial<TaskManagerOptions>): void

Updates the polling options for a specific worker.

Parameters:

  • workerTaskDefName (string): The task definition name of the worker.
  • options (Partial<TaskManagerOptions>): The new polling options.

Example:

import { TaskManager } from "@io-orkes/conductor-javascript";

// `client` and `workers` must already be defined, and `workers` must contain
// a worker whose taskDefName is "email_task".
const taskManager = new TaskManager(client, workers);

// New polling settings, applied to the "email_task" worker only
const emailTaskOptions = {
  concurrency: 10,
  pollInterval: 500,
};
taskManager.updatePollingOptionForWorker("email_task", emailTaskOptions);

updatePollingOptions(options: Partial<TaskManagerOptions>): void

Updates the polling options for all workers.

Parameters:

  • options (Partial<TaskManagerOptions>): The new polling options.

Example:

import { TaskManager } from "@io-orkes/conductor-javascript";

// `client` and `workers` must already be defined.
const taskManager = new TaskManager(client, workers);

// Polling settings shared by every worker
const sharedOptions = {
  concurrency: 5,
  pollInterval: 200,
};
taskManager.updatePollingOptions(sharedOptions);

startPolling(): void

Starts polling for tasks for all workers.

Example:

import { TaskManager } from "@io-orkes/conductor-javascript";

// `client` and `workers` must already be defined.
const taskManager = new TaskManager(client, workers);

// Start polling for tasks (startPolling() returns void, so there is nothing to await)
taskManager.startPolling();
// Read the isPolling property to report the manager's current state
console.log(`Polling started: ${taskManager.isPolling}`);

stopPolling(): Promise<void>

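Stops polling for tasks for all workers. The method returns a Promise, so await the call (as in the example below) before relying on polling having stopped.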

Example:

import { TaskManager } from "@io-orkes/conductor-javascript";

// `client` and `workers` must already be defined.
const taskManager = new TaskManager(client, workers);

// Stop polling for tasks; stopPolling() returns a Promise, so await it
// before reading isPolling
await taskManager.stopPolling();
console.log(`Polling stopped: ${taskManager.isPolling}`);

sanityCheck(): void

Performs a sanity check on the workers, ensuring there are no duplicates and that at least one worker is present. Throws an error if the check fails.

Example:

import { TaskManager } from "@io-orkes/conductor-javascript";

// `client` and `workers` must already be defined.
const taskManager = new TaskManager(client, workers);

// Perform sanity check
try {
  taskManager.sanityCheck();
  console.log("All workers are valid");
} catch (error) {
  // Under `strict`, a caught value is typed `unknown`, so narrow it before
  // reading `.message`; fall back to its string form for non-Error throws.
  const message = error instanceof Error ? error.message : String(error);
  console.error("Worker configuration error:", message);
}

Type Definitions

TaskManagerConfig

/**
 * Settings accepted as the optional third argument of the TaskManager
 * constructor. Every field may be omitted.
 */
export interface TaskManagerConfig {
  /** Logger implementation; see ConductorLogger. */
  logger?: ConductorLogger;
  /** Polling options; any subset of TaskManagerOptions may be supplied. */
  options?: Partial<TaskManagerOptions>;
  /** Error callback; see TaskErrorHandler for its signature. */
  onError?: TaskErrorHandler;
  // NOTE(review): what is retried (polling, task update, ...) is not stated in
  // this reference — confirm against the implementation.
  maxRetries?: number;
}

TaskManagerOptions

/** Alias of TaskRunnerOptions; the two names refer to the same shape. */
export type TaskManagerOptions = TaskRunnerOptions;

TaskRunnerOptions

/**
 * Polling options for a task runner. `workerID` and `domain` are required
 * keys of this interface; the TaskManager APIs in this reference take
 * Partial<TaskManagerOptions>, so callers of those APIs may omit them.
 */
export interface TaskRunnerOptions {
  workerID: string;
  // The key is required, but its value may be explicitly undefined.
  domain: string | undefined;
  // NOTE(review): unit is not stated in this reference (presumably milliseconds) — confirm.
  pollInterval?: number;
  concurrency?: number;
  // NOTE(review): unit is not stated in this reference — confirm.
  batchPollingTimeout?: number;
}

ConductorWorker

/**
 * A worker passed to the TaskManager: the task definition name it serves plus
 * the function that executes each task.
 */
export interface ConductorWorker {
  /** Task definition name; also the name passed to updatePollingOptionForWorker. */
  taskDefName: string;
  /**
   * Executes one task and resolves with its result. The result type omits
   * `workflowInstanceId` and `taskId`, so the worker does not supply them.
   */
  execute: (
    task: Task
  ) => Promise<Omit<TaskResult, "workflowInstanceId" | "taskId">>;
  // Optional per-worker settings; they share their names with fields of
  // TaskRunnerOptions. NOTE(review): precedence over manager-level options is
  // not stated in this reference — confirm.
  domain?: string;
  concurrency?: number;
  pollInterval?: number;
}

TaskErrorHandler

/**
 * Callback type for error reporting. It receives the Error and, when
 * available, the Task it relates to; its return value is ignored.
 */
export type TaskErrorHandler = (error: Error, task?: Task) => void;

ConductorLogger

/**
 * Logging contract with one method per level (info, error, debug). Each
 * method accepts any number of arguments of any type and returns nothing.
 */
export interface ConductorLogger {
  info(...args: unknown[]): void;
  error(...args: unknown[]): void;
  debug(...args: unknown[]): void;
}

DefaultLogger

/**
 * Declaration of a class that implements ConductorLogger. Only the signatures
 * are listed here; the method bodies are not part of this reference.
 */
export declare class DefaultLogger implements ConductorLogger {
  /** The configuration object is optional; see DefaultLoggerConfig. */
  constructor(config?: DefaultLoggerConfig);

  info(...args: unknown[]): void;
  error(...args: unknown[]): void;
  debug(...args: unknown[]): void;
}

DefaultLoggerConfig

/** Optional settings for the DefaultLogger constructor. */
export interface DefaultLoggerConfig {
  /** One of the ConductorLogLevel values. */
  level?: ConductorLogLevel;
  // NOTE(review): how tags are used in log output is not stated in this reference — confirm.
  tags?: object[];
}

ConductorLogLevel

/** Log level names accepted by DefaultLoggerConfig.level. */
export type ConductorLogLevel = "DEBUG" | "INFO" | "ERROR";

Task

/**
 * Task object handed to a worker's `execute` function and, optionally, to a
 * TaskErrorHandler. Every field is optional.
 */
export type Task = {
  callbackAfterSeconds?: number;
  callbackFromWorker?: boolean;
  correlationId?: string;
  domain?: string;
  endTime?: number;
  executed?: boolean;
  executionNameSpace?: string;
  externalInputPayloadStoragePath?: string;
  externalOutputPayloadStoragePath?: string;
  firstStartTime?: number;
  /** Free-form input values keyed by name. */
  inputData?: Record<string, unknown>;
  isolationGroupId?: string;
  iteration?: number;
  loopOverTask?: boolean;
  /** Free-form output values keyed by name. */
  outputData?: Record<string, unknown>;
  parentTaskId?: string;
  pollCount?: number;
  queueWaitTime?: number;
  rateLimitFrequencyInSeconds?: number;
  rateLimitPerFrequency?: number;
  reasonForIncompletion?: string;
  referenceTaskName?: string;
  responseTimeoutSeconds?: number;
  retried?: boolean;
  retriedTaskId?: string;
  retryCount?: number;
  scheduledTime?: number;
  seq?: number;
  startDelayInSeconds?: number;
  startTime?: number;
  /** Wider than TaskResult's status: it has five additional values. */
  status?:
    | "IN_PROGRESS"
    | "CANCELED"
    | "FAILED"
    | "FAILED_WITH_TERMINAL_ERROR"
    | "COMPLETED"
    | "COMPLETED_WITH_ERRORS"
    | "SCHEDULED"
    | "TIMED_OUT"
    | "SKIPPED";
  subWorkflowId?: string;
  subworkflowChanged?: boolean;
  taskDefName?: string;
  taskDefinition?: TaskDef;
  taskId?: string;
  taskType?: string;
  updateTime?: number;
  workerId?: string;
  workflowInstanceId?: string;
  workflowPriority?: number;
  workflowTask?: WorkflowTask;
  workflowType?: string;
};

TaskResult

/**
 * Result reported for a task. `taskId` and `workflowInstanceId` are the only
 * required fields; a worker's `execute` function returns this shape without
 * those two.
 */
export type TaskResult = {
  callbackAfterSeconds?: number;
  extendLease?: boolean;
  externalOutputPayloadStoragePath?: string;
  logs?: TaskExecLog[];
  /** Free-form output values keyed by name. */
  outputData?: Record<string, unknown>;
  reasonForIncompletion?: string;
  /** Same four values as TaskResultStatusEnum. */
  status?:
    | "IN_PROGRESS"
    | "FAILED"
    | "FAILED_WITH_TERMINAL_ERROR"
    | "COMPLETED";
  subWorkflowId?: string;
  taskId: string;
  workerId?: string;
  workflowInstanceId: string;
};

TaskResultStatusEnum

/**
 * String enum whose member values equal their names. The four values are the
 * same literals that TaskResult.status accepts.
 */
export enum TaskResultStatusEnum {
  IN_PROGRESS = "IN_PROGRESS",
  FAILED = "FAILED",
  FAILED_WITH_TERMINAL_ERROR = "FAILED_WITH_TERMINAL_ERROR",
  COMPLETED = "COMPLETED",
}

RunnerArgs

/**
 * Inputs for a single runner: one worker, a Client, and the runner options
 * are required; logger, error handler, concurrency and retry limit are optional.
 * NOTE(review): this reference does not show where RunnerArgs is consumed
 * (presumably the task runner's constructor) — confirm.
 */
export interface RunnerArgs {
  worker: ConductorWorker;
  client: Client;
  // Unlike TaskManagerConfig.options, this is the full (non-Partial) options type.
  options: TaskRunnerOptions;
  logger?: ConductorLogger;
  onError?: TaskErrorHandler;
  // NOTE(review): TaskRunnerOptions also has a `concurrency` field; which one
  // takes effect is not stated in this reference — confirm.
  concurrency?: number;
  maxRetries?: number;
}

TaskExecLog

/** One log entry of a task; the element type of TaskResult.logs. All fields are optional. */
export type TaskExecLog = {
  // NOTE(review): presumably an epoch timestamp; the unit is not stated in this reference — confirm.
  createdTime?: number;
  log?: string;
  taskId?: string;
};

TaskDef

/**
 * Task definition, referenced by Task.taskDefinition and
 * WorkflowTask.taskDefinition. Every field is optional.
 */
export type TaskDef = {
  name?: string;
  description?: string;
  retryCount?: number;
  timeoutSeconds?: number;
  inputKeys?: string[];
  outputKeys?: string[];
  timeoutPolicy?: "RETRY" | "TIME_OUT_WF" | "ALERT_ONLY";
  retryLogic?: "FIXED" | "EXPONENTIAL_BACKOFF" | "LINEAR_BACKOFF";
  retryDelaySeconds?: number;
  responseTimeoutSeconds?: number;
  concurrentExecLimit?: number;
  /** Free-form template values keyed by name. */
  inputTemplate?: Record<string, unknown>;
  rateLimitPerFrequency?: number;
  rateLimitFrequencyInSeconds?: number;
  isolationGroupId?: string;
  executionNameSpace?: string;
  ownerEmail?: string;
  pollTimeoutSeconds?: number;
  backoffScaleFactor?: number;
  createTime?: number;
  updateTime?: number;
  createdBy?: string;
  updatedBy?: string;
  /** Maps each key to a list of strings. */
  accessPolicy?: Record<string, string[]>;
  workflowTaskType?: string;
  archivalConfig?: {
    enabled?: boolean;
    archiveAfterDays?: number;
  };
};

WorkflowTask

/**
 * A task entry inside a workflow definition. The type is recursive: several
 * fields (defaultCase, forkTasks, loopOver, decisionCases) hold nested
 * WorkflowTask lists. Every field is optional.
 */
export type WorkflowTask = {
  name?: string;
  taskReferenceName?: string;
  description?: string;
  inputParameters?: Record<string, unknown>;
  type?: string;
  dynamicTaskNameParam?: string;
  caseValueParam?: string;
  caseValues?: string[];
  dynamicForkJoinTasksParam?: string;
  dynamicForkTasksParam?: string;
  dynamicForkTasksInputParamName?: string;
  defaultCase?: WorkflowTask[];
  /** A list of task lists. */
  forkTasks?: WorkflowTask[][];
  startDelay?: number;
  subWorkflowParam?: {
    name?: string;
    version?: number;
    taskToDomain?: Record<string, string>;
    workflowDefinition?: WorkflowDef;
  };
  joinOn?: string[];
  sink?: string;
  optional?: boolean;
  taskDefinition?: TaskDef;
  rateLimited?: boolean;
  defaultExclusiveJoinTask?: string[];
  asyncComplete?: boolean;
  loopCondition?: string;
  loopOver?: WorkflowTask[];
  retryCount?: number;
  evaluatorType?: string;
  expression?: string;
  /** Maps each case key to the list of tasks for that case. */
  decisionCases?: Record<string, WorkflowTask[]>;
  scriptExpression?: string;
  eventTaskName?: string;
  eventTaskInput?: Record<string, unknown>;
  status?: string;
  retryLogic?: string;
  retryDelaySeconds?: number;
  timeoutSeconds?: number;
  timeoutPolicy?: string;
  responseTimeoutSeconds?: number;
  concurrentExecLimit?: number;
  rateLimitPerFrequency?: number;
  rateLimitFrequencyInSeconds?: number;
  isolationGroupId?: string;
  executionNameSpace?: string;
  ownerEmail?: string;
  onStateChange?: Record<string, unknown>;
  defaultRetryPolicy?: {
    initialIntervalSeconds?: number;
    backoffCoefficient?: number;
    maximumAttempts?: number;
    maximumIntervalSeconds?: number;
  };
};

WorkflowDef

/**
 * Workflow definition, referenced by WorkflowTask.subWorkflowParam. The
 * required fields are `name`, `tasks` and `timeoutSeconds`; all others are
 * optional.
 */
export type WorkflowDef = {
  cacheConfig?: CacheConfig;
  createTime?: number;
  createdBy?: string;
  description?: string;
  enforceSchema?: boolean;
  failureWorkflow?: string;
  inputParameters?: string[];
  inputSchema?: SchemaDef;
  inputTemplate?: Record<string, unknown>;
  maskedFields?: string[];
  metadata?: Record<string, unknown>;
  name: string;
  outputParameters?: Record<string, unknown>;
  outputSchema?: SchemaDef;
  ownerApp?: string;
  ownerEmail?: string;
  rateLimitConfig?: RateLimitConfig;
  restartable?: boolean;
  schemaVersion?: number;
  tasks: WorkflowTask[];
  /** Narrower than TaskDef.timeoutPolicy: "RETRY" is not accepted here. */
  timeoutPolicy?: "TIME_OUT_WF" | "ALERT_ONLY";
  timeoutSeconds: number;
  updateTime?: number;
  updatedBy?: string;
  variables?: Record<string, unknown>;
  version?: number;
  workflowStatusListenerEnabled?: boolean;
  workflowStatusListenerSink?: string;
};