Skip to content

Commit 333df54

Browse files
authored
Merge pull request #75 from codeGROOVE-dev/reliable
improve DM queueing
2 parents 212a9d4 + a4d6b0b commit 333df54

File tree

3 files changed

+279
-85
lines changed

3 files changed

+279
-85
lines changed

pkg/bot/bot.go

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,25 @@ func (c *Coordinator) handlePullRequestEventWithData(ctx context.Context, owner,
734734
slog.Info("no users blocking PR - no notifications needed",
735735
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber),
736736
"pr_state", prState)
737+
738+
// For merged/closed PRs, still update existing DMs even if no one is blocking
739+
// This ensures users see the final state (🚀 merged or ❌ closed)
740+
if prState == "merged" || prState == "closed" {
741+
slog.Info("updating DMs for terminal PR state",
742+
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber),
743+
"pr_state", prState,
744+
"reason", "terminal_state_update")
745+
c.updateDMMessagesForPR(ctx, prUpdateInfo{
746+
Owner: owner,
747+
Repo: repo,
748+
PRNumber: prNumber,
749+
PRTitle: event.PullRequest.Title,
750+
PRAuthor: event.PullRequest.User.Login,
751+
PRState: prState,
752+
PRURL: event.PullRequest.HTMLURL,
753+
CheckResult: checkResult,
754+
})
755+
}
737756
}
738757
}
739758

@@ -876,12 +895,18 @@ func (*Coordinator) getStateQueryParam(prState string) string {
876895
}
877896
}
878897

898+
// UserTagInfo contains information about a user who was tagged in PR notifications.
899+
type UserTagInfo struct {
900+
UserID string
901+
IsInAnyChannel bool // True if user is member of at least one channel where they were tagged
902+
}
903+
879904
// processChannelsInParallel processes multiple channels concurrently for better performance.
880905
// processChannelsInParallel processes PR notifications for multiple channels concurrently.
881-
// Returns a map of Slack user IDs that were successfully tagged in at least one channel.
906+
// Returns a map of Slack user IDs to tag info for users successfully tagged in at least one channel.
882907
func (c *Coordinator) processChannelsInParallel(
883908
ctx context.Context, prCtx prContext, channels []string, workspaceID string,
884-
) map[string]bool {
909+
) map[string]UserTagInfo {
885910
event, ok := prCtx.Event.(struct {
886911
Action string `json:"action"`
887912
PullRequest struct {
@@ -951,7 +976,7 @@ func (c *Coordinator) processChannelsInParallel(
951976

952977
// Track which Slack users were successfully tagged across all channels
953978
var taggedUsersMu sync.Mutex
954-
taggedUsers := make(map[string]bool)
979+
taggedUsers := make(map[string]UserTagInfo)
955980

956981
// Process channels in parallel for better performance
957982
// Use WaitGroup instead of errgroup since we don't want one failure to cancel others
@@ -967,8 +992,18 @@ func (c *Coordinator) processChannelsInParallel(
967992

968993
// Merge tagged users from this channel into the overall set
969994
taggedUsersMu.Lock()
970-
for userID := range channelTaggedUsers {
971-
taggedUsers[userID] = true
995+
for userID, info := range channelTaggedUsers {
996+
existing, exists := taggedUsers[userID]
997+
if !exists {
998+
// First time seeing this user
999+
taggedUsers[userID] = info
1000+
} else {
1001+
// User already tagged in another channel - update IsInAnyChannel if this channel has them
1002+
if info.IsInAnyChannel {
1003+
existing.IsInAnyChannel = true
1004+
taggedUsers[userID] = existing
1005+
}
1006+
}
9721007
}
9731008
taggedUsersMu.Unlock()
9741009
}(channelName)
@@ -985,10 +1020,10 @@ func (c *Coordinator) processChannelsInParallel(
9851020
}
9861021

9871022
// processPRForChannel handles PR processing for a single channel (extracted from the main loop).
988-
// Returns a map of Slack user IDs that were successfully tagged in this channel.
1023+
// Returns a map of Slack user IDs to UserTagInfo for users successfully tagged in this channel.
9891024
func (c *Coordinator) processPRForChannel(
9901025
ctx context.Context, prCtx prContext, channelName, workspaceID string,
991-
) map[string]bool {
1026+
) map[string]UserTagInfo {
9921027
owner, repo, prNumber, prState := prCtx.Owner, prCtx.Repo, prCtx.Number, prCtx.State
9931028
checkResult := prCtx.CheckRes
9941029
event, ok := prCtx.Event.(struct {
@@ -1145,11 +1180,12 @@ func (c *Coordinator) resolveAndValidateChannel(
11451180
}
11461181

11471182
// trackUserTagsForDMDelay tracks user tags in channel for DM delay logic.
1183+
// Returns map of Slack user IDs to UserTagInfo with channel membership status.
11481184
func (c *Coordinator) trackUserTagsForDMDelay(
11491185
ctx context.Context, workspaceID, channelID, channelDisplay, owner, repo string, prNumber int,
11501186
checkResult *turn.CheckResponse,
1151-
) map[string]bool {
1152-
taggedUsers := make(map[string]bool)
1187+
) map[string]UserTagInfo {
1188+
taggedUsers := make(map[string]UserTagInfo)
11531189
blockedUsers := c.extractBlockedUsersFromTurnclient(checkResult)
11541190
if len(blockedUsers) == 0 {
11551191
return taggedUsers
@@ -1166,13 +1202,22 @@ func (c *Coordinator) trackUserTagsForDMDelay(
11661202
if c.notifier != nil && c.notifier.Tracker != nil {
11671203
c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber)
11681204
}
1169-
taggedUsers[slackUserID] = true
1205+
1206+
// Check if user is member of this channel (for DM delay decision)
1207+
isInChannel := c.slack.IsUserInChannel(ctx, channelID, slackUserID)
1208+
1209+
taggedUsers[slackUserID] = UserTagInfo{
1210+
UserID: slackUserID,
1211+
IsInAnyChannel: isInChannel,
1212+
}
1213+
11701214
slog.Debug("tracked user tag in channel",
11711215
"workspace", workspaceID,
11721216
"github_user", githubUser,
11731217
"slack_user", slackUserID,
11741218
"channel", channelDisplay,
11751219
"channel_id", channelID,
1220+
"is_in_channel", isInChannel,
11761221
"pr", fmt.Sprintf(prFormatString, owner, repo, prNumber))
11771222
}
11781223
}

pkg/bot/dm.go

Lines changed: 97 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type dmNotificationRequest struct {
3333
// Updates to existing DMs happen immediately (no delay).
3434
// New DMs respect reminder_dm_delay (queue for later if user in channel).
3535
//
36-
//nolint:maintidx,revive // This function coordinates all DM scenarios (queued/sent, update/create, delay logic) and benefits from being in one place
36+
//nolint:maintidx,revive,gocognit // This function coordinates all DM scenarios (queued/sent, update/create, delay logic) and benefits from being in one place
3737
func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotificationRequest) error {
3838
// Lock per user+PR to prevent concurrent goroutines from sending duplicate DMs
3939
lockKey := req.UserID + ":" + req.PRURL
@@ -53,12 +53,24 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
5353
"error", err)
5454
}
5555

56-
// Find any pending DM for this user+PR
57-
var pendingDM *state.PendingDM
56+
// Find ALL pending DMs for this user+PR (there may be multiple if user tagged in multiple channels)
57+
var matchingPendingDMs []*state.PendingDM
5858
for i := range pendingDMs {
5959
if pendingDMs[i].UserID == req.UserID && pendingDMs[i].PRURL == req.PRURL {
60-
pendingDM = &pendingDMs[i]
61-
break
60+
matchingPendingDMs = append(matchingPendingDMs, &pendingDMs[i])
61+
}
62+
}
63+
64+
// Use the first pending DM for decision-making (they should all have same state)
65+
var pendingDM *state.PendingDM
66+
if len(matchingPendingDMs) > 0 {
67+
pendingDM = matchingPendingDMs[0]
68+
// Log if we found duplicates (indicates user was tagged in multiple channels)
69+
if len(matchingPendingDMs) > 1 {
70+
slog.Info("found multiple queued DMs for same user+PR (user tagged in multiple channels)",
71+
"user", req.UserID,
72+
"pr", req.PRURL,
73+
"count", len(matchingPendingDMs))
6274
}
6375
}
6476

@@ -73,19 +85,22 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
7385
userStillBlocked = len(req.CheckResult.Analysis.NextAction) > 0
7486
}
7587

76-
// If user no longer blocked, cancel the queued DM
88+
// If user no longer blocked, cancel ALL queued DMs
7789
if !userStillBlocked {
78-
slog.Info("cancelling queued DM - user no longer blocked",
90+
slog.Info("cancelling queued DMs - user no longer blocked",
7991
"user", req.UserID,
8092
"pr", req.PRURL,
8193
"old_state", pendingDM.PRState,
82-
"new_state", prState)
83-
if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil {
84-
slog.Warn("failed to remove pending DM",
85-
"user", req.UserID,
86-
"pr", req.PRURL,
87-
"dm_id", pendingDM.ID,
88-
"error", err)
94+
"new_state", prState,
95+
"count", len(matchingPendingDMs))
96+
for _, dm := range matchingPendingDMs {
97+
if err := c.stateStore.RemovePendingDM(ctx, dm.ID); err != nil {
98+
slog.Warn("failed to remove pending DM",
99+
"user", req.UserID,
100+
"pr", req.PRURL,
101+
"dm_id", dm.ID,
102+
"error", err)
103+
}
89104
}
90105
return nil
91106
}
@@ -97,23 +112,27 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
97112
"pr", req.PRURL,
98113
"old_state", pendingDM.PRState,
99114
"new_state", prState,
100-
"scheduled_send", pendingDM.SendAfter)
101-
// Remove old queued DM and queue new one with updated state
102-
if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil {
103-
slog.Warn("failed to remove pending DM for update",
104-
"user", req.UserID,
105-
"pr", req.PRURL,
106-
"dm_id", pendingDM.ID,
107-
"error", err)
108-
// Continue anyway - attempt to queue new DM
115+
"scheduled_send", pendingDM.SendAfter,
116+
"removing_duplicates", len(matchingPendingDMs))
117+
// Remove ALL old queued DMs and queue ONE new one with updated state
118+
for _, dm := range matchingPendingDMs {
119+
if err := c.stateStore.RemovePendingDM(ctx, dm.ID); err != nil {
120+
slog.Warn("failed to remove pending DM for update",
121+
"user", req.UserID,
122+
"pr", req.PRURL,
123+
"dm_id", dm.ID,
124+
"error", err)
125+
}
109126
}
127+
// Queue single new DM with updated state
110128
return c.queueDMForUser(ctx, req, prState, pendingDM.SendAfter)
111129
}
112130
// State unchanged, queued DM is still valid
113131
slog.Debug("DM already queued with same state",
114132
"user", req.UserID,
115133
"pr", req.PRURL,
116-
"state", prState)
134+
"state", prState,
135+
"queued_count", len(matchingPendingDMs))
117136
return nil
118137
}
119138

@@ -223,10 +242,28 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
223242
// All updates failed - fall through to send new DM
224243
}
225244

245+
// If we know a DM exists but couldn't find/update it, don't send duplicate
246+
// This prevents duplicate DMs when history search fails or Slack API is slow
247+
if exists {
248+
slog.Warn("DM exists but couldn't be located or updated - skipping to prevent duplicate",
249+
"user", req.UserID,
250+
"pr", req.PRURL,
251+
"last_state", lastNotif.LastState,
252+
"new_state", prState,
253+
"has_channel_id", lastNotif.ChannelID != "",
254+
"has_message_ts", lastNotif.MessageTS != "",
255+
"impact", "user may see stale state temporarily")
256+
return nil
257+
}
258+
226259
// Path 2: Send new DM (check delay logic)
227260
shouldQueue, sendAfter := c.shouldDelayNewDM(ctx, req.UserID, req.ChannelID, req.ChannelName, req.Owner, req.Repo)
228261

229262
if shouldQueue {
263+
// Cancel any existing pending DMs for this user+PR before queueing new one
264+
// This ensures we never have duplicate queued DMs (e.g., from multiple channels)
265+
c.cancelPendingDMs(ctx, req.UserID, req.PRURL)
266+
230267
// Queue for later delivery
231268
slog.Info("queueing DM for delayed delivery",
232269
"user", req.UserID,
@@ -307,39 +344,37 @@ func (c *Coordinator) findDMInHistory(ctx context.Context, userID, prURL string)
307344

308345
// shouldDelayNewDM determines if a new DM should be queued for later.
309346
// Returns (shouldQueue bool, sendAfter time.Time).
310-
// Simplified version of evaluateDMDelay - removes user presence checking and anti-spam.
347+
// Channel membership is determined by caller - if channelID is non-empty, user was in at least one channel.
311348
func (c *Coordinator) shouldDelayNewDM(
312349
ctx context.Context,
313350
userID, channelID, channelName string,
314351
owner, _ string,
315352
) (bool, time.Time) {
316-
// Get configured delay for this channel (in minutes)
317-
delayMinutes := c.configManager.ReminderDMDelay(owner, channelName)
318-
delay := time.Duration(delayMinutes) * time.Minute
319-
320-
// If delay is 0, feature is disabled - send immediately
321-
if delay == 0 {
322-
return false, time.Time{}
323-
}
324-
325-
// If user wasn't tagged in a channel, send immediately
353+
// If channelID is empty, user wasn't in any channel we notified - send immediately
326354
if channelID == "" {
355+
slog.Debug("user not in any channel, sending DM immediately",
356+
"user", userID)
327357
return false, time.Time{}
328358
}
329359

330-
// Check if user is in the channel where they were tagged
331-
isInChannel := c.slack.IsUserInChannel(ctx, channelID, userID)
360+
// User was in at least one channel - apply configured delay
361+
delayMinutes := c.configManager.ReminderDMDelay(owner, channelName)
362+
delay := time.Duration(delayMinutes) * time.Minute
332363

333-
// If user is NOT in channel, they can't see the tag - send immediately
334-
if !isInChannel {
335-
slog.Debug("user not in channel, sending DM immediately",
364+
// If delay is 0, feature is disabled - send immediately even if user in channel
365+
if delay == 0 {
366+
slog.Debug("DM delay feature disabled, sending immediately",
336367
"user", userID,
337-
"channel", channelID)
368+
"owner", owner)
338369
return false, time.Time{}
339370
}
340371

341372
// User is in channel - queue for delayed delivery
342373
sendAfter := time.Now().Add(delay)
374+
slog.Debug("user in channel, delaying DM",
375+
"user", userID,
376+
"delay_minutes", delayMinutes,
377+
"send_after", sendAfter)
343378
return true, sendAfter
344379
}
345380

@@ -446,10 +481,10 @@ func getSentAt(info state.DMInfo, exists bool) time.Time {
446481

447482
// sendDMNotificationsToTaggedUsers sends DM notifications to Slack users who were tagged in channels.
448483
// This runs in a separate goroutine to avoid blocking event processing.
449-
// Uses the simplified sendPRNotification() for all DM operations.
484+
// Decides per-user whether to send immediately or delay based on channel membership.
450485
func (c *Coordinator) sendDMNotificationsToTaggedUsers(
451486
ctx context.Context, workspaceID, owner, repo string,
452-
prNumber int, slackUsers map[string]bool,
487+
prNumber int, taggedUsers map[string]UserTagInfo,
453488
event struct {
454489
Action string `json:"action"`
455490
PullRequest struct {
@@ -468,25 +503,30 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers(
468503
slog.Info("starting DM notification batch for tagged Slack users",
469504
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
470505
"workspace", workspaceID,
471-
"user_count", len(slackUsers))
506+
"user_count", len(taggedUsers))
472507

473508
sentCount := 0
474509
failedCount := 0
510+
delayedCount := 0
475511

476-
for slackUserID := range slackUsers {
477-
// Get tag info to determine which channel the user was tagged in
512+
for _, userInfo := range taggedUsers {
513+
// Determine delay based on channel membership from THIS event
514+
// If user is NOT in any channel we notified → send immediately
515+
// If user IS in at least one channel → apply configured delay
478516
var channelID string
479-
if c.notifier != nil && c.notifier.Tracker != nil {
480-
tagInfo := c.notifier.Tracker.LastUserPRChannelTag(workspaceID, slackUserID, owner, repo, prNumber)
481-
channelID = tagInfo.ChannelID
517+
if userInfo.IsInAnyChannel {
518+
// User is in a channel - apply delay logic
519+
// We don't need the specific channel, just any channel ID to trigger delay
520+
// Use a placeholder to signal "user was in a channel"
521+
channelID = "delay"
522+
delayedCount++
482523
}
524+
// If IsInAnyChannel is false, channelID stays empty → immediate send
483525

484-
// ChannelName is not available (no reverse lookup), so pass empty string
485-
// The delay logic will use the default config for the org
486526
err := c.sendPRNotification(ctx, dmNotificationRequest{
487-
UserID: slackUserID,
488-
ChannelID: channelID,
489-
ChannelName: "", // not available
527+
UserID: userInfo.UserID,
528+
ChannelID: channelID, // "delay" or empty based on channel membership
529+
ChannelName: "", // not used
490530
Owner: owner,
491531
Repo: repo,
492532
PRNumber: prNumber,
@@ -498,7 +538,7 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers(
498538
if err != nil {
499539
slog.Warn("failed to notify user",
500540
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
501-
"slack_user", slackUserID,
541+
"slack_user", userInfo.UserID,
502542
"error", err)
503543
failedCount++
504544
} else {
@@ -510,8 +550,9 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers(
510550
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
511551
"workspace", workspaceID,
512552
"sent_count", sentCount,
553+
"delayed_count", delayedCount,
513554
"failed_count", failedCount,
514-
"total_users", len(slackUsers))
555+
"total_users", len(taggedUsers))
515556
}
516557

517558
// sendDMNotificationsToBlockedUsers sends immediate DM notifications to blocked GitHub users.

0 commit comments

Comments
 (0)