diff --git a/lib/handler-scanner.service.ts b/lib/handler-scanner.service.ts index e33fef6..20c03f2 100644 --- a/lib/handler-scanner.service.ts +++ b/lib/handler-scanner.service.ts @@ -10,6 +10,7 @@ import { import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper"; import PgBoss from "pg-boss"; import { LOGGER } from "./utils/consts"; +import { WorkOptions } from "./interfaces/handler-metadata.interface"; @Injectable() export class HandlerScannerService { @@ -45,7 +46,7 @@ export class HandlerScannerService { const methodRef = instance[methodName]; const jobName = this.reflector.get(JOB_NAME, methodRef); - const jobOptions = this.reflector.get( + const jobOptions = this.reflector.get( JOB_OPTIONS, methodRef, ); @@ -72,12 +73,15 @@ export class HandlerScannerService { continue; } - await this.pgBossService.registerJob( - jobName, - methodRef.bind(instance), - jobOptions, - ); - this.logger.log(`Registered job: ${jobName}`); + const teamSize = Math.max(1, jobOptions?.teamSize ?? 1); + for (let i = 0; i < teamSize; i++) { + await this.pgBossService.registerJob( + jobName, + methodRef.bind(instance), + jobOptions, + ); + } + this.logger.log(`Registered job: ${jobName} (${teamSize} worker${teamSize > 1 ? 's' : ''})`); } catch (error) { this.logger.error(error, `Error registering job ${jobName}`); } diff --git a/lib/interfaces/handler-metadata.interface.ts b/lib/interfaces/handler-metadata.interface.ts index 447be47..bdb5fe1 100644 --- a/lib/interfaces/handler-metadata.interface.ts +++ b/lib/interfaces/handler-metadata.interface.ts @@ -1,4 +1,8 @@ -import { WorkOptions, ScheduleOptions } from "pg-boss"; +import { WorkOptions as PgBossWorkOptions, ScheduleOptions } from "pg-boss"; + +export interface WorkOptions extends PgBossWorkOptions { + teamSize?: number; +} export interface HandlerMetadata { jobName: string; diff --git a/test/handler-scanner.service.spec.ts b/test/handler-scanner.service.spec.ts new file mode 100644 index 0000000..7a6a7de --- /dev/null +++ b/test/handler-scanner.service.spec.ts @@ -0,0 +1,81 @@ +jest.mock("pg-boss", () => ({})); + +import { Test, TestingModule } from "@nestjs/testing"; +import { Reflector, ModulesContainer } from "@nestjs/core"; +import { HandlerScannerService } from "../lib/handler-scanner.service"; +import { PgBossService } from "../lib/pgboss.service"; +import { JOB_NAME, JOB_OPTIONS } from "../lib/decorators/job.decorator"; + +describe("HandlerScannerService", () => { + let service: HandlerScannerService; + let mockPgBossService: any; + let mockReflector: any; + let mockModulesContainer: Map; + + class TestHandler { + handle() {} + } + + beforeEach(async () => { + mockPgBossService = { + registerJob: jest.fn().mockResolvedValue(undefined), + registerCronJob: jest.fn().mockResolvedValue(undefined), + }; + + mockReflector = { get: jest.fn() }; + mockModulesContainer = new Map(); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + HandlerScannerService, + { provide: PgBossService, useValue: mockPgBossService }, + { provide: Reflector, useValue: mockReflector }, + { provide: ModulesContainer, useValue: mockModulesContainer }, + ], + }).compile(); + + service = module.get(HandlerScannerService); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe("teamSize", () => { + const setupHandler = (jobOptions?: { teamSize?: number }) => { + const instance = new TestHandler(); + mockModulesContainer.set("TestModule", { + providers: new Map([["TestHandler", { instance }]]), + }); + mockReflector.get.mockImplementation((key: string, target: any) => { + if (key === JOB_NAME && target === instance.handle) return "my-job"; + if (key === JOB_OPTIONS && target === instance.handle) return jobOptions; + return undefined; + }); + }; + + it("should register job once by default", async () => { + setupHandler(); + await service.scanAndRegisterHandlers(); + expect(mockPgBossService.registerJob).toHaveBeenCalledTimes(1); + }); + + it("should register job multiple times when teamSize > 1", async () => { + setupHandler({ teamSize: 3 }); + await service.scanAndRegisterHandlers(); + expect(mockPgBossService.registerJob).toHaveBeenCalledTimes(3); + }); + + it("should default to 1 when teamSize is 0", async () => { + setupHandler({ teamSize: 0 }); + await service.scanAndRegisterHandlers(); + expect(mockPgBossService.registerJob).toHaveBeenCalledTimes(1); + }); + + it("should default to 1 when teamSize is negative", async () => { + setupHandler({ teamSize: -5 }); + await service.scanAndRegisterHandlers(); + expect(mockPgBossService.registerJob).toHaveBeenCalledTimes(1); + }); + }); +});