Skip to content

Commit 13447e0

Browse files
feat(history-sync): emit messaging-history.set event on sync completion and fix race condition
Reorder webhook emissions (CHATS_SET, MESSAGES_SET) to fire after database persistence, fixing a race condition where consumers received the event before data was queryable. Emit a new MESSAGING_HISTORY_SET event when progress reaches 100%, allowing consumers to know exactly when history sync is complete and messages are available in the database. Register the new event across all transport types (Webhook, WebSocket, RabbitMQ, NATS, SQS, Kafka, Pusher) and validation schemas.
1 parent bb0ff85 commit 13447e0

File tree

4 files changed

+29
-6
lines changed

4 files changed

+29
-6
lines changed

src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -993,12 +993,12 @@ export class BaileysStartupService extends ChannelStartupService {
993993
chatsRaw.push({ remoteJid: chat.id, instanceId: this.instanceId, name: chat.name });
994994
}
995995

996-
this.sendDataWebhook(Events.CHATS_SET, chatsRaw);
997-
998996
if (this.configService.get<Database>('DATABASE').SAVE_DATA.HISTORIC) {
999997
await this.prismaRepository.chat.createMany({ data: chatsRaw, skipDuplicates: true });
1000998
}
1001999

1000+
this.sendDataWebhook(Events.CHATS_SET, chatsRaw);
1001+
10021002
const messagesRaw: any[] = [];
10031003

10041004
const messagesRepository: Set<string> = new Set(
@@ -1050,15 +1050,15 @@ export class BaileysStartupService extends ChannelStartupService {
10501050
messagesRaw.push(this.prepareMessage(m));
10511051
}
10521052

1053+
if (this.configService.get<Database>('DATABASE').SAVE_DATA.HISTORIC) {
1054+
await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true });
1055+
}
1056+
10531057
this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw], true, undefined, {
10541058
isLatest,
10551059
progress,
10561060
});
10571061

1058-
if (this.configService.get<Database>('DATABASE').SAVE_DATA.HISTORIC) {
1059-
await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true });
1060-
}
1061-
10621062
if (
10631063
this.configService.get<Chatwoot>('CHATWOOT').ENABLED &&
10641064
this.localChatwoot?.enabled &&
@@ -1075,6 +1075,14 @@ export class BaileysStartupService extends ChannelStartupService {
10751075
contacts.filter((c) => !!c.notify || !!c.name).map((c) => ({ id: c.id, name: c.name ?? c.notify })),
10761076
);
10771077

1078+
if (progress === 100) {
1079+
this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, {
1080+
messageCount: messagesRaw.length,
1081+
chatCount: chatsRaw.length,
1082+
contactCount: contacts?.length ?? 0,
1083+
});
1084+
}
1085+
10781086
contacts = undefined;
10791087
messages = undefined;
10801088
chats = undefined;

src/api/integrations/event/event.controller.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ export class EventController {
162162
'CALL',
163163
'TYPEBOT_START',
164164
'TYPEBOT_CHANGE_STATUS',
165+
'MESSAGING_HISTORY_SET',
165166
'REMOVE_INSTANCE',
166167
'LOGOUT_INSTANCE',
167168
'INSTANCE_CREATE',

src/config/env.config.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ export type EventsRabbitmq = {
9191
CALL: boolean;
9292
TYPEBOT_START: boolean;
9393
TYPEBOT_CHANGE_STATUS: boolean;
94+
MESSAGING_HISTORY_SET: boolean;
9495
};
9596

9697
export type Rabbitmq = {
@@ -150,6 +151,7 @@ export type Sqs = {
150151
SEND_MESSAGE: boolean;
151152
TYPEBOT_CHANGE_STATUS: boolean;
152153
TYPEBOT_START: boolean;
154+
MESSAGING_HISTORY_SET: boolean;
153155
};
154156
};
155157

@@ -223,6 +225,7 @@ export type EventsWebhook = {
223225
CALL: boolean;
224226
TYPEBOT_START: boolean;
225227
TYPEBOT_CHANGE_STATUS: boolean;
228+
MESSAGING_HISTORY_SET: boolean;
226229
ERRORS: boolean;
227230
ERRORS_WEBHOOK: string;
228231
};
@@ -256,6 +259,7 @@ export type EventsPusher = {
256259
CALL: boolean;
257260
TYPEBOT_START: boolean;
258261
TYPEBOT_CHANGE_STATUS: boolean;
262+
MESSAGING_HISTORY_SET: boolean;
259263
};
260264

261265
export type ApiKey = { KEY: string };
@@ -537,6 +541,7 @@ export class ConfigService {
537541
CALL: process.env?.RABBITMQ_EVENTS_CALL === 'true',
538542
TYPEBOT_START: process.env?.RABBITMQ_EVENTS_TYPEBOT_START === 'true',
539543
TYPEBOT_CHANGE_STATUS: process.env?.RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
544+
MESSAGING_HISTORY_SET: process.env?.RABBITMQ_EVENTS_MESSAGING_HISTORY_SET === 'true',
540545
},
541546
},
542547
NATS: {
@@ -574,6 +579,7 @@ export class ConfigService {
574579
CALL: process.env?.NATS_EVENTS_CALL === 'true',
575580
TYPEBOT_START: process.env?.NATS_EVENTS_TYPEBOT_START === 'true',
576581
TYPEBOT_CHANGE_STATUS: process.env?.NATS_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
582+
MESSAGING_HISTORY_SET: process.env?.NATS_EVENTS_MESSAGING_HISTORY_SET === 'true',
577583
},
578584
},
579585
SQS: {
@@ -614,6 +620,7 @@ export class ConfigService {
614620
SEND_MESSAGE: process.env?.SQS_GLOBAL_SEND_MESSAGE === 'true',
615621
TYPEBOT_CHANGE_STATUS: process.env?.SQS_GLOBAL_TYPEBOT_CHANGE_STATUS === 'true',
616622
TYPEBOT_START: process.env?.SQS_GLOBAL_TYPEBOT_START === 'true',
623+
MESSAGING_HISTORY_SET: process.env?.SQS_GLOBAL_MESSAGING_HISTORY_SET === 'true',
617624
},
618625
},
619626
KAFKA: {
@@ -657,6 +664,7 @@ export class ConfigService {
657664
CALL: process.env?.KAFKA_EVENTS_CALL === 'true',
658665
TYPEBOT_START: process.env?.KAFKA_EVENTS_TYPEBOT_START === 'true',
659666
TYPEBOT_CHANGE_STATUS: process.env?.KAFKA_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
667+
MESSAGING_HISTORY_SET: process.env?.KAFKA_EVENTS_MESSAGING_HISTORY_SET === 'true',
660668
},
661669
SASL:
662670
process.env?.KAFKA_SASL_ENABLED === 'true'
@@ -722,6 +730,7 @@ export class ConfigService {
722730
CALL: process.env?.PUSHER_EVENTS_CALL === 'true',
723731
TYPEBOT_START: process.env?.PUSHER_EVENTS_TYPEBOT_START === 'true',
724732
TYPEBOT_CHANGE_STATUS: process.env?.PUSHER_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
733+
MESSAGING_HISTORY_SET: process.env?.PUSHER_EVENTS_MESSAGING_HISTORY_SET === 'true',
725734
},
726735
},
727736
WA_BUSINESS: {
@@ -779,6 +788,7 @@ export class ConfigService {
779788
CALL: process.env?.WEBHOOK_EVENTS_CALL === 'true',
780789
TYPEBOT_START: process.env?.WEBHOOK_EVENTS_TYPEBOT_START === 'true',
781790
TYPEBOT_CHANGE_STATUS: process.env?.WEBHOOK_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
791+
MESSAGING_HISTORY_SET: process.env?.WEBHOOK_EVENTS_MESSAGING_HISTORY_SET === 'true',
782792
ERRORS: process.env?.WEBHOOK_EVENTS_ERRORS === 'true',
783793
ERRORS_WEBHOOK: process.env?.WEBHOOK_EVENTS_ERRORS_WEBHOOK || '',
784794
},

src/validate/instance.schema.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ export const instanceSchema: JSONSchema7 = {
8686
'CALL',
8787
'TYPEBOT_START',
8888
'TYPEBOT_CHANGE_STATUS',
89+
'MESSAGING_HISTORY_SET',
8990
],
9091
},
9192
},
@@ -123,6 +124,7 @@ export const instanceSchema: JSONSchema7 = {
123124
'CALL',
124125
'TYPEBOT_START',
125126
'TYPEBOT_CHANGE_STATUS',
127+
'MESSAGING_HISTORY_SET',
126128
],
127129
},
128130
},
@@ -160,6 +162,7 @@ export const instanceSchema: JSONSchema7 = {
160162
'CALL',
161163
'TYPEBOT_START',
162164
'TYPEBOT_CHANGE_STATUS',
165+
'MESSAGING_HISTORY_SET',
163166
],
164167
},
165168
},
@@ -197,6 +200,7 @@ export const instanceSchema: JSONSchema7 = {
197200
'CALL',
198201
'TYPEBOT_START',
199202
'TYPEBOT_CHANGE_STATUS',
203+
'MESSAGING_HISTORY_SET',
200204
],
201205
},
202206
},

0 commit comments

Comments
 (0)