Skip to content

Commit 8831fcd

Browse files
committed
Add a parquet shard cache to SG parquet mode
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent f26509f commit 8831fcd

File tree

10 files changed

+112
-30
lines changed

10 files changed

+112
-30
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
* Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`.
88
* Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`.
99
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
10+
* [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166
1011
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
1112
* [ENHANCEMENT] StoreGateway: Add tracings to parquet mode. #7125
1213
* [ENHANCEMENT] Alertmanager: Upgrade alertmanger to 0.29.0 and add a new incidentIO integration. #7092

docs/blocks-storage/querier.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1793,6 +1793,14 @@ blocks_storage:
17931793
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size
17941794
[request_token_bucket_size: <int> | default = 4194304]
17951795

1796+
# [Experimental] Maximum size of the Parquet shard cache. 0 to disable.
1797+
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-size
1798+
[parquet_shard_cache_size: <int> | default = 512]
1799+
1800+
# [Experimental] TTL of the Parquet shard cache. 0 to no TTL.
1801+
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
1802+
[parquet_shard_cache_ttl: <duration> | default = 24h]
1803+
17961804
tsdb:
17971805
# Local directory to store TSDBs in the ingesters.
17981806
# CLI flag: -blocks-storage.tsdb.dir

docs/blocks-storage/store-gateway.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1871,6 +1871,14 @@ blocks_storage:
18711871
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size
18721872
[request_token_bucket_size: <int> | default = 4194304]
18731873

1874+
# [Experimental] Maximum size of the Parquet shard cache. 0 to disable.
1875+
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-size
1876+
[parquet_shard_cache_size: <int> | default = 512]
1877+
1878+
# [Experimental] TTL of the Parquet shard cache. 0 to no TTL.
1879+
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
1880+
[parquet_shard_cache_ttl: <duration> | default = 24h]
1881+
18741882
tsdb:
18751883
# Local directory to store TSDBs in the ingesters.
18761884
# CLI flag: -blocks-storage.tsdb.dir

docs/configuration/config-file-reference.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2477,6 +2477,14 @@ bucket_store:
24772477
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size
24782478
[request_token_bucket_size: <int> | default = 4194304]
24792479

2480+
# [Experimental] Maximum size of the Parquet shard cache. 0 to disable.
2481+
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-size
2482+
[parquet_shard_cache_size: <int> | default = 512]
2483+
2484+
# [Experimental] TTL of the Parquet shard cache. 0 to no TTL.
2485+
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
2486+
[parquet_shard_cache_ttl: <duration> | default = 24h]
2487+
24802488
tsdb:
24812489
# Local directory to store TSDBs in the ingesters.
24822490
# CLI flag: -blocks-storage.tsdb.dir

integration/querier_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,17 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
519519
if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendRedis) {
520520
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_cache_redis_requests_total"))
521521
}
522+
523+
// ensure parquet shard cache works
524+
require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_parquet_cache_hits_total"}, e2e.WithLabelMatchers(
525+
labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"),
526+
labels.MustNewMatcher(labels.MatchEqual, "name", "parquet-shards"))))
527+
require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_parquet_cache_item_count"}, e2e.WithLabelMatchers(
528+
labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"),
529+
labels.MustNewMatcher(labels.MatchEqual, "name", "parquet-shards"))))
530+
require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_parquet_cache_misses_total"}, e2e.WithLabelMatchers(
531+
labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"),
532+
labels.MustNewMatcher(labels.MatchEqual, "name", "parquet-shards"))))
522533
}
523534

524535
// Query metadata.

pkg/storage/tsdb/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cortexproject/cortex/pkg/storage/bucket"
1919
"github.com/cortexproject/cortex/pkg/util/flagext"
2020
util_log "github.com/cortexproject/cortex/pkg/util/log"
21+
"github.com/cortexproject/cortex/pkg/util/parquetutil"
2122
"github.com/cortexproject/cortex/pkg/util/users"
2223
)
2324

@@ -332,6 +333,8 @@ type BucketStoreConfig struct {
332333

333334
// Token bucket configs
334335
TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"`
336+
// Parquet shard cache config
337+
ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`
335338
}
336339

337340
type TokenBucketBytesLimiterConfig struct {
@@ -393,6 +396,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
393396
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-chunks-token-factor", 0, "Multiplication factor used for fetched chunks token")
394397
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token")
395398
f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
399+
cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f)
396400
}
397401

398402
// Validate the config.

pkg/storegateway/bucket_stores_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/thanos-io/thanos/pkg/store/labelpb"
3838
"github.com/thanos-io/thanos/pkg/store/storepb"
3939
"github.com/weaveworks/common/logging"
40+
"github.com/weaveworks/common/user"
4041
"go.uber.org/atomic"
4142
"google.golang.org/grpc/codes"
4243
"google.golang.org/grpc/metadata"
@@ -761,6 +762,7 @@ func querySeries(stores BucketStores, userID, metricName string, minT, maxT int6
761762
}
762763

763764
ctx := setUserIDToGRPCContext(context.Background(), userID)
765+
ctx = user.InjectOrgID(ctx, userID)
764766
srv := newBucketStoreSeriesServer(ctx)
765767
err = stores.Series(req, srv)
766768

pkg/storegateway/parquet_bucket_store.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"google.golang.org/grpc/codes"
2626
"google.golang.org/grpc/status"
2727

28+
"github.com/cortexproject/cortex/pkg/util/parquetutil"
2829
"github.com/cortexproject/cortex/pkg/util/spanlogger"
2930
"github.com/cortexproject/cortex/pkg/util/validation"
3031
)
@@ -37,10 +38,12 @@ type parquetBucketStore struct {
3738

3839
chunksDecoder *schema.PrometheusParquetChunksDecoder
3940

40-
matcherCache storecache.MatchersCache
41+
matcherCache storecache.MatchersCache
42+
parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard]
4143
}
4244

4345
func (p *parquetBucketStore) Close() error {
46+
p.parquetShardCache.Close()
4447
return p.bucket.Close()
4548
}
4649

pkg/storegateway/parquet_bucket_stores.go

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ import (
3232
"github.com/cortexproject/cortex/pkg/storage/tsdb"
3333
cortex_util "github.com/cortexproject/cortex/pkg/util"
3434
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
35+
"github.com/cortexproject/cortex/pkg/util/parquetutil"
3536
"github.com/cortexproject/cortex/pkg/util/spanlogger"
37+
"github.com/cortexproject/cortex/pkg/util/users"
3638
"github.com/cortexproject/cortex/pkg/util/validation"
3739
)
3840

@@ -52,7 +54,8 @@ type ParquetBucketStores struct {
5254

5355
chunksDecoder *schema.PrometheusParquetChunksDecoder
5456

55-
matcherCache storecache.MatchersCache
57+
matcherCache storecache.MatchersCache
58+
parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard]
5659

5760
inflightRequests *cortex_util.InflightRequestTracker
5861
}
@@ -65,15 +68,21 @@ func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, bucketClient objstore.
6568
return nil, err
6669
}
6770

71+
parquetShardCache, err := parquetutil.NewParquetShardCache[parquet_storage.ParquetShard](&cfg.BucketStore.ParquetShardCache, "parquet-shards", reg)
72+
if err != nil {
73+
return nil, err
74+
}
75+
6876
u := &ParquetBucketStores{
69-
logger: logger,
70-
cfg: cfg,
71-
limits: limits,
72-
bucket: cachingBucket,
73-
stores: map[string]*parquetBucketStore{},
74-
storesErrors: map[string]error{},
75-
chunksDecoder: schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()),
76-
inflightRequests: cortex_util.NewInflightRequestTracker(),
77+
logger: logger,
78+
cfg: cfg,
79+
limits: limits,
80+
bucket: cachingBucket,
81+
stores: map[string]*parquetBucketStore{},
82+
storesErrors: map[string]error{},
83+
chunksDecoder: schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()),
84+
inflightRequests: cortex_util.NewInflightRequestTracker(),
85+
parquetShardCache: parquetShardCache,
7786
}
7887

7988
if cfg.BucketStore.MatchersCacheMaxItems > 0 {
@@ -246,12 +255,13 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger
246255
userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
247256

248257
store := &parquetBucketStore{
249-
logger: userLogger,
250-
bucket: userBucket,
251-
limits: u.limits,
252-
concurrency: 4, // TODO: make this configurable
253-
chunksDecoder: u.chunksDecoder,
254-
matcherCache: u.matcherCache,
258+
logger: userLogger,
259+
bucket: userBucket,
260+
limits: u.limits,
261+
concurrency: 4, // TODO: make this configurable
262+
chunksDecoder: u.chunksDecoder,
263+
matcherCache: u.matcherCache,
264+
parquetShardCache: u.parquetShardCache,
255265
}
256266

257267
return store, nil
@@ -265,21 +275,35 @@ type parquetBlock struct {
265275
}
266276

267277
func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) {
268-
shard, err := parquet_storage.NewParquetShardOpener(
269-
context.WithoutCancel(ctx),
270-
name,
271-
labelsFileOpener,
272-
chunksFileOpener,
273-
0,
274-
parquet_storage.WithFileOptions(
275-
parquet.SkipMagicBytes(true),
276-
parquet.ReadBufferSize(100*1024),
277-
parquet.SkipBloomFilters(true),
278-
parquet.OptimisticRead(true),
279-
),
280-
)
278+
userID, err := users.TenantID(ctx)
281279
if err != nil {
282-
return nil, errors.Wrapf(err, "failed to open parquet shard. block: %v", name)
280+
return nil, err
281+
}
282+
283+
cacheKey := fmt.Sprintf("%v-%v", userID, name)
284+
shard := p.parquetShardCache.Get(cacheKey)
285+
286+
if shard == nil {
287+
// cache miss, open parquet files
288+
shard, err = parquet_storage.NewParquetShardOpener(
289+
context.WithoutCancel(ctx),
290+
name,
291+
labelsFileOpener,
292+
chunksFileOpener,
293+
0, // we always only have 1 shard - shard 0
294+
parquet_storage.WithFileOptions(
295+
parquet.SkipMagicBytes(true),
296+
parquet.ReadBufferSize(100*1024),
297+
parquet.SkipBloomFilters(true),
298+
parquet.OptimisticRead(true),
299+
),
300+
)
301+
if err != nil {
302+
return nil, errors.Wrapf(err, "failed to open parquet shard. block: %v", name)
303+
}
304+
305+
// set shard to cache
306+
p.parquetShardCache.Set(cacheKey, shard)
283307
}
284308

285309
s, err := shard.TSDBSchema()

schemas/cortex-config-schema.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2600,6 +2600,19 @@
26002600
},
26012601
"type": "object"
26022602
},
2603+
"parquet_shard_cache_size": {
2604+
"default": 512,
2605+
"description": "[Experimental] Maximum size of the Parquet shard cache. 0 to disable.",
2606+
"type": "number",
2607+
"x-cli-flag": "blocks-storage.bucket-store.parquet-shard-cache-size"
2608+
},
2609+
"parquet_shard_cache_ttl": {
2610+
"default": "24h0m0s",
2611+
"description": "[Experimental] TTL of the Parquet shard cache. 0 to no TTL.",
2612+
"type": "string",
2613+
"x-cli-flag": "blocks-storage.bucket-store.parquet-shard-cache-ttl",
2614+
"x-format": "duration"
2615+
},
26032616
"series_batch_size": {
26042617
"default": 10000,
26052618
"description": "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.",

0 commit comments

Comments
 (0)