From 76438b4b3cd4a81577e8cd3f5bf22600a689d123 Mon Sep 17 00:00:00 2001 From: sleepyqadir Date: Thu, 7 Mar 2024 20:54:03 +0500 Subject: [PATCH 1/2] implemented rabbitmq client and module --- dist/pubsub/index.d.ts | 1 + dist/pubsub/index.js | 1 + dist/pubsub/index.js.map | 2 +- dist/pubsub/rabbitmq/index.d.ts | 3 + dist/pubsub/rabbitmq/index.js | 20 ++ dist/pubsub/rabbitmq/index.js.map | 1 + dist/pubsub/rabbitmq/rabbitmq.client.d.ts | 8 + dist/pubsub/rabbitmq/rabbitmq.client.js | 64 ++++++ dist/pubsub/rabbitmq/rabbitmq.client.js.map | 1 + dist/pubsub/rabbitmq/rabbitmq.module.d.ts | 2 + dist/pubsub/rabbitmq/rabbitmq.module.js | 35 ++++ dist/pubsub/rabbitmq/rabbitmq.module.js.map | 1 + dist/pubsub/rabbitmq/types/exchangeMap.d.ts | 5 + dist/pubsub/rabbitmq/types/exchangeMap.js | 10 + dist/pubsub/rabbitmq/types/exchangeMap.js.map | 1 + package-lock.json | 185 ++++++++++++++++++ package.json | 2 + src/pubsub/index.ts | 1 + src/pubsub/rabbitmq/index.ts | 3 + src/pubsub/rabbitmq/rabbitmq.client.ts | 71 +++++++ src/pubsub/rabbitmq/rabbitmq.module.ts | 22 +++ src/pubsub/rabbitmq/types/exchangeMap.ts | 7 + 22 files changed, 445 insertions(+), 1 deletion(-) create mode 100644 dist/pubsub/rabbitmq/index.d.ts create mode 100644 dist/pubsub/rabbitmq/index.js create mode 100644 dist/pubsub/rabbitmq/index.js.map create mode 100644 dist/pubsub/rabbitmq/rabbitmq.client.d.ts create mode 100644 dist/pubsub/rabbitmq/rabbitmq.client.js create mode 100644 dist/pubsub/rabbitmq/rabbitmq.client.js.map create mode 100644 dist/pubsub/rabbitmq/rabbitmq.module.d.ts create mode 100644 dist/pubsub/rabbitmq/rabbitmq.module.js create mode 100644 dist/pubsub/rabbitmq/rabbitmq.module.js.map create mode 100644 dist/pubsub/rabbitmq/types/exchangeMap.d.ts create mode 100644 dist/pubsub/rabbitmq/types/exchangeMap.js create mode 100644 dist/pubsub/rabbitmq/types/exchangeMap.js.map create mode 100644 src/pubsub/rabbitmq/index.ts create mode 100644 src/pubsub/rabbitmq/rabbitmq.client.ts create mode 100644 src/pubsub/rabbitmq/rabbitmq.module.ts create mode 100644 src/pubsub/rabbitmq/types/exchangeMap.ts diff --git a/dist/pubsub/index.d.ts b/dist/pubsub/index.d.ts index d860d067..e75d6dc9 100644 --- a/dist/pubsub/index.d.ts +++ b/dist/pubsub/index.d.ts @@ -1,2 +1,3 @@ export * from './client'; export * from './types'; +export * from './rabbitmq'; diff --git a/dist/pubsub/index.js b/dist/pubsub/index.js index d96fcc33..4cd433f5 100644 --- a/dist/pubsub/index.js +++ b/dist/pubsub/index.js @@ -16,4 +16,5 @@ var __exportStar = (this && this.__exportStar) || function(m, exports) { Object.defineProperty(exports, "__esModule", { value: true }); __exportStar(require("./client"), exports); __exportStar(require("./types"), exports); +__exportStar(require("./rabbitmq"), exports); //# sourceMappingURL=index.js.map \ No newline at end of file diff --git a/dist/pubsub/index.js.map b/dist/pubsub/index.js.map index f11eec46..8d20abdf 100644 --- a/dist/pubsub/index.js.map +++ b/dist/pubsub/index.js.map @@ -1 +1 @@ -{"version":3,"file":"index.js","sourceRoot":"","sources":["../../src/pubsub/index.ts"],"names":[],"mappings":";;;;;;;;;;;;;;;;AAAA,2CAAyB;AACzB,0CAAwB"} \ No newline at end of file +{"version":3,"file":"index.js","sourceRoot":"","sources":["../../src/pubsub/index.ts"],"names":[],"mappings":";;;;;;;;;;;;;;;;AAAA,2CAAyB;AACzB,0CAAwB;AACxB,6CAA2B"} \ No newline at end of file diff --git a/dist/pubsub/rabbitmq/index.d.ts b/dist/pubsub/rabbitmq/index.d.ts new file mode 100644 index 00000000..cf1ad8f7 --- /dev/null +++ b/dist/pubsub/rabbitmq/index.d.ts @@ -0,0 +1,3 @@ +export * from './rabbitmq.client'; +export * from './rabbitmq.module'; +export * from './types/exchangeMap'; diff --git a/dist/pubsub/rabbitmq/index.js b/dist/pubsub/rabbitmq/index.js new file mode 100644 index 00000000..e31ddd78 --- /dev/null +++ b/dist/pubsub/rabbitmq/index.js @@ -0,0 +1,20 @@ +"use strict"; +var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + var desc = Object.getOwnPropertyDescriptor(m, k); + if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { + desc = { enumerable: true, get: function() { return m[k]; } }; + } + Object.defineProperty(o, k2, desc); +}) : (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + o[k2] = m[k]; +})); +var __exportStar = (this && this.__exportStar) || function(m, exports) { + for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p); +}; +Object.defineProperty(exports, "__esModule", { value: true }); +__exportStar(require("./rabbitmq.client"), exports); +__exportStar(require("./rabbitmq.module"), exports); +__exportStar(require("./types/exchangeMap"), exports); +//# sourceMappingURL=index.js.map \ No newline at end of file diff --git a/dist/pubsub/rabbitmq/index.js.map b/dist/pubsub/rabbitmq/index.js.map new file mode 100644 index 00000000..95cf0865 --- /dev/null +++ b/dist/pubsub/rabbitmq/index.js.map @@ -0,0 +1 @@ +{"version":3,"file":"index.js","sourceRoot":"","sources":["../../../src/pubsub/rabbitmq/index.ts"],"names":[],"mappings":";;;;;;;;;;;;;;;;AAAA,oDAAkC;AAClC,oDAAkC;AAClC,sDAAoC"} \ No newline at end of file diff --git a/dist/pubsub/rabbitmq/rabbitmq.client.d.ts b/dist/pubsub/rabbitmq/rabbitmq.client.d.ts new file mode 100644 index 00000000..115b81a1 --- /dev/null +++ b/dist/pubsub/rabbitmq/rabbitmq.client.d.ts @@ -0,0 +1,8 @@ +import { AmqpConnection } from '@golevelup/nestjs-rabbitmq'; +import { ArenaPubSubMessage, ArenaPubSubTrigger, PubSubMessage, PubSubTopic, PubSubTrigger } from '../types'; +export declare class RabbitMQClient { + private readonly amqpConnection; + constructor(amqpConnection: AmqpConnection); + publish(topic: T, routingKey: string, message: PubSubMessage> | ArenaPubSubMessage>): Promise; + subscribe(topic: T, routingKey: string, queueName: string, onMessage: (msg: PubSubMessage> | ArenaPubSubMessage>) => void): Promise; +} diff --git a/dist/pubsub/rabbitmq/rabbitmq.client.js b/dist/pubsub/rabbitmq/rabbitmq.client.js new file mode 100644 index 00000000..3e0fd20f --- /dev/null +++ b/dist/pubsub/rabbitmq/rabbitmq.client.js @@ -0,0 +1,64 @@ +"use strict"; +var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) { + var c = arguments.length, r = c < 3 ? target : desc === null ? desc = Object.getOwnPropertyDescriptor(target, key) : desc, d; + if (typeof Reflect === "object" && typeof Reflect.decorate === "function") r = Reflect.decorate(decorators, target, key, desc); + else for (var i = decorators.length - 1; i >= 0; i--) if (d = decorators[i]) r = (c < 3 ? d(r) : c > 3 ? d(target, key, r) : d(target, key)) || r; + return c > 3 && r && Object.defineProperty(target, key, r), r; +}; +var __metadata = (this && this.__metadata) || function (k, v) { + if (typeof Reflect === "object" && typeof Reflect.metadata === "function") return Reflect.metadata(k, v); +}; +var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { + function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } + return new (P || (P = Promise))(function (resolve, reject) { + function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } + function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } + function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } + step((generator = generator.apply(thisArg, _arguments || [])).next()); + }); +}; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.RabbitMQClient = void 0; +// rabbitmq-client.ts +const common_1 = require("@nestjs/common"); +const nestjs_rabbitmq_1 = require("@golevelup/nestjs-rabbitmq"); +const exchangeMap_1 = require("./types/exchangeMap"); +let RabbitMQClient = class RabbitMQClient { + constructor(amqpConnection) { + this.amqpConnection = amqpConnection; + } + publish(topic, routingKey, message) { + return __awaiter(this, void 0, void 0, function* () { + const exchange = exchangeMap_1.exchangeMap[topic]; + yield this.amqpConnection.channel.assertExchange(exchange, 'topic', { + durable: true, + }); + this.amqpConnection.channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(message))); + console.log(`Published message to ${exchange}:${routingKey}`); + }); + } + subscribe(topic, routingKey, queueName, onMessage) { + return __awaiter(this, void 0, void 0, function* () { + const exchange = exchangeMap_1.exchangeMap[topic]; + yield this.amqpConnection.channel.assertExchange(exchange, 'topic', { + durable: true, + }); + yield this.amqpConnection.channel.assertQueue(queueName, { durable: true }); + yield this.amqpConnection.channel.bindQueue(queueName, exchange, routingKey); + this.amqpConnection.channel.consume(queueName, (msg) => { + if (msg) { + const message = JSON.parse(msg.content.toString()); + onMessage(message); + this.amqpConnection.channel.ack(msg); + } + }, { noAck: false }); + console.log(`Subscribed to ${exchange}:${routingKey} with queue ${queueName}`); + }); + } +}; +RabbitMQClient = __decorate([ + (0, common_1.Injectable)(), + __metadata("design:paramtypes", [nestjs_rabbitmq_1.AmqpConnection]) +], RabbitMQClient); +exports.RabbitMQClient = RabbitMQClient; +//# sourceMappingURL=rabbitmq.client.js.map \ No newline at end of file diff --git a/dist/pubsub/rabbitmq/rabbitmq.client.js.map b/dist/pubsub/rabbitmq/rabbitmq.client.js.map new file mode 100644 index 00000000..2b8c941f --- /dev/null +++ b/dist/pubsub/rabbitmq/rabbitmq.client.js.map @@ -0,0 +1 @@ +{"version":3,"file":"rabbitmq.client.js","sourceRoot":"","sources":["../../../src/pubsub/rabbitmq/rabbitmq.client.ts"],"names":[],"mappings":";;;;;;;;;;;;;;;;;;;;;AAAA,qBAAqB;AACrB,2CAA4C;AAC5C,gEAA4D;AAQ5D,qDAAkD;AAG3C,IAAM,cAAc,GAApB,MAAM,cAAc;IACzB,YAA6B,cAA8B;QAA9B,mBAAc,GAAd,cAAc,CAAgB;IAAG,CAAC;IAEzD,OAAO,CACX,KAAQ,EACR,UAAkB,EAClB,OAE6C;;YAE7C,MAAM,QAAQ,GAAG,yBAAW,CAAC,KAAK,CAAC,CAAC;YACpC,MAAM,IAAI,CAAC,cAAc,CAAC,OAAO,CAAC,cAAc,CAAC,QAAQ,EAAE,OAAO,EAAE;gBAClE,OAAO,EAAE,IAAI;aACd,CAAC,CAAC;YACH,IAAI,CAAC,cAAc,CAAC,OAAO,CAAC,OAAO,CACjC,QAAQ,EACR,UAAU,EACV,MAAM,CAAC,IAAI,CAAC,IAAI,CAAC,SAAS,CAAC,OAAO,CAAC,CAAC,CACrC,CAAC;YACF,OAAO,CAAC,GAAG,CAAC,wBAAwB,QAAQ,IAAI,UAAU,EAAE,CAAC,CAAC;QAChE,CAAC;KAAA;IAEK,SAAS,CACb,KAAQ,EACR,UAAkB,EAClB,SAAiB,EACjB,SAIS;;YAET,MAAM,QAAQ,GAAG,yBAAW,CAAC,KAAK,CAAC,CAAC;YACpC,MAAM,IAAI,CAAC,cAAc,CAAC,OAAO,CAAC,cAAc,CAAC,QAAQ,EAAE,OAAO,EAAE;gBAClE,OAAO,EAAE,IAAI;aACd,CAAC,CAAC;YACH,MAAM,IAAI,CAAC,cAAc,CAAC,OAAO,CAAC,WAAW,CAAC,SAAS,EAAE,EAAE,OAAO,EAAE,IAAI,EAAE,CAAC,CAAC;YAC5E,MAAM,IAAI,CAAC,cAAc,CAAC,OAAO,CAAC,SAAS,CACzC,SAAS,EACT,QAAQ,EACR,UAAU,CACX,CAAC;YACF,IAAI,CAAC,cAAc,CAAC,OAAO,CAAC,OAAO,CACjC,SAAS,EACT,CAAC,GAAG,EAAE,EAAE;gBACN,IAAI,GAAG,EAAE;oBACP,MAAM,OAAO,GAAG,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,OAAO,CAAC,QAAQ,EAAE,CAAC,CAAC;oBACnD,SAAS,CAAC,OAAO,CAAC,CAAC;oBACnB,IAAI,CAAC,cAAc,CAAC,OAAO,CAAC,GAAG,CAAC,GAAG,CAAC,CAAC;iBACtC;YACH,CAAC,EACD,EAAE,KAAK,EAAE,KAAK,EAAE,CACjB,CAAC;YACF,OAAO,CAAC,GAAG,CACT,iBAAiB,QAAQ,IAAI,UAAU,eAAe,SAAS,EAAE,CAClE,CAAC;QACJ,CAAC;KAAA;CACF,CAAA;AAzDY,cAAc;IAD1B,IAAA,mBAAU,GAAE;qCAEkC,gCAAc;GADhD,cAAc,CAyD1B;AAzDY,wCAAc"} \ No newline at end of file diff --git a/dist/pubsub/rabbitmq/rabbitmq.module.d.ts b/dist/pubsub/rabbitmq/rabbitmq.module.d.ts new file mode 100644 index 00000000..4bbf2614 --- /dev/null +++ b/dist/pubsub/rabbitmq/rabbitmq.module.d.ts @@ -0,0 +1,2 @@ +export declare class RabbitMQCustomModule { +} diff --git a/dist/pubsub/rabbitmq/rabbitmq.module.js b/dist/pubsub/rabbitmq/rabbitmq.module.js new file mode 100644 index 00000000..4ce5bbce --- /dev/null +++ b/dist/pubsub/rabbitmq/rabbitmq.module.js @@ -0,0 +1,35 @@ +"use strict"; +var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) { + var c = arguments.length, r = c < 3 ? target : desc === null ? desc = Object.getOwnPropertyDescriptor(target, key) : desc, d; + if (typeof Reflect === "object" && typeof Reflect.decorate === "function") r = Reflect.decorate(decorators, target, key, desc); + else for (var i = decorators.length - 1; i >= 0; i--) if (d = decorators[i]) r = (c < 3 ? d(r) : c > 3 ? d(target, key, r) : d(target, key)) || r; + return c > 3 && r && Object.defineProperty(target, key, r), r; +}; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.RabbitMQCustomModule = void 0; +const common_1 = require("@nestjs/common"); +const rabbitmq_client_1 = require("./rabbitmq.client"); +const nestjs_rabbitmq_1 = require("@golevelup/nestjs-rabbitmq"); +let RabbitMQCustomModule = class RabbitMQCustomModule { +}; +RabbitMQCustomModule = __decorate([ + (0, common_1.Module)({ + imports: [ + nestjs_rabbitmq_1.RabbitMQModule.forRoot(nestjs_rabbitmq_1.RabbitMQModule, { + name: 'default', + exchanges: [ + { + name: 'exchang1', + type: 'topic', + }, + ], + uri: 'amqp://guest:guest@rabbitmq:5672/', + enableControllerDiscovery: true, + }), + ], + providers: [rabbitmq_client_1.RabbitMQClient], + exports: [rabbitmq_client_1.RabbitMQClient], + }) +], RabbitMQCustomModule); +exports.RabbitMQCustomModule = RabbitMQCustomModule; +//# sourceMappingURL=rabbitmq.module.js.map \ No newline at end of file diff --git a/dist/pubsub/rabbitmq/rabbitmq.module.js.map b/dist/pubsub/rabbitmq/rabbitmq.module.js.map new file mode 100644 index 00000000..89198c7d --- /dev/null +++ b/dist/pubsub/rabbitmq/rabbitmq.module.js.map @@ -0,0 +1 @@ +{"version":3,"file":"rabbitmq.module.js","sourceRoot":"","sources":["../../../src/pubsub/rabbitmq/rabbitmq.module.ts"],"names":[],"mappings":";;;;;;;;;AAAA,2CAAwC;AACxC,uDAAmD;AACnD,gEAA4D;AAmBrD,IAAM,oBAAoB,GAA1B,MAAM,oBAAoB;CAAG,CAAA;AAAvB,oBAAoB;IAjBhC,IAAA,eAAM,EAAC;QACN,OAAO,EAAE;YACP,gCAAc,CAAC,OAAO,CAAC,gCAAc,EAAE;gBACrC,IAAI,EAAE,SAAS;gBACf,SAAS,EAAE;oBACT;wBACE,IAAI,EAAE,UAAU;wBAChB,IAAI,EAAE,OAAO;qBACd;iBACF;gBACD,GAAG,EAAE,mCAAmC;gBACxC,yBAAyB,EAAE,IAAI;aAChC,CAAC;SACH;QACD,SAAS,EAAE,CAAC,gCAAc,CAAC;QAC3B,OAAO,EAAE,CAAC,gCAAc,CAAC;KAC1B,CAAC;GACW,oBAAoB,CAAG;AAAvB,oDAAoB"} \ No newline at end of file diff --git a/dist/pubsub/rabbitmq/types/exchangeMap.d.ts b/dist/pubsub/rabbitmq/types/exchangeMap.d.ts new file mode 100644 index 00000000..e44bdb23 --- /dev/null +++ b/dist/pubsub/rabbitmq/types/exchangeMap.d.ts @@ -0,0 +1,5 @@ +export declare const exchangeMap: { + triggers: string; + "collection-import": string; + "search-index": string; +}; diff --git a/dist/pubsub/rabbitmq/types/exchangeMap.js b/dist/pubsub/rabbitmq/types/exchangeMap.js new file mode 100644 index 00000000..f337253e --- /dev/null +++ b/dist/pubsub/rabbitmq/types/exchangeMap.js @@ -0,0 +1,10 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.exchangeMap = void 0; +const types_1 = require("../../types"); +exports.exchangeMap = { + [types_1.PubSubTopic.TRIGGERS]: 'triggers-exchange', + [types_1.PubSubTopic.COLLECTION_IMPORT]: 'collection-import-exchange', + [types_1.PubSubTopic.SEARCH_INDEX]: 'search-index-exchange', +}; +//# sourceMappingURL=exchangeMap.js.map \ No newline at end of file diff --git a/dist/pubsub/rabbitmq/types/exchangeMap.js.map b/dist/pubsub/rabbitmq/types/exchangeMap.js.map new file mode 100644 index 00000000..c5742ad6 --- /dev/null +++ b/dist/pubsub/rabbitmq/types/exchangeMap.js.map @@ -0,0 +1 @@ +{"version":3,"file":"exchangeMap.js","sourceRoot":"","sources":["../../../../src/pubsub/rabbitmq/types/exchangeMap.ts"],"names":[],"mappings":";;;AAAA,uCAA0C;AAE7B,QAAA,WAAW,GAAG;IACzB,CAAC,mBAAW,CAAC,QAAQ,CAAC,EAAE,mBAAmB;IAC3C,CAAC,mBAAW,CAAC,iBAAiB,CAAC,EAAE,4BAA4B;IAC7D,CAAC,mBAAW,CAAC,YAAY,CAAC,EAAE,uBAAuB;CACpD,CAAC"} \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 1d87539a..4f9f232f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,6 +8,7 @@ "name": "@spaace/common-lib", "version": "0.0.1", "dependencies": { + "@golevelup/nestjs-rabbitmq": "^5.1.0", "@google-cloud/kms": "^3.8.0", "@google-cloud/pubsub": "^3.3.0", "@nestjs/graphql": "^12.0.9", @@ -22,6 +23,7 @@ "typeorm": "^0.3.17" }, "devDependencies": { + "@types/amqplib": "^0.10.5", "@types/asn1": "^0.2.1", "@types/node": "^16.11.10", "@types/ws": "^8.5.3", @@ -38,6 +40,24 @@ "typescript": "^4.5.2" } }, + "node_modules/@acuminous/bitsyntax": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/@acuminous/bitsyntax/-/bitsyntax-0.1.2.tgz", + "integrity": "sha512-29lUK80d1muEQqiUsSo+3A0yP6CdspgC95EnKBMi22Xlwt79i/En4Vr67+cXhU+cZjbti3TgGGC5wy1stIywVQ==", + "dependencies": { + "buffer-more-ints": "~1.0.0", + "debug": "^4.3.4", + "safe-buffer": "~5.1.2" + }, + "engines": { + "node": ">=0.8" + } + }, + "node_modules/@acuminous/bitsyntax/node_modules/safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, "node_modules/@babel/parser": { "version": "7.22.4", "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.22.4.tgz", @@ -819,6 +839,61 @@ "@ethersproject/strings": "^5.7.0" } }, + "node_modules/@golevelup/nestjs-common": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/@golevelup/nestjs-common/-/nestjs-common-2.0.0.tgz", + "integrity": "sha512-D9RLXgkqn9SDLnZ2VoMER9l/+g5CM9Z7sZXa+10+0rZs6yevMepoiWmMVsFoUXLzYG2GwfixHLExwUr3XBCHFw==", + "dependencies": { + "lodash": "^4.17.21", + "nanoid": "^3.3.6" + }, + "peerDependencies": { + "@nestjs/common": "^10.x" + } + }, + "node_modules/@golevelup/nestjs-discovery": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/@golevelup/nestjs-discovery/-/nestjs-discovery-4.0.0.tgz", + "integrity": "sha512-iyZLYip9rhVMR0C93vo860xmboRrD5g5F5iEOfpeblGvYSz8ymQrL9RAST7x/Fp3n+TAXSeOLzDIASt+rak68g==", + "dependencies": { + "lodash": "^4.17.21" + }, + "peerDependencies": { + "@nestjs/common": "^10.x", + "@nestjs/core": "^10.x" + } + }, + "node_modules/@golevelup/nestjs-modules": { + "version": "0.7.1", + "resolved": "https://registry.npmjs.org/@golevelup/nestjs-modules/-/nestjs-modules-0.7.1.tgz", + "integrity": "sha512-L5hBuU57ujl73IAyyZYkT+41tbo+9BggLzze7AMSWJhNUJBhvifFvtfSNRy2aTTPsLYuHLH2tmerjjOQkEdgDg==", + "dependencies": { + "lodash": "^4.17.21" + }, + "peerDependencies": { + "@nestjs/common": "^10.x", + "rxjs": "^7.x" + } + }, + "node_modules/@golevelup/nestjs-rabbitmq": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/@golevelup/nestjs-rabbitmq/-/nestjs-rabbitmq-5.1.0.tgz", + "integrity": "sha512-xelwhRENbZb5bygVFzIR7v8X8B45R9oT0Mgp5Vv3KXKMwophehP+m23n/qOWqYW2Ow+jJFMp2tUT0CS8kuxgjg==", + "dependencies": { + "@golevelup/nestjs-common": "^2.0.0", + "@golevelup/nestjs-discovery": "^4.0.0", + "@golevelup/nestjs-modules": "^0.7.1", + "amqp-connection-manager": "^4.1.14", + "amqplib": "^0.10.3", + "lodash": "^4.17.21" + }, + "peerDependencies": { + "@nestjs/common": "^10.x", + "@nestjs/core": "^10.x", + "reflect-metadata": "^0.1.0 || ^0.2.0", + "rxjs": "^7.x" + } + }, "node_modules/@google-cloud/kms": { "version": "3.8.0", "resolved": "https://registry.npmjs.org/@google-cloud/kms/-/kms-3.8.0.tgz", @@ -1422,6 +1497,15 @@ "integrity": "sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==", "devOptional": true }, + "node_modules/@types/amqplib": { + "version": "0.10.5", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.10.5.tgz", + "integrity": "sha512-/cSykxROY7BWwDoi4Y4/jLAuZTshZxd8Ey1QYa/VaXriMotBDoou7V/twJiOSHzU6t1Kp1AHAUXGCgqq+6DNeg==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/asn1": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/@types/asn1/-/asn1-0.2.1.tgz", @@ -1776,6 +1860,51 @@ "url": "https://github.com/sponsors/epoberezkin" } }, + "node_modules/amqp-connection-manager": { + "version": "4.1.14", + "resolved": "https://registry.npmjs.org/amqp-connection-manager/-/amqp-connection-manager-4.1.14.tgz", + "integrity": "sha512-1km47dIvEr0HhMUazqovSvNwIlSvDX2APdUpULaINtHpiki1O+cLRaTeXb/jav4OLtH+k6GBXx5gsKOT9kcGKQ==", + "dependencies": { + "promise-breaker": "^6.0.0" + }, + "engines": { + "node": ">=10.0.0", + "npm": ">5.0.0" + }, + "peerDependencies": { + "amqplib": "*" + } + }, + "node_modules/amqplib": { + "version": "0.10.3", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.3.tgz", + "integrity": "sha512-UHmuSa7n8vVW/a5HGh2nFPqAEr8+cD4dEZ6u9GjP91nHfr1a54RyAKyra7Sb5NH7NBKOUlyQSMXIp0qAixKexw==", + "dependencies": { + "@acuminous/bitsyntax": "^0.1.2", + "buffer-more-ints": "~1.0.0", + "readable-stream": "1.x >=1.1.9", + "url-parse": "~1.5.10" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/amqplib/node_modules/readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha512-+MeVjFf4L44XUkhM1eYbD8fyEsxcV81pqMSR5gblfcLCHfZvbrqy4/qYHE+/R5HoBUT11WV5O08Cr1n3YXkWVQ==", + "dependencies": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "node_modules/amqplib/node_modules/string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" + }, "node_modules/ansi-regex": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-5.0.1.tgz", @@ -1972,6 +2101,11 @@ "resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz", "integrity": "sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA==" }, + "node_modules/buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==" + }, "node_modules/buffer-writer": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz", @@ -2165,6 +2299,11 @@ "integrity": "sha512-9vAdYbHj6x2fLKC4+oPH0kFzY/orMZyG2Aj+kNylHxKGJ/Ed4dpNyAQYwJOdqO4zdM7XpVHmyejQDcQHrnuXbw==", "peer": true }, + "node_modules/core-util-is": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz", + "integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==" + }, "node_modules/create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", @@ -3415,6 +3554,11 @@ "resolved": "https://registry.npmjs.org/is-stream-ended/-/is-stream-ended-0.1.4.tgz", "integrity": "sha512-xj0XPvmr7bQFTvirqnFr50o0hQIh6ZItDqloxt5aJrR4NQsYeSsyFQERYGCAzfindAcnKjINnwEEgLx4IqVzQw==" }, + "node_modules/isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==" + }, "node_modules/isexe": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/isexe/-/isexe-2.0.0.tgz", @@ -3742,6 +3886,23 @@ "thenify-all": "^1.0.0" } }, + "node_modules/nanoid": { + "version": "3.3.7", + "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.7.tgz", + "integrity": "sha512-eSRppjcPIatRIMC1U6UngP8XFcz8MQWGQdt1MTBQ7NaAmvXDfvNxbvWV3x2y6CdEUciCSsDHDQZbhYaB8QEo2g==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/ai" + } + ], + "bin": { + "nanoid": "bin/nanoid.cjs" + }, + "engines": { + "node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1" + } + }, "node_modules/natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", @@ -4121,6 +4282,11 @@ "node": ">=6.0.0" } }, + "node_modules/promise-breaker": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/promise-breaker/-/promise-breaker-6.0.0.tgz", + "integrity": "sha512-BthzO9yTPswGf7etOBiHCVuugs2N01/Q/94dIPls48z2zCmrnDptUUZzfIb+41xq0MnYZ/BzmOd6ikDR4ibNZA==" + }, "node_modules/proto3-json-serializer": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/proto3-json-serializer/-/proto3-json-serializer-1.1.1.tgz", @@ -4204,6 +4370,11 @@ "node": ">=6" } }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, "node_modules/queue-microtask": { "version": "1.2.3", "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.3.tgz", @@ -4290,6 +4461,11 @@ "node": ">=0.10.0" } }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==" + }, "node_modules/requizzle": { "version": "0.2.4", "resolved": "https://registry.npmjs.org/requizzle/-/requizzle-0.2.4.tgz", @@ -4955,6 +5131,15 @@ "punycode": "^2.1.0" } }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "node_modules/util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", diff --git a/package.json b/package.json index c23fa1e3..52563d6e 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "migration:revert": "npm run typeorm -- migration:revert -d src/database" }, "dependencies": { + "@golevelup/nestjs-rabbitmq": "^5.1.0", "@google-cloud/kms": "^3.8.0", "@google-cloud/pubsub": "^3.3.0", "@nestjs/graphql": "^12.0.9", @@ -33,6 +34,7 @@ "typeorm": "^0.3.17" }, "devDependencies": { + "@types/amqplib": "^0.10.5", "@types/asn1": "^0.2.1", "@types/node": "^16.11.10", "@types/ws": "^8.5.3", diff --git a/src/pubsub/index.ts b/src/pubsub/index.ts index d860d067..e75d6dc9 100644 --- a/src/pubsub/index.ts +++ b/src/pubsub/index.ts @@ -1,2 +1,3 @@ export * from './client'; export * from './types'; +export * from './rabbitmq'; diff --git a/src/pubsub/rabbitmq/index.ts b/src/pubsub/rabbitmq/index.ts new file mode 100644 index 00000000..cf1ad8f7 --- /dev/null +++ b/src/pubsub/rabbitmq/index.ts @@ -0,0 +1,3 @@ +export * from './rabbitmq.client'; +export * from './rabbitmq.module'; +export * from './types/exchangeMap'; diff --git a/src/pubsub/rabbitmq/rabbitmq.client.ts b/src/pubsub/rabbitmq/rabbitmq.client.ts new file mode 100644 index 00000000..eed89cc6 --- /dev/null +++ b/src/pubsub/rabbitmq/rabbitmq.client.ts @@ -0,0 +1,71 @@ +// rabbitmq-client.ts +import { Injectable } from '@nestjs/common'; +import { AmqpConnection } from '@golevelup/nestjs-rabbitmq'; +import { + ArenaPubSubMessage, + ArenaPubSubTrigger, + PubSubMessage, + PubSubTopic, + PubSubTrigger, +} from '../types'; +import { exchangeMap } from './types/exchangeMap'; + +@Injectable() +export class RabbitMQClient { + constructor(private readonly amqpConnection: AmqpConnection) {} + + async publish( + topic: T, + routingKey: string, + message: + | PubSubMessage> + | ArenaPubSubMessage>, + ) { + const exchange = exchangeMap[topic]; + await this.amqpConnection.channel.assertExchange(exchange, 'topic', { + durable: true, + }); + this.amqpConnection.channel.publish( + exchange, + routingKey, + Buffer.from(JSON.stringify(message)), + ); + console.log(`Published message to ${exchange}:${routingKey}`); + } + + async subscribe( + topic: T, + routingKey: string, + queueName: string, + onMessage: ( + msg: + | PubSubMessage> + | ArenaPubSubMessage>, + ) => void, + ) { + const exchange = exchangeMap[topic]; + await this.amqpConnection.channel.assertExchange(exchange, 'topic', { + durable: true, + }); + await this.amqpConnection.channel.assertQueue(queueName, { durable: true }); + await this.amqpConnection.channel.bindQueue( + queueName, + exchange, + routingKey, + ); + this.amqpConnection.channel.consume( + queueName, + (msg) => { + if (msg) { + const message = JSON.parse(msg.content.toString()); + onMessage(message); + this.amqpConnection.channel.ack(msg); + } + }, + { noAck: false }, + ); + console.log( + `Subscribed to ${exchange}:${routingKey} with queue ${queueName}`, + ); + } +} diff --git a/src/pubsub/rabbitmq/rabbitmq.module.ts b/src/pubsub/rabbitmq/rabbitmq.module.ts new file mode 100644 index 00000000..cb5229ba --- /dev/null +++ b/src/pubsub/rabbitmq/rabbitmq.module.ts @@ -0,0 +1,22 @@ +import { Module } from '@nestjs/common'; +import { RabbitMQClient } from './rabbitmq.client'; +import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq'; + +@Module({ + imports: [ + RabbitMQModule.forRoot(RabbitMQModule, { + name: 'default', + exchanges: [ + { + name: 'exchang1', + type: 'topic', + }, + ], + uri: 'amqp://guest:guest@rabbitmq:5672/', + enableControllerDiscovery: true, + }), + ], + providers: [RabbitMQClient], + exports: [RabbitMQClient], +}) +export class RabbitMQCustomModule {} diff --git a/src/pubsub/rabbitmq/types/exchangeMap.ts b/src/pubsub/rabbitmq/types/exchangeMap.ts new file mode 100644 index 00000000..f60f8131 --- /dev/null +++ b/src/pubsub/rabbitmq/types/exchangeMap.ts @@ -0,0 +1,7 @@ +import { PubSubTopic } from '../../types'; + +export const exchangeMap = { + [PubSubTopic.TRIGGERS]: 'triggers-exchange', + [PubSubTopic.COLLECTION_IMPORT]: 'collection-import-exchange', + [PubSubTopic.SEARCH_INDEX]: 'search-index-exchange', +}; From 223e3691fdd395e274defe1fffe391bdd182a556 Mon Sep 17 00:00:00 2001 From: sleepyqadir Date: Mon, 11 Mar 2024 13:03:30 +0500 Subject: [PATCH 2/2] added the example implementation readme for rabbitmq --- src/pubsub/rabbitmq/example.md | 105 +++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 src/pubsub/rabbitmq/example.md diff --git a/src/pubsub/rabbitmq/example.md b/src/pubsub/rabbitmq/example.md new file mode 100644 index 00000000..6381d819 --- /dev/null +++ b/src/pubsub/rabbitmq/example.md @@ -0,0 +1,105 @@ +# Using RabbitMQ with NestJS: Integration Guide + +This guide demonstrates how to integrate RabbitMQ for message publishing and subscribing in a NestJS application using the RabbitMQ client and module provided by the `@spaace/common-lib` package. + +## Setup + +Ensure your project is set up to use RabbitMQ by importing the necessary components from `@spaace/common-lib`. + +### Import RabbitMQCustomModule + +First, include the `RabbitMQCustomModule` in your application module or any specific module where you want to use RabbitMQ for messaging. This module configures and establishes the connection to your RabbitMQ server. + +```typescript +import { Module } from '@nestjs/common'; +import { RabbitMQCustomModule } from '@spaace/common-lib'; + +@Module({ + imports: [RabbitMQCustomModule], + // other module properties +}) +export class AppModule {} +``` + +## Publishing Messages + +To publish messages, you'll utilize the `RabbitMQClient`, also provided by the `@spaace/common-lib`. Here’s how to inject and use it in your service. + +### Example: Publishing a Message + +Inject the `RabbitMQClient` into your service and call its `publish` method to send messages to a specified topic with a routing key. + +```typescript +import { Injectable } from '@nestjs/common'; +import { + PubSubMessage, + PubSubTopic, + PubSubTrigger, + QuestTrigger, + RabbitMQClient, +} from '@spaace/common-lib'; + +@Injectable() +export class AppService { + constructor(private readonly rabbitMQClient: RabbitMQClient) {} + + async publishExampleMessage() { + const topic: PubSubTopic = PubSubTopic.TRIGGERS; + const routingKey = 'example.routing.token'; + const message: PubSubMessage> = { + trigger: QuestTrigger.TOKEN_TRANSFER, + data: { + transactionId: '12345', + amount: 100, + from: 'UserA', + to: 'UserB', + }, + }; + + await this.rabbitMQClient.publish(topic, routingKey, message); + console.log('Message published successfully'); + } +} +``` + +## Subscribing to Messages + +Subscribing to messages allows your application to listen for and process incoming messages from a queue. Here’s an example of setting up a subscription in a service. + +### Example: Subscribing to Messages + +Define a method in your service that invokes the `subscribe` function of the `RabbitMQClient` to listen for messages on a specific queue, filtered by a routing key. + +```typescript +import { Injectable } from '@nestjs/common'; +import { + PubSubTopic, + RabbitMQClient, +} from '@spaace/common-lib'; + +@Injectable() +export class AppService { + constructor(private readonly rabbitMQClient: RabbitMQClient) {} + + async subscribeExampleMessages() { + const topic: PubSubTopic = PubSubTopic.TRIGGERS; + const routingKey = 'example.routing.token'; + const queueName = 'example-queue'; + + await this.rabbitMQClient.subscribe(topic, routingKey, queueName, (message) => { + console.log('Received message:', message); + // Implement your message processing logic here + }); + + console.log('Subscribed successfully'); + } +} +``` + +## Key Takeaways + +- **RabbitMQClient** and **RabbitMQCustomModule**: Central to integrating RabbitMQ in your NestJS application, providing a streamlined approach to publishing and subscribing to messages. +- **Topic and Routing Key**: Essential for directing messages to the correct queues and ensuring they reach the intended subscribers. +- **Message Processing**: Implement custom logic within the subscription callback to handle incoming messages according to your application’s needs. + +By following this guide, you can effectively leverage RabbitMQ in your NestJS applications for robust, asynchronous messaging between different parts of your system, facilitated by the utilities provided in `@spaace/common-lib`. \ No newline at end of file