2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
@@ -503,7 +503,7 @@ func runCompactForTenant(

progress.Set(compact.CleanBlocks)
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
-if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
+if _, err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "cleaning marked blocks")
}
compactMetrics.cleanups.Inc()
2 changes: 1 addition & 1 deletion cmd/thanos/tools_bucket.go
@@ -948,7 +948,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, stubCounter, stubCounter, stubCounter)
-if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
+if _, err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}

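Both updated call sites discard the new first return value of `DeleteMarkedBlocks` with the blank identifier. The diff does not show that value's concrete type; the sketch below assumes it is a count of deleted blocks, purely for illustration, and shows how a caller could surface it instead of dropping it. `markedBlocksCleaner` and `cleanMarked` are hypothetical names, not Thanos APIs.

```go
package sketch

import (
	"context"
	"log"
)

// markedBlocksCleaner captures the shape of the updated call sites above.
// The concrete first return value is NOT shown in this diff; an int count
// of deleted blocks is assumed here purely for illustration.
type markedBlocksCleaner interface {
	DeleteMarkedBlocks(ctx context.Context) (int, error)
}

// cleanMarked shows how a caller could log the new return value instead of
// discarding it with `_` the way the updated call sites do.
func cleanMarked(ctx context.Context, c markedBlocksCleaner) error {
	deleted, err := c.DeleteMarkedBlocks(ctx)
	if err != nil {
		return err
	}
	log.Printf("deleted %d blocks marked for deletion", deleted)
	return nil
}
```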
4 changes: 3 additions & 1 deletion pkg/api/query/v1_test.go
@@ -30,6 +30,8 @@ import (
"testing"
"time"

"go.uber.org/atomic"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@@ -785,7 +787,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
c := &storetestutil.TestClient{
Name: "1",
-StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil)),
+StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil), atomic.Bool{}),
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
}

46 changes: 31 additions & 15 deletions pkg/block/fetcher.go
@@ -857,18 +857,34 @@ func NewDeduplicateFilter(concurrency int) *DefaultDeduplicateFilter {
// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DefaultDeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
-f.duplicateIDs = f.duplicateIDs[:0]
-
-var wg sync.WaitGroup
+var filterWg, dupWg sync.WaitGroup
var groupChan = make(chan []*metadata.Meta)

+var dupsChan = make(chan ulid.ULID)
+
+dupWg.Add(1)
+go func() {
+defer dupWg.Done()
+dups := make([]ulid.ULID, 0)
+for dup := range dupsChan {
+if metas[dup] != nil {
+dups = append(dups, dup)
+}
+synced.WithLabelValues(duplicateMeta).Inc()
+delete(metas, dup)
+}
+f.mu.Lock()
+f.duplicateIDs = dups
+f.mu.Unlock()
+}()
+
// Start up workers to deduplicate workgroups when they're ready.
for i := 0; i < f.concurrency; i++ {
-wg.Add(1)
+filterWg.Add(1)
go func() {
-defer wg.Done()
+defer filterWg.Done()
for group := range groupChan {
-f.filterGroup(group, metas, synced)
+f.filterGroup(group, dupsChan)
}
}()
}
@@ -883,12 +899,15 @@ func (f *DefaultDeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID
groupChan <- group
}
close(groupChan)
-wg.Wait()
+filterWg.Wait()

+close(dupsChan)
+dupWg.Wait()
+
return nil
}

-func (f *DefaultDeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) {
+func (f *DefaultDeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, dupsChan chan ulid.ULID) {
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
@@ -919,19 +938,16 @@ childLoop:
coveringSet = append(coveringSet, child)
}

-f.mu.Lock()
for _, duplicate := range duplicates {
-if metas[duplicate] != nil {
-f.duplicateIDs = append(f.duplicateIDs, duplicate)
-}
-synced.WithLabelValues(duplicateMeta).Inc()
-delete(metas, duplicate)
+dupsChan <- duplicate
}
-f.mu.Unlock()
}

// DuplicateIDs returns slice of block ids that are filtered out by DefaultDeduplicateFilter.
func (f *DefaultDeduplicateFilter) DuplicateIDs() []ulid.ULID {
+f.mu.Lock()
+defer f.mu.Unlock()
+
return f.duplicateIDs
}

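The `Filter` rewrite above replaces per-worker mutation of shared state under `f.mu` with a fan-in: `filterGroup` workers only send candidate duplicate IDs on `dupsChan`, and a single collector goroutine owns `metas`, the `synced` gauge, and the rebuilt `duplicateIDs` slice (which is why `DuplicateIDs` now takes the mutex). Below is a minimal sketch of the same pattern with hypothetical names (`dedupFanIn`, string IDs, "keep the first element" standing in for the real source-set comparison), not Thanos types:

```go
package sketch

import "sync"

// dedupFanIn mirrors the pattern the Filter change introduces: worker
// goroutines only *report* candidate duplicates over a channel, and a single
// collector goroutine owns every mutation of the shared map and result slice.
func dedupFanIn(groups [][]string, metas map[string]struct{}, concurrency int) []string {
	var filterWg, dupWg sync.WaitGroup
	groupChan := make(chan []string)
	dupsChan := make(chan string)

	// Single collector: the only goroutine touching metas or dups,
	// so no mutex is needed on the hot path.
	var dups []string
	dupWg.Add(1)
	go func() {
		defer dupWg.Done()
		for dup := range dupsChan {
			if _, ok := metas[dup]; ok {
				dups = append(dups, dup)
			}
			delete(metas, dup)
		}
	}()

	// Workers: read-only over their group; duplicates are sent, not applied.
	for i := 0; i < concurrency; i++ {
		filterWg.Add(1)
		go func() {
			defer filterWg.Done()
			for group := range groupChan {
				if len(group) == 0 {
					continue
				}
				for _, id := range group[1:] { // keep the first, report the rest
					dupsChan <- id
				}
			}
		}()
	}

	for _, g := range groups {
		groupChan <- g
	}
	close(groupChan)
	filterWg.Wait()

	// Close dupsChan only after every sender has finished, then wait for
	// the collector so the result slice is complete before returning.
	close(dupsChan)
	dupWg.Wait()
	return dups
}
```

The shutdown ordering is the load-bearing detail, and the diff follows it exactly: `close(dupsChan)` happens only after `filterWg.Wait()` returns, so no worker can send on a closed channel, and `dupWg.Wait()` guarantees `f.duplicateIDs` is fully rebuilt before `Filter` returns.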
22 changes: 15 additions & 7 deletions pkg/block/indexheader/reader_pool.go
@@ -7,10 +7,14 @@ import (
"context"
"sync"
"time"
"unsafe"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
+
+xsync "golang.org/x/sync/singleflight"
+
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
@@ -48,6 +52,7 @@ type ReaderPool struct {
// Keep track of all readers managed by the pool.
lazyReadersMx sync.Mutex
lazyReaders map[*LazyBinaryReader]struct{}
+lazyReadersSF xsync.Group

lazyDownloadFunc LazyDownloadIndexHeaderFunc
}
@@ -122,19 +127,22 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime
// with lazy reader enabled, this function will return a lazy reader. The returned lazy reader
// is tracked by the pool and automatically closed once the idle timeout expires.
func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, meta *metadata.Meta) (Reader, error) {
-var reader Reader
-var err error
-
-if p.lazyReaderEnabled {
-reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta))
-} else {
-reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader)
+if !p.lazyReaderEnabled {
+return NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader)
}

+idBytes := id[:]
+lazyReader, err, _ := p.lazyReadersSF.Do(*(*string)(unsafe.Pointer(&idBytes)), func() (interface{}, error) {
+return NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta))
+})
+
if err != nil {
+level.Error(p.logger).Log("msg", "failed to create lazy index-header reader", "block", id.String(), "err", err)
return nil, err
}

+reader := lazyReader.(Reader)
+
// Keep track of lazy readers only if required.
if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 {
p.lazyReadersMx.Lock()
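The pool now collapses concurrent `NewBinaryReader` calls for the same block through `golang.org/x/sync/singleflight`, keyed by the ULID bytes; the `*(*string)(unsafe.Pointer(&idBytes))` cast gives a zero-copy string view of those bytes, presumably to avoid the allocation `id.String()` would make. Below is a minimal sketch of just the collapsing behavior; `openBlock` is a hypothetical stand-in for an expensive constructor like `NewLazyBinaryReader`, not a Thanos API.

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

// openBlock is a hypothetical stand-in for an expensive reader constructor.
func openBlock(id string) (string, error) {
	return "reader-for-" + id, nil
}

func main() {
	var sf singleflight.Group
	var wg sync.WaitGroup

	// Overlapping calls with the same key share one in-flight execution of
	// the function; every waiting caller receives the same result and error.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err, shared := sf.Do("block-01", func() (interface{}, error) {
				return openBlock("block-01")
			})
			if err != nil {
				fmt.Println("open failed:", err)
				return
			}
			fmt.Println(v.(string), "shared:", shared)
		}()
	}
	wg.Wait()
}
```

Because every collapsed caller gets the same error, a failed construction fails them all at once, which pairs with the new `TestReaderPool_NewBinaryReader_ErrDoesNotInsertNilReader` below asserting that the error path never inserts a nil reader into the pool's tracking map.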
116 changes: 116 additions & 0 deletions pkg/block/indexheader/reader_pool_test.go
@@ -5,13 +5,18 @@ package indexheader

import (
"context"
"path"
"path/filepath"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"

"github.com/efficientgo/core/testutil"
@@ -132,3 +137,114 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) {
testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
}

func TestReaderPool_MultipleReaders(t *testing.T) {
ctx := context.Background()

blkDir := t.TempDir()

bkt := objstore.NewInMemBucket()
b1, err := e2eutil.CreateBlock(ctx, blkDir, []labels.Labels{
labels.New(labels.Label{Name: "a", Value: "1"}),
labels.New(labels.Label{Name: "a", Value: "2"}),
labels.New(labels.Label{Name: "a", Value: "3"}),
labels.New(labels.Label{Name: "a", Value: "4"}),
labels.New(labels.Label{Name: "b", Value: "1"}),
}, 100, 0, 1000, labels.New(labels.Label{Name: "ext1", Value: "val1"}), 124, metadata.NoneFunc)
testutil.Ok(t, err)

require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(blkDir, b1.String()), metadata.NoneFunc))

readerPool := NewReaderPool(
log.NewNopLogger(),
true,
time.Minute,
NewReaderPoolMetrics(prometheus.NewRegistry()),
AlwaysEagerDownloadIndexHeader,
)

dlDir := t.TempDir()

m, err := metadata.ReadFromDir(filepath.Join(blkDir, b1.String()))
testutil.Ok(t, err)

startWg := &sync.WaitGroup{}
startWg.Add(1)

waitWg := &sync.WaitGroup{}

const readersCount = 10
waitWg.Add(readersCount)
for range readersCount {
go func() {
defer waitWg.Done()
t.Logf("waiting")
startWg.Wait()
t.Logf("starting")

br, err := readerPool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, dlDir, b1, 32, m)
testutil.Ok(t, err)

t.Cleanup(func() {
testutil.Ok(t, br.Close())
})
}()
}

startWg.Done()
waitWg.Wait()
}

func TestReaderPool_NewBinaryReader_ErrDoesNotInsertNilReader(t *testing.T) {
ctx := context.Background()
tmpDir := t.TempDir()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, bkt.Close()) })

// Create and upload a valid block.
blockID, err := e2eutil.CreateBlock(
ctx, tmpDir,
[]labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
},
100, 0, 1000,
labels.FromStrings("ext1", "1"),
124, metadata.NoneFunc,
)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

// Now remove the index object from the bucket to force the "missing index" path.
// This simulates partial/invalid upload so NewLazyBinaryReader fails.
idxPath := path.Join(blockID.String(), block.IndexFilename)
testutil.Ok(t, bkt.Delete(ctx, idxPath))

meta, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String()))
testutil.Ok(t, err)

// Enable lazy readers and tracking so the pool would normally insert into lazyReaders.
metrics := NewReaderPoolMetrics(nil)
pool := NewReaderPool(log.NewNopLogger(), true, time.Minute, metrics, AlwaysEagerDownloadIndexHeader)
t.Cleanup(pool.Close)

// Call NewBinaryReader: should return error and NOT insert a nil into pool.
r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta)
if err == nil {
t.Fatalf("expected error due to missing index, got nil (reader=%v)", r)
}
if r != nil {
t.Fatalf("expected nil reader on error, got %T", r)
}

// Pool should remain empty
pool.lazyReadersMx.Lock()
got := len(pool.lazyReaders)
pool.lazyReadersMx.Unlock()
testutil.Equals(t, 0, got)

// Exercise sweeper to ensure it doesn't trip over a bad entry.
pool.closeIdleReaders()
}