Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 101 additions & 5 deletions queue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,36 @@ type OnGCFunc func(ctx context.Context, trackValues []string) error
// of the current server time as a way of limiting the keyspace scanned. As a special
// case, any value <= -1 will result in all keys being scanned.
func (c *Client) GC(ctx context.Context, nTimeDigits int, f OnGCFunc) (uint64, uint64, error) {
pipe := c.rdb.Pipeline()
presortedGarbageCmd := pipe.SMembers(ctx, MetaPresortedGarbageSet)
pipe.Del(ctx, MetaPresortedGarbageSet)

if _, err := pipe.Exec(ctx); err != nil {
return 0, 0, err
}

presortedGarbage := presortedGarbageCmd.Val()

if len(presortedGarbage) > 0 {
toDelete := []string{}
for _, key := range presortedGarbage {
trackValue, deadline, ok := strings.Cut(key, ":")
if !ok {
continue
}

toDelete = append(
toDelete,
trackValue,
fmt.Sprintf("%s:expiry:%s", trackValue, deadline),
)
}

if err := c.rdb.HDel(ctx, MetaCancelationHash, toDelete...).Err(); err != nil {
return 0, 0, err
}
}

now, err := c.rdb.Time(ctx).Result()
if err != nil {
return 0, 0, err
Expand Down Expand Up @@ -171,11 +201,23 @@ func (c *Client) gcProcessBatch(ctx context.Context, f OnGCFunc, idsToDelete, ke
}
}

return c.rdb.HDel(
ctx,
MetaCancelationHash,
keysToDelete...,
).Result()
nDeleted, err := c.rdb.HDel(ctx, MetaCancelationHash, keysToDelete...).Result()
if err != nil {
return nDeleted, err
}

// NOTE: ZRem requires an explicit []any which cannot be automatically
// converted from a []string.
zremArgs := make([]any, len(idsToDelete))
for i, id := range idsToDelete {
zremArgs[i] = id
}

if err := c.rdb.ZRem(ctx, MetaDeadlinesZSet, zremArgs...).Err(); err != nil {
return nDeleted, err
}

return nDeleted, nil
}

func (c *Client) callOnGC(ctx context.Context, f OnGCFunc, idsToDelete []string) error {
Expand Down Expand Up @@ -213,6 +255,60 @@ func (c *Client) callOnGC(ctx context.Context, f OnGCFunc, idsToDelete []string)
return f(ctx, trackValues)
}

// CordonDeadlineExceeded selects a chunk of "track values" that have exceeded their
// deadline within a given duration into the past, moves them into the "presorted garbage"
// set for use with `GC`, and returns them as a slice. The times are truncated to the
// second because the deadlines scored set uses unix timestamps as scores.
func (c *Client) CordonDeadlineExceeded(ctx context.Context, within time.Duration) ([]string, error) {
start, err := c.rdb.Time(ctx).Result()
if err != nil {
return []string{}, err
}

pipe := c.rdb.Pipeline()
zRangeOpts := &redis.ZRangeBy{
Min: strconv.Itoa(int(start.Add(-within).Unix())),
Max: strconv.Itoa(int(start.Add(1 * time.Second).Unix())),
}

zRangeCmd := c.rdb.ZRangeByScoreWithScores(
ctx,
MetaDeadlinesZSet,
zRangeOpts,
)

c.rdb.ZRemRangeByScore(
ctx,
MetaDeadlinesZSet,
zRangeOpts.Min,
zRangeOpts.Max,
)

if _, err := pipe.Exec(ctx); err != nil {
return []string{}, err
}

trackValues := zRangeCmd.Val()

if len(trackValues) == 0 {
return []string{}, nil
}

ret := make([]string, len(trackValues))
sMembers := make([]any, len(trackValues))

for i, trackValue := range trackValues {
sMembers[i] = fmt.Sprintf("%s:%d", trackValue.Member, int(trackValue.Score))
ret[i] = fmt.Sprintf("%v", trackValue.Member)
}

if err := c.rdb.SAdd(ctx, MetaPresortedGarbageSet, sMembers...).Err(); err != nil {
return ret, err
}

return ret, nil
}

// Len calculates the aggregate length (XLEN) of the queue. It adds up the
// lengths of all the streams in the queue.
func (c *Client) Len(ctx context.Context, name string) (int64, error) {
Expand Down
Loading