From b2d37ed866ff5e30816d3a73c1dfd5fab6f3d5e4 Mon Sep 17 00:00:00 2001 From: Rueian Date: Mon, 30 Dec 2024 23:57:03 +0800 Subject: [PATCH 01/21] feat: new experimental gc friendly flatten cache Signed-off-by: Rueian --- cache.go | 229 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 229 insertions(+) diff --git a/cache.go b/cache.go index ef5ca860..975d7ae0 100644 --- a/cache.go +++ b/cache.go @@ -2,8 +2,10 @@ package rueidis import ( "context" + "runtime" "sync" "time" + "unsafe" ) // NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation @@ -178,3 +180,230 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) { return a.val, a.err } } + +type flatentry struct { + ovfl *flatentry + next unsafe.Pointer + prev unsafe.Pointer + cmd string + key string + val []byte + ttl int64 + size int64 + mark int64 + mu sync.Mutex +} + +func (f *flatentry) insert(e *flatentry) { + f.size += e.size + f.mu.Lock() + defer f.mu.Unlock() + e.ovfl = f.ovfl + f.ovfl = e +} + +func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool) { + if f == nil { + return + } + if ts >= f.ttl { + expired = true + return + } + if cmd == f.cmd { + _ = ret.CacheUnmarshalView(f.val) + return + } + f.mu.Lock() + ovfl := f.ovfl + f.mu.Unlock() + return ovfl.find(cmd, ts) +} + +const lrBatchSize = 64 + +type lrBatch struct { + m map[*flatentry]struct{} +} + +func NewFlattenCache(limit int64) CacheStore { + f := &flatten{ + flights: make(map[string]*adapterEntry), + cache: make(map[string]*flatentry), + head: &flatentry{}, + tail: &flatentry{}, + size: 0, + limit: limit, + } + f.head.next = unsafe.Pointer(f.tail) + f.tail.prev = unsafe.Pointer(f.head) + f.lrup = sync.Pool{New: func() any { + b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} + runtime.SetFinalizer(b, func() { + f.llTailBatch(b) + }) + return b + }} + return f +} + +type flatten struct { + flights map[string]*adapterEntry + cache map[string]*flatentry + head *flatentry + tail *flatentry + lrup sync.Pool + mark int64 + size int64 + limit int64 + mu sync.RWMutex +} + +func (f *flatten) llAdd(e *flatentry) { + e.mark = f.mark + e.prev = f.tail.prev + e.next = unsafe.Pointer(f.tail) + f.tail.prev = unsafe.Pointer(e) + (*flatentry)(e.prev).next = unsafe.Pointer(e) +} + +func (f *flatten) llDel(e *flatentry) { + (*flatentry)(e.prev).next = e.next + (*flatentry)(e.next).prev = e.prev + e.mark = 0 +} + +func (f *flatten) llTail(e *flatentry) { + if e.mark == f.mark { + f.llDel(e) + f.llAdd(e) + } +} + +func (f *flatten) llTailBatch(b *lrBatch) { + f.mu.Lock() + for e := range b.m { + f.llTail(e) + } + f.mu.Unlock() + clear(b.m) +} + +func (f *flatten) remove(e *flatentry) { + f.size -= e.size + f.llDel(e) + delete(f.cache, e.key) +} + +func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) { + f.mu.RLock() + e := f.cache[key] + f.mu.RUnlock() + ts := now.UnixMilli() + if v, _ := e.find(cmd, ts); v.typ != 0 { + batch := f.lrup.Get().(*lrBatch) + batch.m[e] = struct{}{} + if len(batch.m) == lrBatchSize { + f.llTailBatch(batch) + } + f.lrup.Put(batch) + return v, nil + } + fk := key + cmd + f.mu.RLock() + af := f.flights[fk] + f.mu.RUnlock() + if af != nil { + return RedisMessage{}, af + } + f.mu.Lock() + defer f.mu.Unlock() + e = f.cache[key] + v, expired := e.find(cmd, ts) + if v.typ != 0 { + f.llTail(e) + return v, nil + } + if expired { + f.remove(e) + } + if af = f.flights[fk]; af != nil { + return RedisMessage{}, af + } + f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()} + return RedisMessage{}, nil +} + +func (f *flatten) Update(key, cmd string, val RedisMessage) int64 { + fk := key + cmd + bs := val.CacheMarshal(nil) + fe := &flatentry{cmd: cmd, val: bs, ttl: val.CachePXAT(), size: int64(len(bs)+len(key)+len(cmd)) + int64(unsafe.Sizeof(flatentry{}))} + f.mu.Lock() + af := f.flights[fk] + if af != nil { + delete(f.flights, fk) + if af.xat < fe.ttl { + fe.ttl = af.xat + } + } + f.size += fe.size + for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); { + e := (*flatentry)(ep) + f.remove(e) + ep = e.next + } + if e := f.cache[key]; e == nil { + fe.key = key + f.cache[key] = fe + f.llAdd(fe) + } else { + e.insert(fe) + } + f.mu.Unlock() + if af != nil { + af.set(val, nil) + } + return fe.ttl +} + +func (f *flatten) Cancel(key, cmd string, err error) { + fk := key + cmd + f.mu.Lock() + defer f.mu.Unlock() + if af := f.flights[fk]; af != nil { + delete(f.flights, fk) + af.set(RedisMessage{}, err) + } +} + +func (f *flatten) Delete(keys []RedisMessage) { + f.mu.Lock() + defer f.mu.Unlock() + if keys == nil { + f.cache = make(map[string]*flatentry, len(f.cache)) + f.head.next = unsafe.Pointer(f.tail) + f.tail.prev = unsafe.Pointer(f.head) + f.mark++ + f.size = 0 + } else { + for _, k := range keys { + if e := f.cache[k.string]; e != nil { + f.remove(e) + } + } + } +} + +func (f *flatten) Close(err error) { + f.mu.Lock() + flights := f.flights + f.flights = nil + f.cache = nil + f.tail = nil + f.head = nil + f.mark++ + f.mu.Unlock() + for _, entry := range flights { + entry.set(RedisMessage{}, err) + } +} From 76151e277fd78d8963a04a3422d66e2291778ed8 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sun, 5 Jan 2025 10:55:18 -0800 Subject: [PATCH 02/21] feat: new experimental gc friendly flatten cache Signed-off-by: Rueian --- cache.go | 63 ++++++++++++++++++++++++++++----------------------- cache_test.go | 5 ++++ 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/cache.go b/cache.go index 975d7ae0..2ae7d52c 100644 --- a/cache.go +++ b/cache.go @@ -221,25 +221,26 @@ func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool) } const lrBatchSize = 64 +const flattEntrySize = unsafe.Sizeof(flatentry{}) type lrBatch struct { m map[*flatentry]struct{} } -func NewFlattenCache(limit int64) CacheStore { +func NewFlattenCache(limit int) CacheStore { f := &flatten{ flights: make(map[string]*adapterEntry), cache: make(map[string]*flatentry), head: &flatentry{}, tail: &flatentry{}, size: 0, - limit: limit, + limit: int64(limit), } f.head.next = unsafe.Pointer(f.tail) f.tail.prev = unsafe.Pointer(f.head) f.lrup = sync.Pool{New: func() any { b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} - runtime.SetFinalizer(b, func() { + runtime.SetFinalizer(b, func(b *lrBatch) { f.llTailBatch(b) }) return b @@ -330,40 +331,46 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red if af = f.flights[fk]; af != nil { return RedisMessage{}, af } - f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()} + if f.flights != nil { + f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()} + } return RedisMessage{}, nil } -func (f *flatten) Update(key, cmd string, val RedisMessage) int64 { +func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) { fk := key + cmd - bs := val.CacheMarshal(nil) - fe := &flatentry{cmd: cmd, val: bs, ttl: val.CachePXAT(), size: int64(len(bs)+len(key)+len(cmd)) + int64(unsafe.Sizeof(flatentry{}))} - f.mu.Lock() + f.mu.RLock() af := f.flights[fk] + f.mu.RUnlock() if af != nil { - delete(f.flights, fk) - if af.xat < fe.ttl { - fe.ttl = af.xat + sxat = val.getExpireAt() + if af.xat < sxat || sxat == 0 { + sxat = af.xat + val.setExpireAt(sxat) } - } - f.size += fe.size - for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); { - e := (*flatentry)(ep) - f.remove(e) - ep = e.next - } - if e := f.cache[key]; e == nil { - fe.key = key - f.cache[key] = fe - f.llAdd(fe) - } else { - e.insert(fe) - } - f.mu.Unlock() - if af != nil { + bs := val.CacheMarshal(nil) + fe := &flatentry{cmd: cmd, val: bs, ttl: sxat, size: int64(len(bs)+len(key)+len(cmd)) + int64(flattEntrySize)} + f.mu.Lock() + if f.flights != nil { + delete(f.flights, fk) + f.size += fe.size + for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); { + e := (*flatentry)(ep) + f.remove(e) + ep = e.next + } + if e := f.cache[key]; e == nil { + fe.key = key + f.cache[key] = fe + f.llAdd(fe) + } else { + e.insert(fe) + } + } + f.mu.Unlock() af.set(val, nil) } - return fe.ttl + return sxat } func (f *flatten) Cancel(key, cmd string, err error) { diff --git a/cache_test.go b/cache_test.go index 3e6d12f6..9010b8aa 100644 --- a/cache_test.go +++ b/cache_test.go @@ -183,6 +183,11 @@ func TestCacheStore(t *testing.T) { return NewSimpleCacheAdapter(&simple{store: map[string]RedisMessage{}}) }) }) + t.Run("FlattenCache", func(t *testing.T) { + test(t, func() CacheStore { + return NewFlattenCache(DefaultCacheBytes) + }) + }) } type simple struct { From d9d2fb0eb33b4addc7c5ed607dc8caf685a20663 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sun, 5 Jan 2025 13:02:32 -0800 Subject: [PATCH 03/21] feat: new experimental gc friendly flatten cache Signed-off-by: Rueian --- cache.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cache.go b/cache.go index 2ae7d52c..1131ddf3 100644 --- a/cache.go +++ b/cache.go @@ -173,8 +173,13 @@ func (a *adapterEntry) set(val RedisMessage, err error) { } func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) { + ctxCh := ctx.Done() + if ctxCh == nil { + <-a.ch + return a.val, a.err + } select { - case <-ctx.Done(): + case <-ctxCh: return RedisMessage{}, ctx.Err() case <-a.ch: return a.val, a.err @@ -191,7 +196,7 @@ type flatentry struct { ttl int64 size int64 mark int64 - mu sync.Mutex + mu sync.RWMutex } func (f *flatentry) insert(e *flatentry) { @@ -214,9 +219,9 @@ func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool) _ = ret.CacheUnmarshalView(f.val) return } - f.mu.Lock() + f.mu.RLock() ovfl := f.ovfl - f.mu.Unlock() + f.mu.RUnlock() return ovfl.find(cmd, ts) } From 180872a8336f2d7d07d57ac9d1a5518e9c2586ad Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 16 Jan 2025 22:30:30 -0800 Subject: [PATCH 04/21] refactor Signed-off-by: Rueian --- cache.go | 57 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/cache.go b/cache.go index 1131ddf3..efe2986c 100644 --- a/cache.go +++ b/cache.go @@ -101,7 +101,7 @@ func (a *adapter) Update(key, cmd string, val RedisMessage) (sxat int64) { val.setExpireAt(sxat) } a.store.Set(key+cmd, val) - flight.set(val, nil) + flight.setVal(val) entries[cmd] = nil } a.mu.Unlock() @@ -112,7 +112,7 @@ func (a *adapter) Cancel(key, cmd string, err error) { a.mu.Lock() entries := a.flights[key] if flight, ok := entries[cmd].(*adapterEntry); ok { - flight.set(RedisMessage{}, err) + flight.setErr(err) entries[cmd] = nil } a.mu.Unlock() @@ -154,7 +154,7 @@ func (a *adapter) Close(err error) { for _, entries := range flights { for _, e := range entries { if e != nil { - e.(*adapterEntry).set(RedisMessage{}, err) + e.(*adapterEntry).setErr(err) } } } @@ -167,8 +167,13 @@ type adapterEntry struct { xat int64 } -func (a *adapterEntry) set(val RedisMessage, err error) { - a.err, a.val = err, val +func (a *adapterEntry) setVal(val RedisMessage) { + a.val = val + close(a.ch) +} + +func (a *adapterEntry) setErr(err error) { + a.err = err close(a.ch) } @@ -202,27 +207,27 @@ type flatentry struct { func (f *flatentry) insert(e *flatentry) { f.size += e.size f.mu.Lock() - defer f.mu.Unlock() e.ovfl = f.ovfl f.ovfl = e + f.mu.Unlock() } func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool) { - if f == nil { - return - } - if ts >= f.ttl { - expired = true - return - } - if cmd == f.cmd { - _ = ret.CacheUnmarshalView(f.val) - return + for next := f; next != nil; { + if ts >= next.ttl { + expired = true + return + } + if cmd == next.cmd { + _ = ret.CacheUnmarshalView(next.val) + return + } + next.mu.RLock() + ovfl := next.ovfl + next.mu.RUnlock() + next = ovfl } - f.mu.RLock() - ovfl := f.ovfl - f.mu.RUnlock() - return ovfl.find(cmd, ts) + return } const lrBatchSize = 64 @@ -246,7 +251,9 @@ func NewFlattenCache(limit int) CacheStore { f.lrup = sync.Pool{New: func() any { b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} runtime.SetFinalizer(b, func(b *lrBatch) { + f.mu.Lock() f.llTailBatch(b) + f.mu.Unlock() }) return b }} @@ -287,11 +294,9 @@ func (f *flatten) llTail(e *flatentry) { } func (f *flatten) llTailBatch(b *lrBatch) { - f.mu.Lock() for e := range b.m { f.llTail(e) } - f.mu.Unlock() clear(b.m) } @@ -310,7 +315,9 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red batch := f.lrup.Get().(*lrBatch) batch.m[e] = struct{}{} if len(batch.m) == lrBatchSize { + f.mu.Lock() f.llTailBatch(batch) + f.mu.Unlock() } f.lrup.Put(batch) return v, nil @@ -373,7 +380,7 @@ func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) { } } f.mu.Unlock() - af.set(val, nil) + af.setVal(val) } return sxat } @@ -384,7 +391,7 @@ func (f *flatten) Cancel(key, cmd string, err error) { defer f.mu.Unlock() if af := f.flights[fk]; af != nil { delete(f.flights, fk) - af.set(RedisMessage{}, err) + af.setErr(err) } } @@ -416,6 +423,6 @@ func (f *flatten) Close(err error) { f.mark++ f.mu.Unlock() for _, entry := range flights { - entry.set(RedisMessage{}, err) + entry.setErr(err) } } From b8175b20c4c472a7ee68afba88ae6576ce544e42 Mon Sep 17 00:00:00 2001 From: Rueian Date: Mon, 20 Jan 2025 10:15:52 -0800 Subject: [PATCH 05/21] pref: avoid CacheUnmarshalView in locks --- cache.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/cache.go b/cache.go index efe2986c..8fe2be9a 100644 --- a/cache.go +++ b/cache.go @@ -212,22 +212,20 @@ func (f *flatentry) insert(e *flatentry) { f.mu.Unlock() } -func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool) { +func (f *flatentry) find(cmd string, ts int64) ([]byte, bool) { for next := f; next != nil; { if ts >= next.ttl { - expired = true - return + return nil, true } if cmd == next.cmd { - _ = ret.CacheUnmarshalView(next.val) - return + return next.val, false } next.mu.RLock() ovfl := next.ovfl next.mu.RUnlock() next = ovfl } - return + return nil, false } const lrBatchSize = 64 @@ -311,7 +309,7 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red e := f.cache[key] f.mu.RUnlock() ts := now.UnixMilli() - if v, _ := e.find(cmd, ts); v.typ != 0 { + if v, _ := e.find(cmd, ts); v != nil { batch := f.lrup.Get().(*lrBatch) batch.m[e] = struct{}{} if len(batch.m) == lrBatchSize { @@ -320,7 +318,9 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red f.mu.Unlock() } f.lrup.Put(batch) - return v, nil + var ret RedisMessage + _ = ret.CacheUnmarshalView(v) + return ret, nil } fk := key + cmd f.mu.RLock() @@ -333,9 +333,11 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red defer f.mu.Unlock() e = f.cache[key] v, expired := e.find(cmd, ts) - if v.typ != 0 { + if v != nil { f.llTail(e) - return v, nil + var ret RedisMessage + _ = ret.CacheUnmarshalView(v) + return ret, nil } if expired { f.remove(e) From 4aebfc948711be54d2f3e333bee96d5ef9b53454 Mon Sep 17 00:00:00 2001 From: Rueian Date: Mon, 20 Jan 2025 10:16:16 -0800 Subject: [PATCH 06/21] perf: avoid unnecessary llTailBatch --- cache.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cache.go b/cache.go index 8fe2be9a..76493d11 100644 --- a/cache.go +++ b/cache.go @@ -249,9 +249,11 @@ func NewFlattenCache(limit int) CacheStore { f.lrup = sync.Pool{New: func() any { b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} runtime.SetFinalizer(b, func(b *lrBatch) { - f.mu.Lock() - f.llTailBatch(b) - f.mu.Unlock() + if len(b.m) >= 0 { + f.mu.Lock() + f.llTailBatch(b) + f.mu.Unlock() + } }) return b }} @@ -312,7 +314,7 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red if v, _ := e.find(cmd, ts); v != nil { batch := f.lrup.Get().(*lrBatch) batch.m[e] = struct{}{} - if len(batch.m) == lrBatchSize { + if len(batch.m) >= lrBatchSize { f.mu.Lock() f.llTailBatch(batch) f.mu.Unlock() From a7301ca39f474a8d5fd62fe9bf399e12b53695f5 Mon Sep 17 00:00:00 2001 From: Rueian Date: Mon, 20 Jan 2025 10:54:13 -0800 Subject: [PATCH 07/21] feat: batch delete expired cache with llTailBatch Signed-off-by: Rueian --- cache.go | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/cache.go b/cache.go index 76493d11..e6fbfedc 100644 --- a/cache.go +++ b/cache.go @@ -232,7 +232,7 @@ const lrBatchSize = 64 const flattEntrySize = unsafe.Sizeof(flatentry{}) type lrBatch struct { - m map[*flatentry]struct{} + m map[*flatentry]bool } func NewFlattenCache(limit int) CacheStore { @@ -247,7 +247,7 @@ func NewFlattenCache(limit int) CacheStore { f.head.next = unsafe.Pointer(f.tail) f.tail.prev = unsafe.Pointer(f.head) f.lrup = sync.Pool{New: func() any { - b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} + b := &lrBatch{m: make(map[*flatentry]bool, lrBatchSize)} runtime.SetFinalizer(b, func(b *lrBatch) { if len(b.m) >= 0 { f.mu.Lock() @@ -287,15 +287,19 @@ func (f *flatten) llDel(e *flatentry) { } func (f *flatten) llTail(e *flatentry) { - if e.mark == f.mark { - f.llDel(e) - f.llAdd(e) - } + f.llDel(e) + f.llAdd(e) } func (f *flatten) llTailBatch(b *lrBatch) { - for e := range b.m { - f.llTail(e) + for e, expired := range b.m { + if e.mark == f.mark { + if expired { + f.remove(e) + } else { + f.llTail(e) + } + } } clear(b.m) } @@ -311,18 +315,20 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red e := f.cache[key] f.mu.RUnlock() ts := now.UnixMilli() - if v, _ := e.find(cmd, ts); v != nil { + if v, expired := e.find(cmd, ts); v != nil || expired { batch := f.lrup.Get().(*lrBatch) - batch.m[e] = struct{}{} + batch.m[e] = expired if len(batch.m) >= lrBatchSize { f.mu.Lock() f.llTailBatch(batch) f.mu.Unlock() } f.lrup.Put(batch) - var ret RedisMessage - _ = ret.CacheUnmarshalView(v) - return ret, nil + if v != nil { + var ret RedisMessage + _ = ret.CacheUnmarshalView(v) + return ret, nil + } } fk := key + cmd f.mu.RLock() From 0d30713f5728f10fc599493a54b3a739445282e5 Mon Sep 17 00:00:00 2001 From: Rueian Date: Mon, 20 Jan 2025 10:58:26 -0800 Subject: [PATCH 08/21] pref: avoid CacheUnmarshalView in locks Signed-off-by: Rueian --- cache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cache.go b/cache.go index e6fbfedc..381fca31 100644 --- a/cache.go +++ b/cache.go @@ -338,15 +338,16 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red return RedisMessage{}, af } f.mu.Lock() - defer f.mu.Unlock() e = f.cache[key] v, expired := e.find(cmd, ts) if v != nil { f.llTail(e) + f.mu.Unlock() var ret RedisMessage _ = ret.CacheUnmarshalView(v) return ret, nil } + defer f.mu.Unlock() if expired { f.remove(e) } From b9b2bebc4a744b103cea6ec228c0de7572b2de0f Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 22 Jan 2025 21:49:10 -0800 Subject: [PATCH 09/21] refactor Signed-off-by: Rueian --- cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache.go b/cache.go index 381fca31..dcca8b77 100644 --- a/cache.go +++ b/cache.go @@ -283,7 +283,7 @@ func (f *flatten) llAdd(e *flatentry) { func (f *flatten) llDel(e *flatentry) { (*flatentry)(e.prev).next = e.next (*flatentry)(e.next).prev = e.prev - e.mark = 0 + e.mark = -1 } func (f *flatten) llTail(e *flatentry) { From b91898bb1764703265366b2b37209955d17d7b5e Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 22 Jan 2025 22:44:52 -0800 Subject: [PATCH 10/21] feat: correct the way of handling expired cache Signed-off-by: Rueian --- cache.go | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/cache.go b/cache.go index dcca8b77..c2b777a0 100644 --- a/cache.go +++ b/cache.go @@ -206,6 +206,7 @@ type flatentry struct { func (f *flatentry) insert(e *flatentry) { f.size += e.size + f.ttl = e.ttl f.mu.Lock() e.ovfl = f.ovfl f.ovfl = e @@ -213,10 +214,10 @@ func (f *flatentry) insert(e *flatentry) { } func (f *flatentry) find(cmd string, ts int64) ([]byte, bool) { + if f != nil && ts >= f.ttl { + return nil, true + } for next := f; next != nil; { - if ts >= next.ttl { - return nil, true - } if cmd == next.cmd { return next.val, false } @@ -232,7 +233,7 @@ const lrBatchSize = 64 const flattEntrySize = unsafe.Sizeof(flatentry{}) type lrBatch struct { - m map[*flatentry]bool + m map[*flatentry]struct{} } func NewFlattenCache(limit int) CacheStore { @@ -247,7 +248,7 @@ func NewFlattenCache(limit int) CacheStore { f.head.next = unsafe.Pointer(f.tail) f.tail.prev = unsafe.Pointer(f.head) f.lrup = sync.Pool{New: func() any { - b := &lrBatch{m: make(map[*flatentry]bool, lrBatchSize)} + b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} runtime.SetFinalizer(b, func(b *lrBatch) { if len(b.m) >= 0 { f.mu.Lock() @@ -292,13 +293,9 @@ func (f *flatten) llTail(e *flatentry) { } func (f *flatten) llTailBatch(b *lrBatch) { - for e, expired := range b.m { + for e := range b.m { if e.mark == f.mark { - if expired { - f.remove(e) - } else { - f.llTail(e) - } + f.llTail(e) } } clear(b.m) @@ -315,20 +312,18 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red e := f.cache[key] f.mu.RUnlock() ts := now.UnixMilli() - if v, expired := e.find(cmd, ts); v != nil || expired { + if v, _ := e.find(cmd, ts); v != nil { batch := f.lrup.Get().(*lrBatch) - batch.m[e] = expired + batch.m[e] = struct{}{} if len(batch.m) >= lrBatchSize { f.mu.Lock() f.llTailBatch(batch) f.mu.Unlock() } f.lrup.Put(batch) - if v != nil { - var ret RedisMessage - _ = ret.CacheUnmarshalView(v) - return ret, nil - } + var ret RedisMessage + _ = ret.CacheUnmarshalView(v) + return ret, nil } fk := key + cmd f.mu.RLock() From 2db704a14f1c7890c99206923637292ee9d88cd1 Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 23 Jan 2025 10:05:39 -0800 Subject: [PATCH 11/21] feat: correct the way of handling expired cache Signed-off-by: Rueian --- cache.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cache.go b/cache.go index c2b777a0..12e0c55e 100644 --- a/cache.go +++ b/cache.go @@ -377,7 +377,13 @@ func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) { f.remove(e) ep = e.next } - if e := f.cache[key]; e == nil { + e := f.cache[key] + if e != nil && e.cmd == cmd { + f.size -= e.size + f.llDel(e) + e = nil + } + if e == nil { fe.key = key f.cache[key] = fe f.llAdd(fe) From 26ba6aa411e09812e36d836ccab6e1ee085148c9 Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 23 Jan 2025 14:02:40 -0800 Subject: [PATCH 12/21] perf: add map estimated entry size into cache size estimation Signed-off-by: Rueian --- cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache.go b/cache.go index 12e0c55e..58ee627c 100644 --- a/cache.go +++ b/cache.go @@ -367,7 +367,7 @@ func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) { val.setExpireAt(sxat) } bs := val.CacheMarshal(nil) - fe := &flatentry{cmd: cmd, val: bs, ttl: sxat, size: int64(len(bs)+len(key)+len(cmd)) + int64(flattEntrySize)} + fe := &flatentry{cmd: cmd, val: bs, ttl: sxat, size: int64(len(bs)+len(key)+len(cmd)) + int64(flattEntrySize) + 64} // 64 for 2 map entries f.mu.Lock() if f.flights != nil { delete(f.flights, fk) From 35822ea36a5a2a2af978dd733d1302a434b961cc Mon Sep 17 00:00:00 2001 From: Rueian Date: Sun, 9 Feb 2025 22:19:43 -0800 Subject: [PATCH 13/21] refactor: move the flatten map to internal/cache Signed-off-by: Rueian --- cache.go | 232 ++++------------------------------ internal/cache/chain.go | 72 +++++++++++ internal/cache/chain_test.go | 64 ++++++++++ internal/cache/double.go | 128 +++++++++++++++++++ internal/cache/double_test.go | 94 ++++++++++++++ internal/cache/lru.go | 177 ++++++++++++++++++++++++++ internal/cache/lru_test.go | 194 ++++++++++++++++++++++++++++ 7 files changed, 757 insertions(+), 204 deletions(-) create mode 100644 internal/cache/chain.go create mode 100644 internal/cache/chain_test.go create mode 100644 internal/cache/double.go create mode 100644 internal/cache/double_test.go create mode 100644 internal/cache/lru.go create mode 100644 internal/cache/lru_test.go diff --git a/cache.go b/cache.go index 58ee627c..c0793a80 100644 --- a/cache.go +++ b/cache.go @@ -2,10 +2,11 @@ package rueidis import ( "context" - "runtime" "sync" + "sync/atomic" "time" - "unsafe" + + "github.com/redis/rueidis/internal/cache" ) // NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation @@ -191,250 +192,73 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) { } } -type flatentry struct { - ovfl *flatentry - next unsafe.Pointer - prev unsafe.Pointer - cmd string - key string - val []byte - ttl int64 - size int64 - mark int64 - mu sync.RWMutex -} - -func (f *flatentry) insert(e *flatentry) { - f.size += e.size - f.ttl = e.ttl - f.mu.Lock() - e.ovfl = f.ovfl - f.ovfl = e - f.mu.Unlock() -} - -func (f *flatentry) find(cmd string, ts int64) ([]byte, bool) { - if f != nil && ts >= f.ttl { - return nil, true - } - for next := f; next != nil; { - if cmd == next.cmd { - return next.val, false - } - next.mu.RLock() - ovfl := next.ovfl - next.mu.RUnlock() - next = ovfl - } - return nil, false -} - -const lrBatchSize = 64 -const flattEntrySize = unsafe.Sizeof(flatentry{}) - -type lrBatch struct { - m map[*flatentry]struct{} -} - func NewFlattenCache(limit int) CacheStore { - f := &flatten{ - flights: make(map[string]*adapterEntry), - cache: make(map[string]*flatentry), - head: &flatentry{}, - tail: &flatentry{}, - size: 0, - limit: int64(limit), + return &flatten{ + flights: cache.NewDoubleMap[*adapterEntry](64), + cache: cache.NewLRUDoubleMap[[]byte](64, int64(limit)), } - f.head.next = unsafe.Pointer(f.tail) - f.tail.prev = unsafe.Pointer(f.head) - f.lrup = sync.Pool{New: func() any { - b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} - runtime.SetFinalizer(b, func(b *lrBatch) { - if len(b.m) >= 0 { - f.mu.Lock() - f.llTailBatch(b) - f.mu.Unlock() - } - }) - return b - }} - return f } type flatten struct { - flights map[string]*adapterEntry - cache map[string]*flatentry - head *flatentry - tail *flatentry - lrup sync.Pool - mark int64 - size int64 - limit int64 - mu sync.RWMutex -} - -func (f *flatten) llAdd(e *flatentry) { - e.mark = f.mark - e.prev = f.tail.prev - e.next = unsafe.Pointer(f.tail) - f.tail.prev = unsafe.Pointer(e) - (*flatentry)(e.prev).next = unsafe.Pointer(e) -} - -func (f *flatten) llDel(e *flatentry) { - (*flatentry)(e.prev).next = e.next - (*flatentry)(e.next).prev = e.prev - e.mark = -1 -} - -func (f *flatten) llTail(e *flatentry) { - f.llDel(e) - f.llAdd(e) -} - -func (f *flatten) llTailBatch(b *lrBatch) { - for e := range b.m { - if e.mark == f.mark { - f.llTail(e) - } - } - clear(b.m) -} - -func (f *flatten) remove(e *flatentry) { - f.size -= e.size - f.llDel(e) - delete(f.cache, e.key) + flights *cache.DoubleMap[*adapterEntry] + cache *cache.LRUDoubleMap[[]byte] + close int32 } func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) { - f.mu.RLock() - e := f.cache[key] - f.mu.RUnlock() - ts := now.UnixMilli() - if v, _ := e.find(cmd, ts); v != nil { - batch := f.lrup.Get().(*lrBatch) - batch.m[e] = struct{}{} - if len(batch.m) >= lrBatchSize { - f.mu.Lock() - f.llTailBatch(batch) - f.mu.Unlock() - } - f.lrup.Put(batch) - var ret RedisMessage - _ = ret.CacheUnmarshalView(v) - return ret, nil + if atomic.LoadInt32(&f.close) == 1 { + return RedisMessage{}, nil } - fk := key + cmd - f.mu.RLock() - af := f.flights[fk] - f.mu.RUnlock() - if af != nil { - return RedisMessage{}, af - } - f.mu.Lock() - e = f.cache[key] - v, expired := e.find(cmd, ts) - if v != nil { - f.llTail(e) - f.mu.Unlock() + ts := now.UnixMilli() + if e, ok := f.cache.Find(key, cmd, ts); ok { var ret RedisMessage - _ = ret.CacheUnmarshalView(v) + _ = ret.CacheUnmarshalView(e) return ret, nil } - defer f.mu.Unlock() - if expired { - f.remove(e) - } - if af = f.flights[fk]; af != nil { + xat := ts + ttl.Milliseconds() + if af, ok := f.flights.FindOrInsert(key, cmd, func() *adapterEntry { + return &adapterEntry{ch: make(chan struct{}), xat: xat} + }); ok { return RedisMessage{}, af } - if f.flights != nil { - f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()} - } return RedisMessage{}, nil } func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) { - fk := key + cmd - f.mu.RLock() - af := f.flights[fk] - f.mu.RUnlock() - if af != nil { + if af, ok := f.flights.Find(key, cmd); ok { sxat = val.getExpireAt() if af.xat < sxat || sxat == 0 { sxat = af.xat val.setExpireAt(sxat) } bs := val.CacheMarshal(nil) - fe := &flatentry{cmd: cmd, val: bs, ttl: sxat, size: int64(len(bs)+len(key)+len(cmd)) + int64(flattEntrySize) + 64} // 64 for 2 map entries - f.mu.Lock() - if f.flights != nil { - delete(f.flights, fk) - f.size += fe.size - for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); { - e := (*flatentry)(ep) - f.remove(e) - ep = e.next - } - e := f.cache[key] - if e != nil && e.cmd == cmd { - f.size -= e.size - f.llDel(e) - e = nil - } - if e == nil { - fe.key = key - f.cache[key] = fe - f.llAdd(fe) - } else { - e.insert(fe) - } - } - f.mu.Unlock() + f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, bs) + f.flights.Delete(key, cmd) af.setVal(val) } return sxat } func (f *flatten) Cancel(key, cmd string, err error) { - fk := key + cmd - f.mu.Lock() - defer f.mu.Unlock() - if af := f.flights[fk]; af != nil { - delete(f.flights, fk) + if af, ok := f.flights.Find(key, cmd); ok { + f.flights.Delete(key, cmd) af.setErr(err) } } func (f *flatten) Delete(keys []RedisMessage) { - f.mu.Lock() - defer f.mu.Unlock() if keys == nil { - f.cache = make(map[string]*flatentry, len(f.cache)) - f.head.next = unsafe.Pointer(f.tail) - f.tail.prev = unsafe.Pointer(f.head) - f.mark++ - f.size = 0 + f.cache.DeleteAll() } else { for _, k := range keys { - if e := f.cache[k.string]; e != nil { - f.remove(e) - } + f.cache.Delete(k.string) } } } func (f *flatten) Close(err error) { - f.mu.Lock() - flights := f.flights - f.flights = nil - f.cache = nil - f.tail = nil - f.head = nil - f.mark++ - f.mu.Unlock() - for _, entry := range flights { + atomic.StoreInt32(&f.close, 1) + f.flights.Iterate(func(entry *adapterEntry) { entry.setErr(err) - } + }) } diff --git a/internal/cache/chain.go b/internal/cache/chain.go new file mode 100644 index 00000000..ce6a09c8 --- /dev/null +++ b/internal/cache/chain.go @@ -0,0 +1,72 @@ +package cache + +type node[V any] struct { + key string + next *node[V] + val V +} +type chain[V any] struct { + node[V] +} + +func (h *chain[V]) find(key string) (val V, ok bool) { + if h.node.key == key { + return h.node.val, true + } + for curr := h.node.next; curr != nil; curr = curr.next { + if curr.key == key { + return curr.val, true + } + } + return val, ok +} + +func (h *chain[V]) insert(key string, val V) { + if h.node.key == "" { + h.node.key = key + h.node.val = val + } else if h.node.key == key { + h.node.val = val + } else { + n := &node[V]{key: key, val: val} + n.next = h.node.next + h.node.next = n + } +} + +func (h *chain[V]) empty() bool { + return h.node.next == nil && h.node.key == "" +} + +func (h *chain[V]) delete(key string) bool { + var zero V + if h.node.key == key { + h.node.key = "" + h.node.val = zero + return h.node.next == nil + } + + if h.node.next == nil { + return h.node.key == "" + } + + if h.node.next.key == key { + h.node.next.key = "" + h.node.next.val = zero + h.node.next, h.node.next.next = h.node.next.next, nil + return h.empty() + } + + prev := h.node.next + curr := h.node.next.next + for curr != nil { + if curr.key == key { + curr.key = "" + curr.val = zero + prev.next, curr.next = curr.next, nil + break + } + prev, curr = curr, curr.next + } + return h.empty() +} diff --git a/internal/cache/chain_test.go b/internal/cache/chain_test.go new file mode 100644 index 00000000..4677d6e8 --- /dev/null +++ b/internal/cache/chain_test.go @@ -0,0 +1,64 @@ +package cache + +import ( + "testing" +) + +func TestChain(t *testing.T) { + h := chain[int]{} + if h.empty() != true { + t.Fatal("chain is not empty") + } + if _, ok := h.find("any"); ok { + t.Fatal("value is found") + } + if empty := h.delete("any"); !empty { + t.Fatal("not empty") + } + h.insert("1", 1) + h.insert("2", 2) + h.insert("3", 3) + if v, ok := h.find("1"); !ok || v != 1 { + t.Fatal("value is not found") + } + if v, ok := h.find("2"); !ok || v != 2 { + t.Fatal("value is not found") + } + if v, ok := h.find("3"); !ok || v != 3 { + t.Fatal("value is not found") + } + if empty := h.delete("1"); empty { + t.Fatal("empty") + } + if _, ok := h.find("1"); ok { + t.Fatal("value is found") + } + if v, ok := h.find("2"); !ok || v != 2 { + t.Fatal("value is not found") + } + if v, ok := h.find("3"); !ok || v != 3 { + t.Fatal("value is not found") + } + if empty := h.delete("2"); empty { + t.Fatal("empty") + } + if _, ok := h.find("2"); ok { + t.Fatal("value is found") + } + if v, ok := h.find("3"); !ok || v != 3 { + t.Fatal("value is not found") + } + h.insert("4", 4) + if v, ok := h.find("3"); !ok || v != 3 { + t.Fatal("value is not found") + } + if v, ok := h.find("4"); !ok || v != 4 { + t.Fatal("value is not found") + } + if empty := h.delete("3"); empty { + t.Fatal("empty") + } + if empty := h.delete("4"); !empty { + t.Fatal("not empty") + } +} diff --git a/internal/cache/double.go b/internal/cache/double.go new file mode 100644 index 00000000..4f2e24ba --- /dev/null +++ b/internal/cache/double.go @@ -0,0 +1,128 @@ +package cache + +import ( + "runtime" + "sync" +) + +const bpsize = 1024 + +type head[V any] struct { + chain[V] + mu sync.RWMutex +} + +type DoubleMap[V any] struct { + ma map[string]*head[V] + bp sync.Pool + mu sync.RWMutex +} + +func (m *DoubleMap[V]) Find(key1, key2 string) (val V, ok bool) { + m.mu.RLock() + if h := m.ma[key1]; h != nil { + h.mu.RLock() + val, ok = h.find(key2) + h.mu.RUnlock() + } + m.mu.RUnlock() + return +} + +func (m *DoubleMap[V]) FindOrInsert(key1, key2 string, fn func() V) (val V, ok bool) { + m.mu.RLock() + if h := m.ma[key1]; h != nil { + h.mu.Lock() + if val, ok = h.find(key2); !ok { + val = fn() + h.insert(key2, val) + } + h.mu.Unlock() + m.mu.RUnlock() + return + } + m.mu.RUnlock() + m.mu.Lock() + h := m.ma[key1] + if h != nil { + if val, ok = h.find(key2); ok { + m.mu.Unlock() + return + } + } else { + h = &head[V]{} + m.ma[key1] = h + } + val = fn() + h.insert(key2, val) + m.mu.Unlock() + return +} + +func (m *DoubleMap[V]) Delete(key1, key2 string) { + var empty bool + m.mu.RLock() + if h := m.ma[key1]; h != nil { + h.mu.Lock() + empty = h.delete(key2) + h.mu.Unlock() + } + m.mu.RUnlock() + if empty { + e := m.bp.Get().(*empties) + e.s = append(e.s, key1) + if len(e.s) >= bpsize { + m.delete(e.s) + clear(e.s) + e.s = e.s[:0] + } + m.bp.Put(e) + return + } +} + +func (m *DoubleMap[V]) delete(keys []string) { + m.mu.Lock() + for _, key := range keys { + if h := m.ma[key]; h != nil { + if h.empty() { + delete(m.ma, key) + } + } + } + m.mu.Unlock() +} + +func (m *DoubleMap[V]) Iterate(cb func(V)) { + m.mu.RLock() + for _, h := range m.ma { + h.mu.RLock() + if h.node.key != "" { + cb(h.node.val) + } + for curr := h.node.next; curr != nil; curr = curr.next { + cb(curr.val) + } + h.mu.RUnlock() + } + m.mu.RUnlock() +} + +type empties struct { + s []string +} + +func NewDoubleMap[V any](hint int) *DoubleMap[V] { + m := &DoubleMap[V]{ma: make(map[string]*head[V], hint)} + m.bp.New = func() any { + e := &empties{s: make([]string, 0, bpsize)} + runtime.SetFinalizer(e, func(e *empties) { + if len(e.s) >= 0 { + m.delete(e.s) + clear(e.s) + } + }) + return e + } + return m +} diff --git a/internal/cache/double_test.go b/internal/cache/double_test.go new file mode 100644 index 00000000..8e2b5fbc --- /dev/null +++ b/internal/cache/double_test.go @@ -0,0 +1,94 @@ +package cache + +import ( + "runtime" + "strconv" + "testing" +) + +func TestDoubleMap(t *testing.T) { + m := NewDoubleMap[int](8) + if _, ok := m.Find("1", "2"); ok { + t.Fatalf("should not find 1 2") + } + if v, ok := m.FindOrInsert("1", "a", func() int { + return 1 + }); ok || v != 1 { + t.Fatalf("should insert 1 but not found") + } + if v, ok := m.FindOrInsert("1", "a", func() int { + return 2 + }); !ok || v != 1 { + t.Fatalf("should found 1") + } + m.Delete("1", "a") + if _, ok := m.Find("1", "2"); ok { + t.Fatalf("should not find 1 2") + } + if v, ok := m.FindOrInsert("1", "a", func() int { + return 2 + }); ok || v != 2 { + t.Fatalf("should insert 1 but not found") + } + if v, ok := m.FindOrInsert("1", "b", func() int { + return 2 + }); ok || v != 2 { + t.Fatalf("should insert 1 but not found") + } + if v, ok := m.FindOrInsert("2", "b", func() int { + return 2 + }); ok || v != 2 { + t.Fatalf("should insert 1 but not found") + } + c := 0 + m.Iterate(func(i int) { + if i != 2 { + t.Fatalf("should iterate 2") + } + c++ + }) + if c != 3 { + t.Fatalf("should iterate 3 times") + } +} + +func TestDoubleMap_Delete(t *testing.T) { + m := NewDoubleMap[int](bpsize) + for i := 0; i < bpsize; i++ { + m.FindOrInsert(strconv.Itoa(i), "a", func() int { + return 1 + }) + } + for i := 0; i < bpsize-1; i++ { + m.Delete(strconv.Itoa(i), "a") + } + m.Delete(strconv.Itoa(bpsize-1), "a") + runtime.GC() + runtime.GC() + m.mu.Lock() + heads := len(m.ma) + m.mu.Unlock() + if heads != 0 { + t.Fatalf("no shrink") + } +} + +func TestDoubleMap_DeleteGC(t *testing.T) { + m := NewDoubleMap[int](bpsize) + for i := 0; i < bpsize; i++ { + m.FindOrInsert(strconv.Itoa(i), "a", func() int { + return 1 + }) + } + for i := 0; i < bpsize-1; i++ { + m.Delete(strconv.Itoa(i), "a") + } + runtime.GC() + runtime.GC() + m.mu.Lock() + heads := len(m.ma) + m.mu.Unlock() + if heads != 1 { + t.Fatalf("no shrink") + } +} diff --git a/internal/cache/lru.go b/internal/cache/lru.go new file mode 100644 index 00000000..fe1da608 --- /dev/null +++ b/internal/cache/lru.go @@ -0,0 +1,177 @@ +package cache + +import ( + "runtime" + "sync" + "unsafe" +) + +const LRUEntrySize = unsafe.Sizeof(linked[[]byte]{}) + +type linked[V any] struct { + key string + head chain[V] + next unsafe.Pointer + prev unsafe.Pointer + size int64 + ts int64 + mark int64 + mu sync.RWMutex +} + +func (h *linked[V]) find(key string, ts int64) (v V, ok bool) { + h.mu.RLock() + defer h.mu.RUnlock() + if h.ts > ts { + return h.head.find(key) + } + return +} + +func (h *linked[V]) close() { + h.mu.Lock() + h.ts = 0 + h.head = chain[V]{} + h.mu.Unlock() +} + +type LRUDoubleMap[V any] struct { + ma map[string]*linked[V] + bp sync.Pool + mu sync.RWMutex + head unsafe.Pointer + tail unsafe.Pointer + total int64 + limit int64 + mark int64 +} + +func (m *LRUDoubleMap[V]) Find(key1, key2 string, ts int64) (val V, ok bool) { + m.mu.RLock() + h := m.ma[key1] + if h != nil { + val, ok = h.find(key2, ts) + } + m.mu.RUnlock() + if ok { + b := m.bp.Get().(*ruBatch[V]) + b.m[h] = struct{}{} + if len(b.m) >= bpsize { + m.moveToTail(b.m) + clear(b.m) + } + m.bp.Put(b) + } + return +} + +func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts int64, v V) { + // TODO: a RLock fast path? + m.mu.Lock() + m.total += size + for m.head != nil { + h := (*linked[V])(m.head) + if m.total <= m.limit && h.ts != 0 { // TODO: clear expired entries? + break + } + m.total -= h.size + delete(m.ma, h.key) + h.mark -= 1 + m.head = h.next + if m.head != nil { + (*linked[V])(m.head).prev = nil + } else { + m.tail = nil + break + } + } + + h := m.ma[key1] + if h == nil { + h = &linked[V]{key: key1, ts: ts, mark: m.mark} + m.ma[key1] = h + } else if h.ts <= ts { + m.total -= h.size + h.size = 0 + } + h.ts = ts + h.size += size + h.next = nil + if m.tail != nil && m.tail != unsafe.Pointer(h) { + h.prev = m.tail + (*linked[V])(m.tail).next = unsafe.Pointer(h) + } + m.tail = unsafe.Pointer(h) + if m.head == nil { + m.head = unsafe.Pointer(h) + } + h.head.insert(key2, v) + m.mu.Unlock() +} + +func (m *LRUDoubleMap[V]) Delete(key1 string) { + m.mu.RLock() + if h := m.ma[key1]; h != nil { + h.close() + } + m.mu.RUnlock() +} + +func (m *LRUDoubleMap[V]) DeleteAll() { + m.mu.Lock() + m.ma = make(map[string]*linked[V], len(m.ma)) + m.head = nil + m.tail = nil + m.total = 0 + m.mark++ + m.mu.Unlock() +} + +func (m *LRUDoubleMap[V]) moveToTail(b map[*linked[V]]struct{}) { + m.mu.Lock() + defer m.mu.Unlock() + for h := range b { + if h.mark != m.mark { + continue + } + prev := h.prev + next := h.next + if prev != nil { + (*linked[V])(prev).next = next + } + if next != nil { + (*linked[V])(next).prev = prev + } + h.next = nil + if m.tail != nil && m.tail != unsafe.Pointer(h) { + h.prev = m.tail + (*linked[V])(m.tail).next = unsafe.Pointer(h) + } + m.tail = unsafe.Pointer(h) + if m.head == unsafe.Pointer(h) && next != nil { + m.head = next + } + } +} + +type ruBatch[V any] struct { + m map[*linked[V]]struct{} +} + +func NewLRUDoubleMap[V any](hint, limit int64) *LRUDoubleMap[V] { + m := &LRUDoubleMap[V]{ + ma: make(map[string]*linked[V], hint), + limit: limit, + } + m.bp.New = func() interface{} { + b := &ruBatch[V]{m: make(map[*linked[V]]struct{}, bpsize)} + runtime.SetFinalizer(b, func(b *ruBatch[V]) { + if len(b.m) > 0 { + m.moveToTail(b.m) + clear(b.m) + } + }) + return b + } + return m +} diff --git a/internal/cache/lru_test.go b/internal/cache/lru_test.go new file mode 100644 index 00000000..83c3eae2 --- /dev/null +++ b/internal/cache/lru_test.go @@ -0,0 +1,194 @@ +package cache + +import ( + "runtime" + "strconv" + "testing" +) + +func TestLRUDoubleMap(t *testing.T) { + m := NewLRUDoubleMap[int](bpsize, bpsize) + if _, ok := m.Find("1", "a", 1); ok { + t.Fatal("should not find 1") + } + m.Insert("1", "a", 1, 2, 1) + m.Insert("1", "b", 1, 2, 2) + m.Insert("2", "c", 1, 2, 3) + if v, ok := m.Find("1", "a", 1); !ok || v != 1 { + t.Fatal("not find 1") + } + if v, ok := m.Find("1", "b", 1); !ok || v != 2 { + t.Fatal("not find 2") + } + if v, ok := m.Find("2", "c", 1); !ok || v != 3 { + t.Fatal("not find 3") + } + if _, ok := m.Find("1", "a", 2); ok { + t.Fatal("should not find") + } + if _, ok := m.Find("1", "b", 2); ok { + t.Fatal("should not find") + } + if _, ok := m.Find("2", "c", 2); ok { + t.Fatal("should not find") + } + m.Delete("1") + if _, ok := m.Find("1", "a", 1); ok { + t.Fatal("should not find") + } + if _, ok := m.Find("1", "b", 1); ok { + t.Fatal("should not find") + } + if v, ok := m.Find("2", "c", 1); !ok || v != 3 { + t.Fatal("not find 3") + } + m.Delete("2") + m.mu.Lock() + heads := len(m.ma) + m.mu.Unlock() + if heads != 2 { + t.Fatal("should have 2 heads") + } + + m.Insert("1", "d", 1, 2, 1) + m.Insert("1", "e", 1, 2, 2) + m.Insert("2", "f", 1, 2, 3) + if v, ok := m.Find("1", "d", 1); !ok || v != 1 { + t.Fatal("not find 1") + } + if v, ok := m.Find("1", "e", 1); !ok || v != 2 { + t.Fatal("not find 2") + } + if v, ok := m.Find("2", "f", 1); !ok || v != 3 { + t.Fatal("not find 3") + } + m.DeleteAll() + if _, ok := m.Find("1", "d", 1); ok { + t.Fatal("should not find") + } + if _, ok := m.Find("1", "e", 1); ok { + t.Fatal("should not find") + } + if _, ok := m.Find("2", "f", 1); ok { + t.Fatal("should not find") + } +} + +func TestLRUCache_LRU_1(t *testing.T) { + m := NewLRUDoubleMap[int](bpsize, bpsize) + for i := 0; i < bpsize; i++ { + m.Insert(strconv.Itoa(i), "a", 2, 2, i) + } + m.mu.Lock() + heads := len(m.ma) + m.mu.Unlock() + if heads != (bpsize / 2) { + t.Fatal("should have bpsize/2 heads", heads) + } + for i := 0; i < bpsize/2; i++ { + if _, ok := m.Find(strconv.Itoa(i), "a", 1); ok { + t.Fatal("should not find") + } + } + for i := bpsize / 2; i < bpsize; i++ { + if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { + t.Fatal("not find") + } + } +} + +func TestLRUCache_LRU_2(t *testing.T) { + m := NewLRUDoubleMap[int](bpsize*2, bpsize*2) + for i := 0; i < bpsize*2; i++ { + m.Insert(strconv.Itoa(i), "a", 1, 2, i) + } + m.mu.Lock() + heads := len(m.ma) + m.mu.Unlock() + if heads != (bpsize * 2) { + t.Fatal("should have bpsize*2 heads", heads) + } + for i := 0; i < bpsize; i++ { + if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { + t.Fatal("not find") + } + } + runtime.GC() + runtime.GC() + for i := bpsize * 2; i < bpsize*3; i++ { + m.Insert(strconv.Itoa(i), "a", 1, 2, i) + } + for i := 0; i < bpsize; i++ { + if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { + t.Fatal("not find") + } + } + for i := bpsize * 1; i < bpsize*2; i++ { + if _, ok := m.Find(strconv.Itoa(i), "a", 1); ok { + t.Fatal("should not find") + } + } + for i := bpsize * 2; i < bpsize*3; i++ { + if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { + t.Fatal("not find") + } + } +} + +func TestLRUCache_LRU_GC(t *testing.T) { + m := NewLRUDoubleMap[int](bpsize, bpsize) + for i := 0; i < bpsize; i++ { + m.Insert(strconv.Itoa(i), "a", 1, 2, i) + } + if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 { + t.Fatal("not find") + } + runtime.GC() + runtime.GC() + m.Insert("a", "a", bpsize-1, 2, 0) + m.mu.Lock() + heads := len(m.ma) + total := m.total + m.mu.Unlock() + if heads != 2 { + t.Fatal("should have 2 heads", heads) + } + if total != bpsize { + t.Fatal("should have bpsize", bpsize) + } + for i := 0; i < bpsize; i++ { + if i == bpsize/2 { + if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { + t.Fatal("not find") + } + } else { + if _, ok := m.Find(strconv.Itoa(i), "a", 1); ok { + t.Fatal("should not find") + } + } + } +} + +func TestLRUCache_LRU_GC_2(t *testing.T) { + m := NewLRUDoubleMap[int](bpsize, bpsize) + for i := 0; i < bpsize; i++ { + m.Insert(strconv.Itoa(i), "a", 1, 2, i) + } + if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 { + t.Fatal("not find") + } + m.DeleteAll() + runtime.GC() + runtime.GC() + m.Insert("a", "a", bpsize-1, 2, 0) + m.mu.Lock() + heads := len(m.ma) + total := m.total + m.mu.Unlock() + if heads != 1 { + t.Fatal("should have 1 heads", heads) + } + if total != bpsize-1 { + t.Fatal("should have bpsize-1", bpsize-1) + } +} From 4fe09da8cfd665eadf810faf47561dd2f89c3257 Mon Sep 17 00:00:00 2001 From: Rueian Date: Tue, 11 Feb 2025 20:09:36 -0800 Subject: [PATCH 14/21] fix: flatten cache concurrency and ll Signed-off-by: Rueian --- cache.go | 11 ++-- internal/cache/double.go | 16 ++++-- internal/cache/double_test.go | 2 +- internal/cache/lru.go | 98 +++++++++++++++++++++-------------- internal/cache/lru_test.go | 2 +- 5 files changed, 76 insertions(+), 53 deletions(-) diff --git a/cache.go b/cache.go index c0793a80..507592b1 100644 --- a/cache.go +++ b/cache.go @@ -3,7 +3,6 @@ package rueidis import ( "context" "sync" - "sync/atomic" "time" "github.com/redis/rueidis/internal/cache" @@ -202,13 +201,9 @@ func NewFlattenCache(limit int) CacheStore { type flatten struct { flights *cache.DoubleMap[*adapterEntry] cache *cache.LRUDoubleMap[[]byte] - close int32 } func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) { - if atomic.LoadInt32(&f.close) == 1 { - return RedisMessage{}, nil - } ts := now.UnixMilli() if e, ok := f.cache.Find(key, cmd, ts); ok { var ret RedisMessage @@ -248,7 +243,7 @@ func (f *flatten) Cancel(key, cmd string, err error) { func (f *flatten) Delete(keys []RedisMessage) { if keys == nil { - f.cache.DeleteAll() + f.cache.Reset() } else { for _, k := range keys { f.cache.Delete(k.string) @@ -257,8 +252,8 @@ func (f *flatten) Delete(keys []RedisMessage) { } func (f *flatten) Close(err error) { - atomic.StoreInt32(&f.close, 1) - f.flights.Iterate(func(entry *adapterEntry) { + f.cache.DeleteAll() + f.flights.Close(func(entry *adapterEntry) { entry.setErr(err) }) } diff --git a/internal/cache/double.go b/internal/cache/double.go index 4f2e24ba..719bb680 100644 --- a/internal/cache/double.go +++ b/internal/cache/double.go @@ -41,6 +41,10 @@ func (m *DoubleMap[V]) FindOrInsert(key1, key2 string, fn func() V) (val V, ok b m.mu.RUnlock() return } + if m.ma == nil { + m.mu.RUnlock() + return + } m.mu.RUnlock() m.mu.Lock() h := m.ma[key1] @@ -49,6 +53,9 @@ func (m *DoubleMap[V]) FindOrInsert(key1, key2 string, fn func() V) (val V, ok b m.mu.Unlock() return } + } else if m.ma == nil { + m.mu.Unlock() + return } else { h = &head[V]{} m.ma[key1] = h @@ -93,19 +100,18 @@ func (m *DoubleMap[V]) delete(keys []string) { m.mu.Unlock() } -func (m *DoubleMap[V]) Iterate(cb func(V)) { - m.mu.RLock() +func (m *DoubleMap[V]) Close(cb func(V)) { + m.mu.Lock() for _, h := range m.ma { - h.mu.RLock() if h.node.key != "" { cb(h.node.val) } for curr := h.node.next; curr != nil; curr = curr.next { cb(curr.val) } - h.mu.RUnlock() } - m.mu.RUnlock() + m.ma = nil + m.mu.Unlock() } type empties struct { diff --git a/internal/cache/double_test.go b/internal/cache/double_test.go index 8e2b5fbc..7d94a3b2 100644 --- a/internal/cache/double_test.go +++ b/internal/cache/double_test.go @@ -41,7 +41,7 @@ func TestDoubleMap(t *testing.T) { t.Fatalf("should insert 1 but not found") } c := 0 - m.Iterate(func(i int) { + m.Close(func(i int) { if i != 2 { t.Fatalf("should iterate 2") } diff --git a/internal/cache/lru.go b/internal/cache/lru.go index fe1da608..9d6ac7af 100644 --- a/internal/cache/lru.go +++ b/internal/cache/lru.go @@ -65,43 +65,72 @@ func (m *LRUDoubleMap[V]) Find(key1, key2 string, ts int64) (val V, ok bool) { return } +func (m *LRUDoubleMap[V]) remove(h *linked[V]) { + h.mark -= 1 + next := h.next + prev := h.prev + h.next = nil + h.prev = nil + if next != nil { + (*linked[V])(next).prev = prev + } + if prev != nil { + (*linked[V])(prev).next = next + } + if m.head == unsafe.Pointer(h) { + m.head = next + } + if m.tail == unsafe.Pointer(h) { + m.tail = prev + } + m.total -= h.size + delete(m.ma, h.key) +} + +func (m *LRUDoubleMap[V]) move(h *linked[V]) { + prev := h.prev + next := h.next + if prev != nil { + (*linked[V])(prev).next = next + } + if next != nil { + (*linked[V])(next).prev = prev + } + h.next = nil + if m.tail != nil && m.tail != unsafe.Pointer(h) { + h.prev = m.tail + (*linked[V])(m.tail).next = unsafe.Pointer(h) + } + m.tail = unsafe.Pointer(h) + if m.head == unsafe.Pointer(h) && next != nil { + m.head = next + } +} + func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts int64, v V) { // TODO: a RLock fast path? m.mu.Lock() + if m.ma == nil { + m.mu.Unlock() + return + } m.total += size for m.head != nil { h := (*linked[V])(m.head) if m.total <= m.limit && h.ts != 0 { // TODO: clear expired entries? break } - m.total -= h.size - delete(m.ma, h.key) - h.mark -= 1 - m.head = h.next - if m.head != nil { - (*linked[V])(m.head).prev = nil - } else { - m.tail = nil - break - } + m.remove(h) } h := m.ma[key1] if h == nil { h = &linked[V]{key: key1, ts: ts, mark: m.mark} m.ma[key1] = h - } else if h.ts <= ts { - m.total -= h.size - h.size = 0 } h.ts = ts h.size += size - h.next = nil - if m.tail != nil && m.tail != unsafe.Pointer(h) { - h.prev = m.tail - (*linked[V])(m.tail).next = unsafe.Pointer(h) - } - m.tail = unsafe.Pointer(h) + m.move(h) if m.head == nil { m.head = unsafe.Pointer(h) } @@ -118,6 +147,16 @@ func (m *LRUDoubleMap[V]) Delete(key1 string) { } func (m *LRUDoubleMap[V]) DeleteAll() { + m.mu.Lock() + m.ma = nil + m.head = nil + m.tail = nil + m.total = 0 + m.mark++ + m.mu.Unlock() +} + +func (m *LRUDoubleMap[V]) Reset() { m.mu.Lock() m.ma = make(map[string]*linked[V], len(m.ma)) m.head = nil @@ -131,25 +170,8 @@ func (m *LRUDoubleMap[V]) moveToTail(b map[*linked[V]]struct{}) { m.mu.Lock() defer m.mu.Unlock() for h := range b { - if h.mark != m.mark { - continue - } - prev := h.prev - next := h.next - if prev != nil { - (*linked[V])(prev).next = next - } - if next != nil { - (*linked[V])(next).prev = prev - } - h.next = nil - if m.tail != nil && m.tail != unsafe.Pointer(h) { - h.prev = m.tail - (*linked[V])(m.tail).next = unsafe.Pointer(h) - } - m.tail = unsafe.Pointer(h) - if m.head == unsafe.Pointer(h) && next != nil { - m.head = next + if h.mark == m.mark { + m.move(h) } } } diff --git a/internal/cache/lru_test.go b/internal/cache/lru_test.go index 83c3eae2..06581bca 100644 --- a/internal/cache/lru_test.go +++ b/internal/cache/lru_test.go @@ -177,7 +177,7 @@ func TestLRUCache_LRU_GC_2(t *testing.T) { if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 { t.Fatal("not find") } - m.DeleteAll() + m.Reset() runtime.GC() runtime.GC() m.Insert("a", "a", bpsize-1, 2, 0) From 5c448079b58c2964a9d99d3c6cf0b0deb83e0144 Mon Sep 17 00:00:00 2001 From: Rueian Date: Tue, 11 Feb 2025 23:09:42 -0800 Subject: [PATCH 15/21] feat: extend the CacheStore interface to have a chance to cleanup on Update Signed-off-by: Rueian --- cache.go | 8 ++++---- cache_test.go | 8 ++++---- internal/cache/lru.go | 8 ++++++-- internal/cache/lru_test.go | 26 ++++++++++++------------- lru.go | 2 +- lru_test.go | 40 +++++++++++++++++++------------------- pipe.go | 4 ++-- pipe_test.go | 4 ++-- 8 files changed, 52 insertions(+), 48 deletions(-) diff --git a/cache.go b/cache.go index 507592b1..1540e536 100644 --- a/cache.go +++ b/cache.go @@ -30,7 +30,7 @@ type CacheStore interface { // Update is called when receiving the response of the request sent by the above Flight Case 1 from redis. // It should not only update the store but also deliver the response to all CacheEntry.Wait and return a desired client side PXAT of the response. // Note that the server side expire time can be retrieved from RedisMessage.CachePXAT. - Update(key, cmd string, val RedisMessage) (pxat int64) + Update(key, cmd string, val RedisMessage, now time.Time) (pxat int64) // Cancel is called when the request sent by the above Flight Case 1 failed. // It should not only deliver the error to all CacheEntry.Wait but also remove the CacheEntry from the store. Cancel(key, cmd string, err error) @@ -91,7 +91,7 @@ func (a *adapter) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red return RedisMessage{}, flight } -func (a *adapter) Update(key, cmd string, val RedisMessage) (sxat int64) { +func (a *adapter) Update(key, cmd string, val RedisMessage, _ time.Time) (sxat int64) { a.mu.Lock() entries := a.flights[key] if flight, ok := entries[cmd].(*adapterEntry); ok { @@ -219,7 +219,7 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red return RedisMessage{}, nil } -func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) { +func (f *flatten) Update(key, cmd string, val RedisMessage, now time.Time) (sxat int64) { if af, ok := f.flights.Find(key, cmd); ok { sxat = val.getExpireAt() if af.xat < sxat || sxat == 0 { @@ -227,7 +227,7 @@ func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) { val.setExpireAt(sxat) } bs := val.CacheMarshal(nil) - f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, bs) + f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, now.UnixMilli(), bs) f.flights.Delete(key, cmd) af.setVal(val) } diff --git a/cache_test.go b/cache_test.go index 9010b8aa..5be6d9bf 100644 --- a/cache_test.go +++ b/cache_test.go @@ -32,7 +32,7 @@ func test(t *testing.T, storeFn func() CacheStore) { v = RedisMessage{typ: '+', string: "val"} v.setExpireAt(now.Add(time.Second).UnixMilli()) - if pttl := store.Update("key", "cmd", v); pttl < now.Add(90*time.Millisecond).UnixMilli() || pttl > now.Add(100*time.Millisecond).UnixMilli() { + if pttl := store.Update("key", "cmd", v, now); pttl < now.Add(90*time.Millisecond).UnixMilli() || pttl > now.Add(100*time.Millisecond).UnixMilli() { t.Fatal("Update should return a desired pttl") } @@ -104,8 +104,8 @@ func test(t *testing.T, storeFn func() CacheStore) { } { store.Flight("key", "cmd1", time.Millisecond*100, now) store.Flight("key", "cmd2", time.Millisecond*100, now) - store.Update("key", "cmd1", RedisMessage{typ: '+', string: "val"}) - store.Update("key", "cmd2", RedisMessage{typ: '+', string: "val"}) + store.Update("key", "cmd1", RedisMessage{typ: '+', string: "val"}, now) + store.Update("key", "cmd2", RedisMessage{typ: '+', string: "val"}, now) store.Delete(deletions) @@ -130,7 +130,7 @@ func test(t *testing.T, storeFn func() CacheStore) { v = RedisMessage{typ: '+', string: "val"} v.setExpireAt(now.Add(time.Millisecond).UnixMilli()) - store.Update("key", "cmd", v) + store.Update("key", "cmd", v, now) v, e = store.Flight("key", "cmd", time.Second, now.Add(time.Millisecond)) if v.typ != 0 || e != nil { diff --git a/internal/cache/lru.go b/internal/cache/lru.go index 9d6ac7af..90b32e80 100644 --- a/internal/cache/lru.go +++ b/internal/cache/lru.go @@ -107,7 +107,7 @@ func (m *LRUDoubleMap[V]) move(h *linked[V]) { } } -func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts int64, v V) { +func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts, now int64, v V) { // TODO: a RLock fast path? m.mu.Lock() if m.ma == nil { @@ -117,7 +117,7 @@ func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts int64, v V) { m.total += size for m.head != nil { h := (*linked[V])(m.head) - if m.total <= m.limit && h.ts != 0 { // TODO: clear expired entries? + if m.total <= m.limit && h.ts != 0 && h.ts > now { break } m.remove(h) @@ -127,6 +127,10 @@ func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts int64, v V) { if h == nil { h = &linked[V]{key: key1, ts: ts, mark: m.mark} m.ma[key1] = h + } else if h.ts <= now { + m.total -= h.size + h.size = 0 + h.head = chain[V]{} } h.ts = ts h.size += size diff --git a/internal/cache/lru_test.go b/internal/cache/lru_test.go index 06581bca..c12403ec 100644 --- a/internal/cache/lru_test.go +++ b/internal/cache/lru_test.go @@ -11,9 +11,9 @@ func TestLRUDoubleMap(t *testing.T) { if _, ok := m.Find("1", "a", 1); ok { t.Fatal("should not find 1") } - m.Insert("1", "a", 1, 2, 1) - m.Insert("1", "b", 1, 2, 2) - m.Insert("2", "c", 1, 2, 3) + m.Insert("1", "a", 1, 2, 1, 1) + m.Insert("1", "b", 1, 2, 1, 2) + m.Insert("2", "c", 1, 2, 1, 3) if v, ok := m.Find("1", "a", 1); !ok || v != 1 { t.Fatal("not find 1") } @@ -50,9 +50,9 @@ func TestLRUDoubleMap(t *testing.T) { t.Fatal("should have 2 heads") } - m.Insert("1", "d", 1, 2, 1) - m.Insert("1", "e", 1, 2, 2) - m.Insert("2", "f", 1, 2, 3) + m.Insert("1", "d", 1, 2, 1, 1) + m.Insert("1", "e", 1, 2, 1, 2) + m.Insert("2", "f", 1, 2, 1, 3) if v, ok := m.Find("1", "d", 1); !ok || v != 1 { t.Fatal("not find 1") } @@ -77,7 +77,7 @@ func TestLRUDoubleMap(t *testing.T) { func TestLRUCache_LRU_1(t *testing.T) { m := NewLRUDoubleMap[int](bpsize, bpsize) for i := 0; i < bpsize; i++ { - m.Insert(strconv.Itoa(i), "a", 2, 2, i) + m.Insert(strconv.Itoa(i), "a", 2, 2, 1, i) } m.mu.Lock() heads := len(m.ma) @@ -100,7 +100,7 @@ func TestLRUCache_LRU_1(t *testing.T) { func TestLRUCache_LRU_2(t *testing.T) { m := NewLRUDoubleMap[int](bpsize*2, bpsize*2) for i := 0; i < bpsize*2; i++ { - m.Insert(strconv.Itoa(i), "a", 1, 2, i) + m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i) } m.mu.Lock() heads := len(m.ma) @@ -116,7 +116,7 @@ func TestLRUCache_LRU_2(t *testing.T) { runtime.GC() runtime.GC() for i := bpsize * 2; i < bpsize*3; i++ { - m.Insert(strconv.Itoa(i), "a", 1, 2, i) + m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i) } for i := 0; i < bpsize; i++ { if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { @@ -138,14 +138,14 @@ func TestLRUCache_LRU_2(t *testing.T) { func TestLRUCache_LRU_GC(t *testing.T) { m := NewLRUDoubleMap[int](bpsize, bpsize) for i := 0; i < bpsize; i++ { - m.Insert(strconv.Itoa(i), "a", 1, 2, i) + m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i) } if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 { t.Fatal("not find") } runtime.GC() runtime.GC() - m.Insert("a", "a", bpsize-1, 2, 0) + m.Insert("a", "a", bpsize-1, 2, 1, 0) m.mu.Lock() heads := len(m.ma) total := m.total @@ -172,7 +172,7 @@ func TestLRUCache_LRU_GC(t *testing.T) { func TestLRUCache_LRU_GC_2(t *testing.T) { m := NewLRUDoubleMap[int](bpsize, bpsize) for i := 0; i < bpsize; i++ { - m.Insert(strconv.Itoa(i), "a", 1, 2, i) + m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i) } if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 { t.Fatal("not find") @@ -180,7 +180,7 @@ func TestLRUCache_LRU_GC_2(t *testing.T) { m.Reset() runtime.GC() runtime.GC() - m.Insert("a", "a", bpsize-1, 2, 0) + m.Insert("a", "a", bpsize-1, 2, 1, 0) m.mu.Lock() heads := len(m.ma) total := m.total diff --git a/lru.go b/lru.go index 897e6e96..180b0b98 100644 --- a/lru.go +++ b/lru.go @@ -218,7 +218,7 @@ func (c *lru) Flights(now time.Time, multi []CacheableTTL, results []RedisResult return missed[:j] } -func (c *lru) Update(key, cmd string, value RedisMessage) (pxat int64) { +func (c *lru) Update(key, cmd string, value RedisMessage, _ time.Time) (pxat int64) { var ch chan struct{} c.mu.Lock() if kc, ok := c.store[key]; ok { diff --git a/lru_test.go b/lru_test.go index a6745ed9..8e7996ff 100644 --- a/lru_test.go +++ b/lru_test.go @@ -26,7 +26,7 @@ func TestLRU(t *testing.T) { } m := RedisMessage{typ: '+', string: "0", values: []RedisMessage{{}}} m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli()) - store.Update("0", "GET", m) + store.Update("0", "GET", m, time.Now()) return store.(*lru) } @@ -49,7 +49,7 @@ func TestLRU(t *testing.T) { t.Fatalf("got unexpected value from the Flight after pttl: %v %v", v, entry) } m := RedisMessage{typ: '+', string: "1"} - lru.Update("1", "GET", m) + lru.Update("1", "GET", m, time.Now()) if v, _ := lru.Flight("1", "GET", TTL, time.Now()); v.typ == 0 { t.Fatalf("did not get the value from the second Flight") } else if v.string != "1" { @@ -98,7 +98,7 @@ func TestLRU(t *testing.T) { lru.Flight(strconv.Itoa(i), "GET", TTL, time.Now()) m := RedisMessage{typ: '+', string: strconv.Itoa(i)} m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli()) - lru.Update(strconv.Itoa(i), "GET", m) + lru.Update(strconv.Itoa(i), "GET", m, time.Now()) } if v, entry := lru.Flight("1", "GET", TTL, time.Now()); v.typ != 0 { t.Fatalf("got evicted value from the first Flight: %v %v", v, entry) @@ -123,7 +123,7 @@ func TestLRU(t *testing.T) { for i := 1; i < Entries; i++ { lru.Flight(strconv.Itoa(i), "GET", TTL, time.Now()) m := RedisMessage{typ: '+', string: strconv.Itoa(i)} - lru.Update(strconv.Itoa(i), "GET", m) + lru.Update(strconv.Itoa(i), "GET", m, time.Now()) } for i := 1; i < Entries; i++ { if v, _ := lru.Flight(strconv.Itoa(i), "GET", TTL, time.Now()); v.string != strconv.Itoa(i) { @@ -157,7 +157,7 @@ func TestLRU(t *testing.T) { m := RedisMessage{typ: '+', string: "this Update should have no effect"} m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli()) - lru.Update("1", "GET", m) + lru.Update("1", "GET", m, time.Now()) for i := 0; i < 2; i++ { // entry should be always nil after the first call if Close if v, entry := lru.Flight("1", "GET", TTL, time.Now()); v.typ != 0 || entry != nil { t.Fatalf("got unexpected value from the first Flight: %v %v", v, entry) @@ -194,7 +194,7 @@ func TestLRU(t *testing.T) { lru.Flight("key", "cmd", time.Second, time.Now()) m := RedisMessage{typ: 1} m.setExpireAt(time.Now().Add(time.Second).UnixMilli()) - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v := lru.GetTTL("key", "cmd"); !roughly(v, time.Second) { t.Fatalf("unexpected %v", v) } @@ -206,7 +206,7 @@ func TestLRU(t *testing.T) { lru.Flight("key", "cmd", 2*time.Second, time.Now()) m := RedisMessage{typ: 1} m.setExpireAt(time.Now().Add(time.Second).UnixMilli()) - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v, _ := lru.Flight("key", "cmd", 2*time.Second, time.Now()); v.CacheTTL() != 1 { t.Fatalf("unexpected %v", v.CacheTTL()) } @@ -216,7 +216,7 @@ func TestLRU(t *testing.T) { lru.Flight("key", "cmd", 2*time.Second, time.Now()) m := RedisMessage{typ: 1} m.setExpireAt(time.Now().Add(3 * time.Second).UnixMilli()) - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v, _ := lru.Flight("key", "cmd", 2*time.Second, time.Now()); v.CacheTTL() != 2 { t.Fatalf("unexpected %v", v.CacheTTL()) } @@ -225,7 +225,7 @@ func TestLRU(t *testing.T) { lru := setup(t) lru.Flight("key", "cmd", 2*time.Second, time.Now()) m := RedisMessage{typ: 1} - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v, _ := lru.Flight("key", "cmd", 2*time.Second, time.Now()); v.CacheTTL() != 2 { t.Fatalf("unexpected %v", v.CacheTTL()) } @@ -234,7 +234,7 @@ func TestLRU(t *testing.T) { lru := setup(t) lru.Flight("key", "cmd", 2*time.Second, time.Now()) m := RedisMessage{typ: 1} - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v, _ := lru.Flight("key", "cmd", 2*time.Second, time.Now()); v.CacheTTL() != 2 { t.Fatalf("unexpected %v", v.CacheTTL()) } @@ -260,7 +260,7 @@ func TestLRU(t *testing.T) { t.Fatalf("got unexpected value from the Flight after pttl: %v %v", v, entry) } m := RedisMessage{typ: '+', string: "1"} - lru.Update("1", "GET", m) + lru.Update("1", "GET", m, time.Now()) if v, _ := flights(lru, time.Now(), TTL, "GET", "1"); v.typ == 0 { t.Fatalf("did not get the value from the second Flight") } else if v.string != "1" { @@ -309,7 +309,7 @@ func TestLRU(t *testing.T) { flights(lru, time.Now(), TTL, "GET", strconv.Itoa(i)) m := RedisMessage{typ: '+', string: strconv.Itoa(i)} m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli()) - lru.Update(strconv.Itoa(i), "GET", m) + lru.Update(strconv.Itoa(i), "GET", m, time.Now()) } if v, entry := flights(lru, time.Now(), TTL, "GET", "1"); v.typ != 0 { t.Fatalf("got evicted value from the first Flight: %v %v", v, entry) @@ -334,7 +334,7 @@ func TestLRU(t *testing.T) { for i := 1; i < Entries; i++ { flights(lru, time.Now(), TTL, "GET", strconv.Itoa(i)) m := RedisMessage{typ: '+', string: strconv.Itoa(i)} - lru.Update(strconv.Itoa(i), "GET", m) + lru.Update(strconv.Itoa(i), "GET", m, time.Now()) } for i := 1; i < Entries; i++ { if v, _ := flights(lru, time.Now(), TTL, "GET", strconv.Itoa(i)); v.string != strconv.Itoa(i) { @@ -368,7 +368,7 @@ func TestLRU(t *testing.T) { m := RedisMessage{typ: '+', string: "this Update should have no effect"} m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli()) - lru.Update("1", "GET", m) + lru.Update("1", "GET", m, time.Now()) for i := 0; i < 2; i++ { // entry should be always nil after the first call if Close if v, entry := flights(lru, time.Now(), TTL, "GET", "1"); v.typ != 0 || entry != nil { t.Fatalf("got unexpected value from the first Flight: %v %v", v, entry) @@ -405,7 +405,7 @@ func TestLRU(t *testing.T) { flights(lru, time.Now(), time.Second, "cmd", "key") m := RedisMessage{typ: 1} m.setExpireAt(time.Now().Add(time.Second).UnixMilli()) - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v := lru.GetTTL("key", "cmd"); !roughly(v, time.Second) { t.Fatalf("unexpected %v", v) } @@ -417,7 +417,7 @@ func TestLRU(t *testing.T) { flights(lru, time.Now(), time.Second*2, "cmd", "key") m := RedisMessage{typ: 1} m.setExpireAt(time.Now().Add(time.Second).UnixMilli()) - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v, _ := flights(lru, time.Now(), time.Second*2, "cmd", "key"); v.CacheTTL() != 1 { t.Fatalf("unexpected %v", v.CacheTTL()) } @@ -427,7 +427,7 @@ func TestLRU(t *testing.T) { flights(lru, time.Now(), time.Second*2, "cmd", "key") m := RedisMessage{typ: 1} m.setExpireAt(time.Now().Add(3 * time.Second).UnixMilli()) - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v, _ := flights(lru, time.Now(), time.Second*2, "cmd", "key"); v.CacheTTL() != 2 { t.Fatalf("unexpected %v", v.CacheTTL()) } @@ -436,7 +436,7 @@ func TestLRU(t *testing.T) { lru := setup(t) flights(lru, time.Now(), time.Second*2, "cmd", "key") m := RedisMessage{typ: 1} - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v, _ := flights(lru, time.Now(), time.Second*2, "cmd", "key"); v.CacheTTL() != 2 { t.Fatalf("unexpected %v", v.CacheTTL()) } @@ -445,7 +445,7 @@ func TestLRU(t *testing.T) { lru := setup(t) flights(lru, time.Now(), time.Second*2, "cmd", "key") m := RedisMessage{typ: 1} - lru.Update("key", "cmd", m) + lru.Update("key", "cmd", m, time.Now()) if v, _ := flights(lru, time.Now(), time.Second*2, "cmd", "key"); v.CacheTTL() != 2 { t.Fatalf("unexpected %v", v.CacheTTL()) } @@ -483,7 +483,7 @@ func BenchmarkLRU(b *testing.B) { lru.Flight(key, "GET", TTL, time.Now()) m := RedisMessage{} m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli()) - lru.Update(key, "GET", m) + lru.Update(key, "GET", m, time.Now()) } }) } diff --git a/pipe.go b/pipe.go index adf9a5c4..ab6b0713 100644 --- a/pipe.go +++ b/pipe.go @@ -569,7 +569,7 @@ func (p *pipe) _backgroundRead() (err error) { if pttl := msg.values[i].integer; pttl >= 0 { cp.setExpireAt(now.Add(time.Duration(pttl) * time.Millisecond).UnixMilli()) } - msgs[i].setExpireAt(p.cache.Update(ck, cc, cp)) + msgs[i].setExpireAt(p.cache.Update(ck, cc, cp, now)) } } else { ck, cc := cmds.CacheKey(cacheable) @@ -579,7 +579,7 @@ func (p *pipe) _backgroundRead() (err error) { if pttl := msg.values[ci-1].integer; pttl >= 0 { cp.setExpireAt(now.Add(time.Duration(pttl) * time.Millisecond).UnixMilli()) } - msg.values[ci].setExpireAt(p.cache.Update(ck, cc, cp)) + msg.values[ci].setExpireAt(p.cache.Update(ck, cc, cp, now)) } } if prply { diff --git a/pipe_test.go b/pipe_test.go index 6e187c6e..7bd4fc2f 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -1779,7 +1779,7 @@ func TestClientSideCachingWithSideChannelMGet(t *testing.T) { time.Sleep(100 * time.Millisecond) m := RedisMessage{typ: '+', string: "OK"} m.setExpireAt(time.Now().Add(10 * time.Millisecond).UnixMilli()) - p.cache.Update("a1", "GET", m) + p.cache.Update("a1", "GET", m, time.Now()) }() v, _ := p.DoCache(context.Background(), Cacheable(cmds.NewMGetCompleted([]string{"MGET", "a1"})), 10*time.Second).AsStrSlice() @@ -2141,7 +2141,7 @@ func TestClientSideCachingWithSideChannelDoMultiCache(t *testing.T) { time.Sleep(100 * time.Millisecond) m := RedisMessage{typ: '+', string: "OK"} m.setExpireAt(time.Now().Add(10 * time.Millisecond).UnixMilli()) - p.cache.Update("a1", "GET", m) + p.cache.Update("a1", "GET", m, time.Now()) }() arr := p.DoMultiCache(context.Background(), []CacheableTTL{ From ee6bb13253b0ef4dc6cedd5226286e9df248df11 Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 12 Feb 2025 12:07:02 -0800 Subject: [PATCH 16/21] feat: delay ll and fast path on Insert Signed-off-by: Rueian --- internal/cache/double.go | 12 +++--- internal/cache/lru.go | 86 +++++++++++++++++++++++--------------- internal/cache/lru_test.go | 22 ++++++---- 3 files changed, 74 insertions(+), 46 deletions(-) diff --git a/internal/cache/double.go b/internal/cache/double.go index 719bb680..03d0d085 100644 --- a/internal/cache/double.go +++ b/internal/cache/double.go @@ -78,13 +78,16 @@ func (m *DoubleMap[V]) Delete(key1, key2 string) { if empty { e := m.bp.Get().(*empties) e.s = append(e.s, key1) - if len(e.s) >= bpsize { + if len(e.s) < bpsize { + m.bp.Put(e) + return + } + go func(m *DoubleMap[V], e *empties) { m.delete(e.s) clear(e.s) e.s = e.s[:0] - } - m.bp.Put(e) - return + m.bp.Put(e) + }(m, e) } } @@ -125,7 +128,6 @@ func NewDoubleMap[V any](hint int) *DoubleMap[V] { runtime.SetFinalizer(e, func(e *empties) { if len(e.s) >= 0 { m.delete(e.s) - clear(e.s) } }) return e diff --git a/internal/cache/lru.go b/internal/cache/lru.go index 90b32e80..ef90445c 100644 --- a/internal/cache/lru.go +++ b/internal/cache/lru.go @@ -3,6 +3,7 @@ package cache import ( "runtime" "sync" + "sync/atomic" "unsafe" ) @@ -15,8 +16,9 @@ type linked[V any] struct { prev unsafe.Pointer size int64 ts int64 - mark int64 mu sync.RWMutex + cnt uint32 + mark int32 } func (h *linked[V]) find(key string, ts int64) (v V, ok bool) { @@ -43,7 +45,7 @@ type LRUDoubleMap[V any] struct { tail unsafe.Pointer total int64 limit int64 - mark int64 + mark int32 } func (m *LRUDoubleMap[V]) Find(key1, key2 string, ts int64) (val V, ok bool) { @@ -53,14 +55,19 @@ func (m *LRUDoubleMap[V]) Find(key1, key2 string, ts int64) (val V, ok bool) { val, ok = h.find(key2, ts) } m.mu.RUnlock() - if ok { + if ok && atomic.AddUint32(&h.cnt, 1)&3 == 0 { b := m.bp.Get().(*ruBatch[V]) - b.m[h] = struct{}{} - if len(b.m) >= bpsize { - m.moveToTail(b.m) - clear(b.m) + b.s = append(b.s, h) + if len(b.s) < bpsize { + m.bp.Put(b) + return } - m.bp.Put(b) + go func(m *LRUDoubleMap[V], b *ruBatch[V]) { + m.moveToTail(b.s) + clear(b.s) + b.s = b.s[:0] + m.bp.Put(b) + }(m, b) } return } @@ -83,7 +90,7 @@ func (m *LRUDoubleMap[V]) remove(h *linked[V]) { if m.tail == unsafe.Pointer(h) { m.tail = prev } - m.total -= h.size + atomic.AddInt64(&m.total, -h.size) delete(m.ma, h.key) } @@ -108,37 +115,44 @@ func (m *LRUDoubleMap[V]) move(h *linked[V]) { } func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts, now int64, v V) { - // TODO: a RLock fast path? + m.mu.RLock() + if h := m.ma[key1]; h != nil { + atomic.AddInt64(&m.total, size) + h.mu.Lock() + if h.ts <= now { + atomic.AddInt64(&m.total, -h.size) + h.size = 0 + h.head = chain[V]{} + } + h.ts = ts + h.size += size + h.head.insert(key2, v) + h.mu.Unlock() + m.mu.RUnlock() + return + } + m.mu.RUnlock() m.mu.Lock() if m.ma == nil { m.mu.Unlock() return } - m.total += size + atomic.AddInt64(&m.total, size) for m.head != nil { h := (*linked[V])(m.head) - if m.total <= m.limit && h.ts != 0 && h.ts > now { + if h.ts != 0 && h.ts > now && atomic.LoadInt64(&m.total) <= m.limit { break } m.remove(h) } - h := m.ma[key1] - if h == nil { - h = &linked[V]{key: key1, ts: ts, mark: m.mark} - m.ma[key1] = h - } else if h.ts <= now { - m.total -= h.size - h.size = 0 - h.head = chain[V]{} - } - h.ts = ts - h.size += size + h := &linked[V]{key: key1, ts: ts, size: size, mark: m.mark} + h.head.insert(key2, v) + m.ma[key1] = h // h must not exist in the map because this Insert is called sequentially. m.move(h) if m.head == nil { m.head = unsafe.Pointer(h) } - h.head.insert(key2, v) m.mu.Unlock() } @@ -155,7 +169,7 @@ func (m *LRUDoubleMap[V]) DeleteAll() { m.ma = nil m.head = nil m.tail = nil - m.total = 0 + atomic.StoreInt64(&m.total, 0) m.mark++ m.mu.Unlock() } @@ -165,23 +179,30 @@ func (m *LRUDoubleMap[V]) Reset() { m.ma = make(map[string]*linked[V], len(m.ma)) m.head = nil m.tail = nil - m.total = 0 + atomic.StoreInt64(&m.total, 0) m.mark++ m.mu.Unlock() } -func (m *LRUDoubleMap[V]) moveToTail(b map[*linked[V]]struct{}) { +func (m *LRUDoubleMap[V]) moveToTail(s []*linked[V]) { m.mu.Lock() defer m.mu.Unlock() - for h := range b { + for _, h := range s { if h.mark == m.mark { m.move(h) } } + for m.head != nil { + h := (*linked[V])(m.head) + if atomic.LoadInt64(&m.total) <= m.limit && h.ts != 0 { + break + } + m.remove(h) + } } type ruBatch[V any] struct { - m map[*linked[V]]struct{} + s []*linked[V] } func NewLRUDoubleMap[V any](hint, limit int64) *LRUDoubleMap[V] { @@ -190,11 +211,10 @@ func NewLRUDoubleMap[V any](hint, limit int64) *LRUDoubleMap[V] { limit: limit, } m.bp.New = func() interface{} { - b := &ruBatch[V]{m: make(map[*linked[V]]struct{}, bpsize)} + b := &ruBatch[V]{s: make([]*linked[V], 0, bpsize)} runtime.SetFinalizer(b, func(b *ruBatch[V]) { - if len(b.m) > 0 { - m.moveToTail(b.m) - clear(b.m) + if len(b.s) > 0 { + m.moveToTail(b.s) } }) return b diff --git a/internal/cache/lru_test.go b/internal/cache/lru_test.go index c12403ec..fb9f5666 100644 --- a/internal/cache/lru_test.go +++ b/internal/cache/lru_test.go @@ -109,8 +109,10 @@ func TestLRUCache_LRU_2(t *testing.T) { t.Fatal("should have bpsize*2 heads", heads) } for i := 0; i < bpsize; i++ { - if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { - t.Fatal("not find") + for j := 0; j < 4; j++ { + if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { + t.Fatal("not find") + } } } runtime.GC() @@ -120,7 +122,7 @@ func TestLRUCache_LRU_2(t *testing.T) { } for i := 0; i < bpsize; i++ { if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { - t.Fatal("not find") + t.Fatal("not find", v, ok) } } for i := bpsize * 1; i < bpsize*2; i++ { @@ -130,7 +132,7 @@ func TestLRUCache_LRU_2(t *testing.T) { } for i := bpsize * 2; i < bpsize*3; i++ { if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i { - t.Fatal("not find") + t.Fatal("not find", v, ok) } } } @@ -140,8 +142,10 @@ func TestLRUCache_LRU_GC(t *testing.T) { for i := 0; i < bpsize; i++ { m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i) } - if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 { - t.Fatal("not find") + for j := 0; j < 4; j++ { + if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 { + t.Fatal("not find") + } } runtime.GC() runtime.GC() @@ -174,8 +178,10 @@ func TestLRUCache_LRU_GC_2(t *testing.T) { for i := 0; i < bpsize; i++ { m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i) } - if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 { - t.Fatal("not find") + for j := 0; j < 4; j++ { + if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 { + t.Fatal("not find") + } } m.Reset() runtime.GC() From 212e64274c9bdbf29cf0f07f7ecd1143d1dcc00b Mon Sep 17 00:00:00 2001 From: Rueian Date: Fri, 28 Feb 2025 13:50:26 -0800 Subject: [PATCH 17/21] chore: rename to chained Signed-off-by: Rueian --- cache.go | 16 ++++++++-------- cache_test.go | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cache.go b/cache.go index 1540e536..656ae250 100644 --- a/cache.go +++ b/cache.go @@ -191,19 +191,19 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) { } } -func NewFlattenCache(limit int) CacheStore { - return &flatten{ +func NewChainedCache(limit int) CacheStore { + return &chained{ flights: cache.NewDoubleMap[*adapterEntry](64), cache: cache.NewLRUDoubleMap[[]byte](64, int64(limit)), } } -type flatten struct { +type chained struct { flights *cache.DoubleMap[*adapterEntry] cache *cache.LRUDoubleMap[[]byte] } -func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) { +func (f *chained) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) { ts := now.UnixMilli() if e, ok := f.cache.Find(key, cmd, ts); ok { var ret RedisMessage @@ -219,7 +219,7 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red return RedisMessage{}, nil } -func (f *flatten) Update(key, cmd string, val RedisMessage, now time.Time) (sxat int64) { +func (f *chained) Update(key, cmd string, val RedisMessage, now time.Time) (sxat int64) { if af, ok := f.flights.Find(key, cmd); ok { sxat = val.getExpireAt() if af.xat < sxat || sxat == 0 { @@ -234,14 +234,14 @@ func (f *flatten) Update(key, cmd string, val RedisMessage, now time.Time) (sxat return sxat } -func (f *flatten) Cancel(key, cmd string, err error) { +func (f *chained) Cancel(key, cmd string, err error) { if af, ok := f.flights.Find(key, cmd); ok { f.flights.Delete(key, cmd) af.setErr(err) } } -func (f *flatten) Delete(keys []RedisMessage) { +func (f *chained) Delete(keys []RedisMessage) { if keys == nil { f.cache.Reset() } else { @@ -251,7 +251,7 @@ func (f *flatten) Delete(keys []RedisMessage) { } } -func (f *flatten) Close(err error) { +func (f *chained) Close(err error) { f.cache.DeleteAll() f.flights.Close(func(entry *adapterEntry) { entry.setErr(err) diff --git a/cache_test.go b/cache_test.go index 5be6d9bf..acb574da 100644 --- a/cache_test.go +++ b/cache_test.go @@ -185,7 +185,7 @@ func TestCacheStore(t *testing.T) { }) t.Run("FlattenCache", func(t *testing.T) { test(t, func() CacheStore { - return NewFlattenCache(DefaultCacheBytes) + return NewChainedCache(DefaultCacheBytes) }) }) } From 67db02b974298d7ace627fec84546c173b16d3e7 Mon Sep 17 00:00:00 2001 From: Rueian Date: Fri, 28 Feb 2025 15:59:25 -0800 Subject: [PATCH 18/21] feat: clean lru map in batch Signed-off-by: Rueian --- internal/cache/lru.go | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/internal/cache/lru.go b/internal/cache/lru.go index ef90445c..f3fbb878 100644 --- a/internal/cache/lru.go +++ b/internal/cache/lru.go @@ -39,6 +39,7 @@ func (h *linked[V]) close() { type LRUDoubleMap[V any] struct { ma map[string]*linked[V] + mi []string bp sync.Pool mu sync.RWMutex head unsafe.Pointer @@ -157,6 +158,30 @@ func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts, now int64, v V) { } func (m *LRUDoubleMap[V]) Delete(key1 string) { + if m.mi == nil { + m.mi = make([]string, 0, bpsize) + } else if len(m.mi) == bpsize { + m.mu.Lock() + for _, key := range m.mi { + if h := m.ma[key]; h != nil && h.ts == 0 { + m.remove(h) + } + } + if h := m.ma[key1]; h != nil { + m.remove(h) + } + for m.head != nil { + h := (*linked[V])(m.head) + if h.ts != 0 && atomic.LoadInt64(&m.total) <= m.limit { + break + } + m.remove(h) + } + m.mu.Unlock() + clear(m.mi) + return + } + m.mi = append(m.mi, key1) m.mu.RLock() if h := m.ma[key1]; h != nil { h.close() @@ -167,6 +192,7 @@ func (m *LRUDoubleMap[V]) Delete(key1 string) { func (m *LRUDoubleMap[V]) DeleteAll() { m.mu.Lock() m.ma = nil + m.mi = nil m.head = nil m.tail = nil atomic.StoreInt64(&m.total, 0) @@ -177,6 +203,7 @@ func (m *LRUDoubleMap[V]) DeleteAll() { func (m *LRUDoubleMap[V]) Reset() { m.mu.Lock() m.ma = make(map[string]*linked[V], len(m.ma)) + m.mi = nil m.head = nil m.tail = nil atomic.StoreInt64(&m.total, 0) @@ -194,7 +221,7 @@ func (m *LRUDoubleMap[V]) moveToTail(s []*linked[V]) { } for m.head != nil { h := (*linked[V])(m.head) - if atomic.LoadInt64(&m.total) <= m.limit && h.ts != 0 { + if h.ts != 0 && atomic.LoadInt64(&m.total) <= m.limit { break } m.remove(h) From b323885cce08d1bdcfa309bdfdd6792bb5a1caa2 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sat, 1 Mar 2025 14:15:11 -0800 Subject: [PATCH 19/21] test: batch lru delete --- internal/cache/lru.go | 9 ++++++--- internal/cache/lru_test.go | 39 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/internal/cache/lru.go b/internal/cache/lru.go index f3fbb878..d9ada380 100644 --- a/internal/cache/lru.go +++ b/internal/cache/lru.go @@ -158,7 +158,7 @@ func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts, now int64, v V) { } func (m *LRUDoubleMap[V]) Delete(key1 string) { - if m.mi == nil { + if m.mi == nil { // no need to lock m.mi because this Delete is called sequentially. m.mi = make([]string, 0, bpsize) } else if len(m.mi) == bpsize { m.mu.Lock() @@ -181,12 +181,15 @@ func (m *LRUDoubleMap[V]) Delete(key1 string) { clear(m.mi) return } - m.mi = append(m.mi, key1) m.mu.RLock() - if h := m.ma[key1]; h != nil { + h := m.ma[key1] + if h != nil { h.close() } m.mu.RUnlock() + if h != nil { + m.mi = append(m.mi, key1) + } } func (m *LRUDoubleMap[V]) DeleteAll() { diff --git a/internal/cache/lru_test.go b/internal/cache/lru_test.go index fb9f5666..b35878d2 100644 --- a/internal/cache/lru_test.go +++ b/internal/cache/lru_test.go @@ -3,6 +3,7 @@ package cache import ( "runtime" "strconv" + "sync/atomic" "testing" ) @@ -74,6 +75,44 @@ func TestLRUDoubleMap(t *testing.T) { } } +func TestLRUDoubleMap_BatchDelete(t *testing.T) { + m := NewLRUDoubleMap[int](bpsize, bpsize) + m.Insert("1", "a", 1, 2, 1, 1) + m.Insert("2", "c", 1, 2, 1, 3) + m.Delete("1") + m.Delete("2") + if _, ok := m.Find("1", "a", 1); ok { + t.Fatal("should not find") + } + if _, ok := m.Find("2", "c", 1); ok { + t.Fatal("should not find") + } + m.mu.Lock() + heads := len(m.ma) + total := atomic.LoadInt64(&m.total) + m.mu.Unlock() + if heads != 2 { + t.Fatal("should have 2 heads") + } + if total == 0 { + t.Fatal("should not have 0 total") + } + for i := 0; i < bpsize; i++ { + m.Delete("1") + m.Delete("2") + } + m.mu.Lock() + heads = len(m.ma) + total = atomic.LoadInt64(&m.total) + m.mu.Unlock() + if heads != 0 { + t.Fatal("should have 0 heads") + } + if total != 0 { + t.Fatal("should have 0 total") + } +} + func TestLRUCache_LRU_1(t *testing.T) { m := NewLRUDoubleMap[int](bpsize, bpsize) for i := 0; i < bpsize; i++ { From 1603eb445b7a74fe02956bc6d7b1d5a4036d0948 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sat, 1 Mar 2025 14:37:24 -0800 Subject: [PATCH 20/21] fix: avoid race on adapterEntry Signed-off-by: Rueian --- cache.go | 10 ++++++---- internal/cache/chain.go | 12 +++++++----- internal/cache/chain_test.go | 10 +++++----- internal/cache/double.go | 5 +++-- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/cache.go b/cache.go index 656ae250..a5af67fb 100644 --- a/cache.go +++ b/cache.go @@ -228,16 +228,18 @@ func (f *chained) Update(key, cmd string, val RedisMessage, now time.Time) (sxat } bs := val.CacheMarshal(nil) f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, now.UnixMilli(), bs) - f.flights.Delete(key, cmd) - af.setVal(val) + if f.flights.Delete(key, cmd) { + af.setVal(val) + } } return sxat } func (f *chained) Cancel(key, cmd string, err error) { if af, ok := f.flights.Find(key, cmd); ok { - f.flights.Delete(key, cmd) - af.setErr(err) + if f.flights.Delete(key, cmd) { + af.setErr(err) + } } } diff --git a/internal/cache/chain.go b/internal/cache/chain.go index ce6a09c8..fafb781a 100644 --- a/internal/cache/chain.go +++ b/internal/cache/chain.go @@ -38,35 +38,37 @@ func (h *chain[V]) empty() bool { return h.node.next == nil && h.node.key == "" } -func (h *chain[V]) delete(key string) bool { +func (h *chain[V]) delete(key string) (bool, bool) { var zero V if h.node.key == key { h.node.key = "" h.node.val = zero - return h.node.next == nil + return h.node.next == nil, true } if h.node.next == nil { - return h.node.key == "" + return h.node.key == "", false } if h.node.next.key == key { h.node.next.key = "" h.node.next.val = zero h.node.next, h.node.next.next = h.node.next.next, nil - return h.empty() + return h.empty(), true } prev := h.node.next curr := h.node.next.next + deleted := false for curr != nil { if curr.key == key { curr.key = "" curr.val = zero prev.next, curr.next = curr.next, nil + deleted = true break } prev, curr = curr, curr.next } - return h.empty() + return h.empty(), deleted } diff --git a/internal/cache/chain_test.go b/internal/cache/chain_test.go index 4677d6e8..2e191f3b 100644 --- a/internal/cache/chain_test.go +++ b/internal/cache/chain_test.go @@ -12,7 +12,7 @@ func TestChain(t *testing.T) { if _, ok := h.find("any"); ok { t.Fatal("value is found") } - if empty := h.delete("any"); !empty { + if empty, deleted := h.delete("any"); !empty || deleted { t.Fatal("not empty") } h.insert("1", 1) @@ -27,7 +27,7 @@ func TestChain(t *testing.T) { if v, ok := h.find("3"); !ok || v != 3 { t.Fatal("value is not found") } - if empty := h.delete("1"); empty { + if empty, deleted := h.delete("1"); empty || !deleted { t.Fatal("empty") } if _, ok := h.find("1"); ok { @@ -39,7 +39,7 @@ func TestChain(t *testing.T) { if v, ok := h.find("3"); !ok || v != 3 { t.Fatal("value is not found") } - if empty := h.delete("2"); empty { + if empty, deleted := h.delete("2"); empty || !deleted { t.Fatal("empty") } if _, ok := h.find("2"); ok { @@ -55,10 +55,10 @@ func TestChain(t *testing.T) { if v, ok := h.find("4"); !ok || v != 4 { t.Fatal("value is not found") } - if empty := h.delete("3"); empty { + if empty, deleted := h.delete("3"); empty || !deleted { t.Fatal("empty") } - if empty := h.delete("4"); !empty { + if empty, deleted := h.delete("4"); !empty || !deleted { t.Fatal("not empty") } } diff --git a/internal/cache/double.go b/internal/cache/double.go index 03d0d085..c1f78f86 100644 --- a/internal/cache/double.go +++ b/internal/cache/double.go @@ -66,12 +66,12 @@ func (m *DoubleMap[V]) FindOrInsert(key1, key2 string, fn func() V) (val V, ok b return } -func (m *DoubleMap[V]) Delete(key1, key2 string) { +func (m *DoubleMap[V]) Delete(key1, key2 string) (deleted bool) { var empty bool m.mu.RLock() if h := m.ma[key1]; h != nil { h.mu.Lock() - empty = h.delete(key2) + empty, deleted = h.delete(key2) h.mu.Unlock() } m.mu.RUnlock() @@ -89,6 +89,7 @@ func (m *DoubleMap[V]) Delete(key1, key2 string) { m.bp.Put(e) }(m, e) } + return } func (m *DoubleMap[V]) delete(keys []string) { From 174037722fcc20b9578e710697644afe0432d30d Mon Sep 17 00:00:00 2001 From: Rueian Date: Sat, 1 Mar 2025 14:43:04 -0800 Subject: [PATCH 21/21] docs: add a comment on NewChainedCache Signed-off-by: Rueian --- cache.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cache.go b/cache.go index a5af67fb..021f879f 100644 --- a/cache.go +++ b/cache.go @@ -191,6 +191,8 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) { } } +// NewChainedCache returns a CacheStore optimized for concurrency, memory efficiency and GC, compared to +// the default client side caching CacheStore. However, it is not yet optimized for DoMultiCache. func NewChainedCache(limit int) CacheStore { return &chained{ flights: cache.NewDoubleMap[*adapterEntry](64),