From 25d5311099f7a53f8492438078b1148991f780d9 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Thu, 28 Nov 2024 18:15:20 +0100 Subject: [PATCH 01/11] cherry pick upstream PR 7945 --- go.mod | 4 ++-- go.sum | 18 ++++++++---------- pkg/receive/writecapnp/client.go | 15 +-------------- pkg/receive/writecapnp/marshal.go | 21 +++++++++++++-------- 4 files changed, 24 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index 92f2967b1f9..cd1f8d888fe 100644 --- a/go.mod +++ b/go.mod @@ -112,7 +112,7 @@ require ( ) require ( - capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af + capnproto.org/go/capnp/v3 v3.0.0-alpha.30 github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.4.0 github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 @@ -135,7 +135,6 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect github.com/cilium/ebpf v0.11.0 // indirect - github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 // indirect github.com/containerd/cgroups/v3 v3.0.3 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/elastic/go-licenser v0.3.1 // indirect @@ -169,6 +168,7 @@ require ( k8s.io/client-go v0.31.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect + zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 // indirect ) require ( diff --git a/go.sum b/go.sum index 6d8409733a8..688bdcd6574 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af h1:A5wxH0ZidOtYYUGjhtBaRuB87M73bGfc06uWB8sHpg0= -capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af/go.mod h1:2vT5D2dtG8sJGEoEKU17e+j7shdaYp1Myl8X03B3hmc= +capnproto.org/go/capnp/v3 v3.0.0-alpha.30 h1:iABQan/YiHFCgSXym5aNj27osapnEgAk4WaWYqb4sQM= +capnproto.org/go/capnp/v3 v3.0.0-alpha.30/go.mod h1:+ysMHvOh1EWNOyorxJWs1omhRFiDoKxKkWQACp54jKM= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= @@ -1506,8 +1506,6 @@ github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= -github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU= -github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0= github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0= github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0= github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -2116,8 +2114,8 @@ github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI= github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= -github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= -github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= +github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= +github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY= github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= @@ -2274,10 +2272,8 @@ github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a h1:BhWU58V github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= -github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU= -github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k= -github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= -github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= +github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= +github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek= github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= @@ -3350,3 +3346,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= +zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 h1:yksDCGMVzyn3vlyf0GZ3huiF5FFaMGQpQ3UJvR0EoGA= +zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5/go.mod h1:1LtNdPAs8WH+BTcQiZAOo2MIKD/5jyK/u7sZ9ZPe5SE= diff --git a/pkg/receive/writecapnp/client.go b/pkg/receive/writecapnp/client.go index 0a20d90d44d..3cd9f2d0820 100644 --- a/pkg/receive/writecapnp/client.go +++ b/pkg/receive/writecapnp/client.go @@ -69,22 +69,9 @@ func (r *RemoteWriteClient) writeWithReconnect(ctx context.Context, numReconnect if err := r.connect(ctx); err != nil { return nil, err } - arena := capnp.SingleSegment(nil) - defer arena.Release() result, release := r.writer.Write(ctx, func(params Writer_write_Params) error { - _, seg, err := capnp.NewMessage(arena) - if err != nil { - return err - } - wr, err := NewRootWriteRequest(seg) - if err != nil { - return err - } - if err := params.SetWr(wr); err != nil { - return err - } - wr, err = params.Wr() + wr, err := params.NewWr() if err != nil { return err } diff --git a/pkg/receive/writecapnp/marshal.go b/pkg/receive/writecapnp/marshal.go index 2d42d60b849..efc1a8ef038 100644 --- a/pkg/receive/writecapnp/marshal.go +++ b/pkg/receive/writecapnp/marshal.go @@ -6,6 +6,8 @@ package writecapnp import ( "capnproto.org/go/capnp/v3" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -46,7 +48,7 @@ func Build(tenant string, tsreq []prompb.TimeSeries) (WriteRequest, error) { func BuildInto(wr WriteRequest, tenant string, tsreq []prompb.TimeSeries) error { if err := wr.SetTenant(tenant); err != nil { - return err + return errors.Wrap(err, "set tenant") } series, err := wr.NewTimeSeries(int32(len(tsreq))) @@ -59,27 +61,30 @@ func BuildInto(wr WriteRequest, tenant string, tsreq []prompb.TimeSeries) error lblsc, err := tsc.NewLabels(int32(len(ts.Labels))) if err != nil { - return err + return errors.Wrap(err, "new labels") } if err := marshalLabels(lblsc, ts.Labels, builder); err != nil { - return err + return errors.Wrap(err, "marshal labels") } if err := marshalSamples(tsc, ts.Samples); err != nil { - return err + return errors.Wrap(err, "marshal samples") } if err := marshalHistograms(tsc, ts.Histograms); err != nil { - return err + return errors.Wrap(err, "marshal histograms") } if err := marshalExemplars(tsc, ts.Exemplars, builder); err != nil { - return err + return errors.Wrap(err, "marshal exemplars") } } symbols, err := wr.NewSymbols() if err != nil { - return err + return errors.Wrap(err, "new symbols") } - return marshalSymbols(builder, symbols) + if err := marshalSymbols(builder, symbols); err != nil { + return errors.Wrap(err, "marshal symbols") + } + return nil } func marshalSymbols(builder *symbolsBuilder, symbols Symbols) error { From bd803c7483db7e3466438390f455a2b5377c8195 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Sat, 19 Apr 2025 12:01:27 -0700 Subject: [PATCH 02/11] do not forward partial response strategy in proxy store --- pkg/store/proxy.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 47e73a3013a..367737d268d 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -347,6 +347,11 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. ShardInfo: originalRequest.ShardInfo, WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, } + if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { + // Do not forward this field as it might cause data loss. + r.PartialResponseDisabled = true + r.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT + } storeResponses := make([]respSet, 0, len(stores)) @@ -395,14 +400,14 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. s.metrics.storeFailureCount.WithLabelValues(st.GroupKey(), st.ReplicaKey()).Inc() bumpCounter(st.GroupKey(), st.ReplicaKey(), failedStores) totalFailedStores++ - if r.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { + if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { if checkGroupReplicaErrors(st, err) != nil { return err } continue } level.Error(reqLogger).Log("err", err) - if !r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_WARN { + if !originalRequest.PartialResponseDisabled || originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_WARN { if err := srv.Send(storepb.NewWarnSeriesResponse(err)); err != nil { return err } @@ -438,7 +443,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. level.Error(s.logger).Log("msg", "Store failure with warning", "warning", warning) // Don't have group/replica keys here, so we can't attribute the warning to a specific store. s.metrics.storeFailureCount.WithLabelValues("", "").Inc() - if r.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { + if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { // The first error message is from AWS S3 and the second one is from Azure Blob Storage. if strings.Contains(resp.GetWarning(), "The specified key does not exist") || strings.Contains(resp.GetWarning(), "The specified blob does not exist") { level.Warn(s.logger).Log("msg", "Ignore 'the specified key/blob does not exist' error from Store") @@ -459,7 +464,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. } firstWarning = &warning } - } else if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { + } else if originalRequest.PartialResponseDisabled || originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { return status.Error(codes.Aborted, resp.GetWarning()) } } From ce7a7f2a3bbb3ed295140204d3982bd821eca2d5 Mon Sep 17 00:00:00 2001 From: Yi Jin <96499497+jnyi@users.noreply.github.com> Date: Mon, 11 Aug 2025 12:53:40 -0700 Subject: [PATCH 03/11] [TSE-2830] cherry pick security patches (#200) Signed-off-by: Yi Jin --- go.mod | 22 +++++++++-------- go.sum | 44 ++++++++++++++++----------------- pkg/compact/overlapping.go | 43 +++++++++++++++++++++----------- pkg/compact/overlapping_test.go | 20 +++++++++++++++ 4 files changed, 82 insertions(+), 47 deletions(-) diff --git a/go.mod b/go.mod index cd1f8d888fe..9bfc098a086 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/thanos-io/thanos go 1.23.0 +toolchain go1.23.11 + require ( cloud.google.com/go/storage v1.43.0 // indirect cloud.google.com/go/trace v1.10.12 @@ -80,10 +82,10 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/automaxprocs v1.5.3 go.uber.org/goleak v1.3.0 - golang.org/x/crypto v0.32.0 - golang.org/x/net v0.34.0 - golang.org/x/sync v0.10.0 - golang.org/x/text v0.21.0 + golang.org/x/crypto v0.40.0 + golang.org/x/net v0.41.0 + golang.org/x/sync v0.16.0 + golang.org/x/text v0.27.0 golang.org/x/time v0.7.0 google.golang.org/api v0.195.0 // indirect google.golang.org/genproto v0.0.0-20240823204242-4ba0660f739c // indirect @@ -112,7 +114,7 @@ require ( ) require ( - capnproto.org/go/capnp/v3 v3.0.0-alpha.30 + capnproto.org/go/capnp/v3 v3.0.0-alpha.29 github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.4.0 github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 @@ -142,7 +144,7 @@ require ( github.com/go-openapi/runtime v0.27.1 // indirect github.com/goccy/go-json v0.10.3 // indirect github.com/godbus/dbus/v5 v5.0.4 // indirect - github.com/golang-jwt/jwt/v5 v5.2.1 // indirect + github.com/golang-jwt/jwt/v5 v5.3.0 // indirect github.com/google/s2a-go v0.1.8 // indirect github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect github.com/jcchavezs/porto v0.1.0 // indirect @@ -275,10 +277,10 @@ require ( go.opentelemetry.io/otel/metric v1.31.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/mod v0.21.0 // indirect - golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sys v0.29.0 // indirect - golang.org/x/tools v0.24.0 // indirect + golang.org/x/mod v0.25.0 // indirect + golang.org/x/oauth2 v0.30.0 // indirect + golang.org/x/sys v0.34.0 // indirect + golang.org/x/tools v0.34.0 // indirect gonum.org/v1/gonum v0.15.0 // indirect google.golang.org/protobuf v1.35.1 howett.net/plist v0.0.0-20181124034731-591f970eefbb // indirect diff --git a/go.sum b/go.sum index 688bdcd6574..ecd0131aec7 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -capnproto.org/go/capnp/v3 v3.0.0-alpha.30 h1:iABQan/YiHFCgSXym5aNj27osapnEgAk4WaWYqb4sQM= -capnproto.org/go/capnp/v3 v3.0.0-alpha.30/go.mod h1:+ysMHvOh1EWNOyorxJWs1omhRFiDoKxKkWQACp54jKM= +capnproto.org/go/capnp/v3 v3.0.0-alpha.29 h1:Kp8kq5GVl1ANe0mxv+cl1ISEPAv45phpdMIPpB8cgN8= +capnproto.org/go/capnp/v3 v3.0.0-alpha.29/go.mod h1:+ysMHvOh1EWNOyorxJWs1omhRFiDoKxKkWQACp54jKM= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= @@ -1704,8 +1704,8 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/gogo/status v1.0.3/go.mod h1:SavQ51ycCLnc7dGyJxp8YAmudx8xqiVrRf+6IXRsugc= github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg= github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU= -github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= -github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= +github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= @@ -2429,8 +2429,8 @@ golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= -golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= +golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2497,8 +2497,8 @@ golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= -golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= -golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= +golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -2575,8 +2575,8 @@ golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= -golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2610,8 +2610,8 @@ golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM= golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= -golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -2633,8 +2633,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2746,8 +2746,8 @@ golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2766,8 +2766,8 @@ golang.org/x/term v0.14.0/go.mod h1:TySc+nGkYR6qt8km8wUhuFRTVSMIX3XPR58y2lC8vww= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= -golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= +golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg= +golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2788,8 +2788,8 @@ golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= +golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -2871,8 +2871,8 @@ golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= -golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= -golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= +golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go index acf2685e15f..6bbf3bbd83a 100644 --- a/pkg/compact/overlapping.go +++ b/pkg/compact/overlapping.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" ) @@ -22,8 +23,13 @@ import ( const ( overlappingReason = "blocks-overlapping" + // 2 errors: add series: symbol table size exceeds. symbolTableSizeExceedsError = "symbol table size exceeds" - symbolTableSizeLimit = 512 * 1024 // lower this limits + // handled 0 errors: postings offset table length/crc32 write error: length size exceeds. + lengthSizeExceedsError = "length size exceeds" + + // only mark blocks larger than this limit for no compact. + errorBlockSeriesLimit = 512 * 1024 // lower this limits ) type OverlappingCompactionLifecycleCallback struct { @@ -130,24 +136,31 @@ func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Cont return tsdb.DefaultBlockPopulator{}, nil } -func (o OverlappingCompactionLifecycleCallback) HandleError(ctx context.Context, logger log.Logger, g *Group, toCompact []*metadata.Meta, compactErr error) int { +func (o OverlappingCompactionLifecycleCallback) markBlocksNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, toCompact []*metadata.Meta, errPattern string) int { handledErrs := 0 + for _, m := range toCompact { + if m.Stats.NumSeries < errorBlockSeriesLimit { + level.Warn(logger).Log("msg", "bypass small blocks", "block", m.String(), "series", m.Stats.NumSeries) + continue + } + handledErrs++ + if err := block.MarkForNoCompact(ctx, logger, bkt, m.ULID, metadata.NoCompactReason(errPattern), + fmt.Sprintf("failed to compact blocks: %s", m.ULID.String()), o.noCompaction); err != nil { + level.Error(logger).Log("msg", "failed to mark block for no compact", "block", m.String(), "err", err) + } + } + return handledErrs +} + +func (o OverlappingCompactionLifecycleCallback) HandleError(ctx context.Context, logger log.Logger, g *Group, toCompact []*metadata.Meta, compactErr error) int { if compactErr == nil { - return handledErrs + return 0 } level.Error(logger).Log("msg", "failed to compact blocks", "err", compactErr) if strings.Contains(compactErr.Error(), symbolTableSizeExceedsError) { - for _, m := range toCompact { - if m.Stats.NumSeries < symbolTableSizeLimit { - level.Warn(logger).Log("msg", "bypass small blocks", "block", m.String(), "series", m.Stats.NumSeries) - continue - } - handledErrs++ - if err := block.MarkForNoCompact(ctx, logger, g.bkt, m.ULID, symbolTableSizeExceedsError, - fmt.Sprintf("failed to compact blocks: %s", m.ULID.String()), o.noCompaction); err != nil { - level.Error(logger).Log("msg", "failed to mark block for no compact", "block", m.String(), "err", err) - } - } + return o.markBlocksNoCompact(ctx, logger, g.bkt, toCompact, symbolTableSizeExceedsError) + } else if strings.Contains(compactErr.Error(), lengthSizeExceedsError) { + return o.markBlocksNoCompact(ctx, logger, g.bkt, toCompact, lengthSizeExceedsError) } - return handledErrs + return 0 } diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go index 04b8cc1664c..a43f4c75496 100644 --- a/pkg/compact/overlapping_test.go +++ b/pkg/compact/overlapping_test.go @@ -181,6 +181,26 @@ func TestHandleError(t *testing.T) { handledErrs: 1, errBlockIdx: 2, }, + { + testName: "length size exceeds error - only large blocks marked", + input: []*metadata.Meta{ + createCustomBlockMeta(1, 1, 2, metadata.ReceiveSource, 1024), + createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 2*1024*1024), + }, + err: errors.New(lengthSizeExceedsError + " postings offset table"), + handledErrs: 1, + errBlockIdx: 1, + }, + { + testName: "symbol table size exceeds with small blocks bypassed", + input: []*metadata.Meta{ + createCustomBlockMeta(1, 1, 2, metadata.ReceiveSource, 1024), + createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 2*1024*1024), + }, + err: errors.New(symbolTableSizeExceedsError + " too large"), + handledErrs: 1, + errBlockIdx: 1, + }, } { t.Run(tcase.testName, func(t *testing.T) { ctx := context.Background() From 70b798170c11ea95cbee72756765572b59a5a1bf Mon Sep 17 00:00:00 2001 From: Yi Jin <96499497+jnyi@users.noreply.github.com> Date: Tue, 19 Aug 2025 16:34:20 -0700 Subject: [PATCH 04/11] pick to Release branch (#204) Signed-off-by: Yi Jin --- cmd/thanos/receive.go | 22 ++- pkg/receive/READINESS_FEATURE.md | 79 ++++++++++ pkg/receive/readiness.go | 51 +++++++ pkg/receive/readiness_integration_test.go | 173 ++++++++++++++++++++++ pkg/receive/readiness_test.go | 124 ++++++++++++++++ 5 files changed, 444 insertions(+), 5 deletions(-) create mode 100644 pkg/receive/READINESS_FEATURE.md create mode 100644 pkg/receive/readiness.go create mode 100644 pkg/receive/readiness_integration_test.go create mode 100644 pkg/receive/readiness_test.go diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index bde6319fec0..c5737e19bb4 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -58,8 +58,9 @@ import ( ) const ( - compressionNone = "none" - metricNamesFilter = "metric-names-filter" + compressionNone = "none" + metricNamesFilter = "metric-names-filter" + grpcReadinessInterceptor = "grpc-readiness-interceptor" ) func registerReceive(app *extkingpin.App) { @@ -144,11 +145,16 @@ func runReceive( level.Info(logger).Log("mode", receiveMode, "msg", "running receive") multiTSDBOptions := []receive.MultiTSDBOption{} + var enableGRPCReadinessInterceptor bool for _, feature := range *conf.featureList { if feature == metricNamesFilter { multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) level.Info(logger).Log("msg", "metric name filter feature enabled") } + if feature == grpcReadinessInterceptor { + enableGRPCReadinessInterceptor = true + level.Info(logger).Log("msg", "gRPC readiness interceptor feature enabled") + } } // Create a matcher converter if specified by command line to cache expensive regex matcher conversions. @@ -462,7 +468,7 @@ func runReceive( info.WithExemplarsInfoFunc(), ) - srv := grpcserver.New(logger, receive.NewUnRegisterer(reg), tracer, grpcLogOpts, logFilterMethods, comp, grpcProbe, + grpcOptions := []grpcserver.Option{ grpcserver.WithServer(store.RegisterStoreServer(rw, logger)), grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewMultiTSDB(dbs.TSDBExemplars))), @@ -471,7 +477,13 @@ func runReceive( grpcserver.WithGracePeriod(conf.grpcConfig.gracePeriod), grpcserver.WithMaxConnAge(conf.grpcConfig.maxConnectionAge), grpcserver.WithTLSConfig(tlsCfg), - ) + } + + if enableGRPCReadinessInterceptor { + grpcOptions = append(grpcOptions, receive.NewReadinessGRPCOptions(httpProbe)...) + } + + srv := grpcserver.New(logger, receive.NewUnRegisterer(reg), tracer, grpcLogOpts, logFilterMethods, comp, grpcProbe, grpcOptions...) g.Add( func() error { @@ -1174,7 +1186,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { Default("0").IntVar(&rc.matcherConverterCacheCapacity) cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature."). Default("0").IntVar(&rc.maxPendingGrpcWriteRequests) - rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings() + rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+", "+grpcReadinessInterceptor+".").Default("").Strings() cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled."). Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses) } diff --git a/pkg/receive/READINESS_FEATURE.md b/pkg/receive/READINESS_FEATURE.md new file mode 100644 index 00000000000..d6312e04b9d --- /dev/null +++ b/pkg/receive/READINESS_FEATURE.md @@ -0,0 +1,79 @@ +# gRPC Readiness Interceptor Feature + +## Overview + +The `grpc-readiness-interceptor` feature provides a gRPC interceptor that checks service readiness before processing requests. This is particularly useful when using `publishNotReadyAddresses: true` in Kubernetes services to avoid client timeouts during pod startup. + +## Problem Solved + +When using `publishNotReadyAddresses: true`: +- Pods are discoverable by clients before they're ready to handle requests +- Clients may send gRPC requests to pods that are still starting up +- Without this feature, clients experience timeouts waiting for responses + +## Solution + +When the feature is enabled: +- gRPC requests to non-ready pods get empty responses immediately (no timeouts) +- gRPC requests to ready pods process normally +- Clients can gracefully handle empty responses and retry + +## Usage + +Enable the feature using the `--enable-feature` flag: + +```bash +thanos receive \ + --enable-feature=grpc-readiness-interceptor \ + --label=replica="A" \ + # ... other flags +``` + +## How it Works + +1. The feature adds interceptors to both unary and stream gRPC calls +2. Each interceptor checks `httpProbe.IsReady()` before processing +3. If not ready: returns empty response immediately +4. If ready: processes request normally + +## Kubernetes Integration + +This feature is designed to work with: + +```yaml +apiVersion: v1 +kind: Service +metadata: + name: thanos-receive +spec: + # Allow traffic to non-ready pods + publishNotReadyAddresses: true + selector: + app: thanos-receive + ports: + - name: grpc + port: 10901 + targetPort: 10901 + - name: http + port: 10902 + targetPort: 10902 +``` + +## Testing + +The feature includes comprehensive tests: +- Unit tests for interceptor behavior +- Integration tests with mock gRPC servers +- Feature flag parsing tests + +Run tests with: +```bash +go test ./pkg/receive -run TestReadiness +``` + +## Implementation Details + +- Feature is disabled by default +- Uses existing HTTP probe for readiness state +- Minimal changes to existing codebase +- Self-contained in `pkg/receive/readiness.go` \ No newline at end of file diff --git a/pkg/receive/readiness.go b/pkg/receive/readiness.go new file mode 100644 index 00000000000..379be32c7b3 --- /dev/null +++ b/pkg/receive/readiness.go @@ -0,0 +1,51 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/thanos-io/thanos/pkg/prober" + grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" +) + +// ReadinessChecker is an interface for checking if the service is ready. +type ReadinessChecker interface { + IsReady() bool +} + +// NewReadinessGRPCOptions creates gRPC server options that add readiness interceptors. +// When the service is not ready, interceptors return empty responses to avoid timeouts +// during pod startup when using publishNotReadyAddresses: true. +func NewReadinessGRPCOptions(probe ReadinessChecker) []grpcserver.Option { + unaryInterceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + if !probe.IsReady() { + // Return empty response instead of processing the request. + // This prevents timeouts while pods are starting up when using publishNotReadyAddresses: true. + return nil, status.Errorf(codes.Unavailable, "service is not ready yet") + } + return handler(ctx, req) + } + + streamInterceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if !probe.IsReady() { + // Return immediately instead of processing the request. + // This prevents timeouts while pods are starting up when using publishNotReadyAddresses: true. + return nil + } + return handler(srv, ss) + } + + return []grpcserver.Option{ + grpcserver.WithGRPCServerOption(grpc.UnaryInterceptor(unaryInterceptor)), + grpcserver.WithGRPCServerOption(grpc.StreamInterceptor(streamInterceptor)), + } +} + +// Ensure that HTTPProbe implements ReadinessChecker. +var _ ReadinessChecker = (*prober.HTTPProbe)(nil) diff --git a/pkg/receive/readiness_integration_test.go b/pkg/receive/readiness_integration_test.go new file mode 100644 index 00000000000..a1b8c9d7f8a --- /dev/null +++ b/pkg/receive/readiness_integration_test.go @@ -0,0 +1,173 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/efficientgo/core/testutil" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/prober" + grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +// mockWriteableStoreServer implements a simple WriteableStore service for testing. +type mockWriteableStoreServer struct { + storepb.UnimplementedWriteableStoreServer + callCount int +} + +func (m *mockWriteableStoreServer) RemoteWrite(_ context.Context, _ *storepb.WriteRequest) (*storepb.WriteResponse, error) { + m.callCount++ + return &storepb.WriteResponse{}, nil +} + +// TestReadinessFeatureIntegration tests the full integration of the readiness feature +// including feature flag parsing and gRPC server setup. +func TestReadinessFeatureIntegration(t *testing.T) { + t.Run("NewReadinessGRPCOptions creates correct options", func(t *testing.T) { + probe := prober.NewHTTP() + options := NewReadinessGRPCOptions(probe) + testutil.Equals(t, 2, len(options)) // Should have unary and stream interceptor options + }) + + t.Run("grpc server with readiness - full behavior test", func(t *testing.T) { + testReadinessWithGRPCServer(t, true) + }) + + t.Run("grpc server without readiness", func(t *testing.T) { + testReadinessWithGRPCServer(t, false) + }) +} + +func testReadinessWithGRPCServer(t *testing.T, enableReadiness bool) { + httpProbe := prober.NewHTTP() + testutil.Equals(t, false, httpProbe.IsReady()) + + mockSrv := &mockWriteableStoreServer{} + + // Test the actual NewReadinessGRPCOptions function + if enableReadiness { + readinessOptions := NewReadinessGRPCOptions(httpProbe) + testutil.Equals(t, 2, len(readinessOptions)) // Should have unary and stream interceptors + } + + // Create grpcserver with actual production setup + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + tracer := opentracing.NoopTracer{} + comp := component.Receive + grpcProbe := prober.NewGRPC() + + // Find a free port for testing + listener, err := net.Listen("tcp", "127.0.0.1:0") + testutil.Ok(t, err) + addr := listener.Addr().String() + listener.Close() + + var grpcOptions []grpcserver.Option + grpcOptions = append(grpcOptions, grpcserver.WithListen(addr)) + grpcOptions = append(grpcOptions, grpcserver.WithServer(func(s *grpc.Server) { + storepb.RegisterWriteableStoreServer(s, mockSrv) + })) + + if enableReadiness { + grpcOptions = append(grpcOptions, NewReadinessGRPCOptions(httpProbe)...) + } + + srv := grpcserver.New(logger, reg, tracer, nil, nil, comp, grpcProbe, grpcOptions...) + + // Start server in background + go func() { + if err := srv.ListenAndServe(); err != nil { + t.Errorf("Server failed: %v", err) + } + }() + defer srv.Shutdown(nil) + + // Wait for server to start + time.Sleep(200 * time.Millisecond) + + // Create client connection + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + testutil.Ok(t, err) + defer conn.Close() + + client := storepb.NewWriteableStoreClient(conn) + + // Test 1: RemoteWrite when NOT ready + ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) + defer cancel1() + + resp1, err1 := client.RemoteWrite(ctx1, &storepb.WriteRequest{}) + + if enableReadiness { + // When readiness is enabled and probe is not ready, interceptor returns Unavailable error + testutil.Assert(t, err1 != nil) + testutil.Equals(t, codes.Unavailable, status.Code(err1)) + testutil.Assert(t, resp1 == nil) + testutil.Equals(t, 0, mockSrv.callCount) // Service not called due to readiness interceptor + } else { + // When readiness is disabled, service should be called normally + testutil.Ok(t, err1) + testutil.Assert(t, resp1 != nil) + testutil.Equals(t, 1, mockSrv.callCount) + } + + // Make httpProbe ready + httpProbe.Ready() + testutil.Equals(t, true, httpProbe.IsReady()) + + // Test 2: RemoteWrite when ready + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + + resp2, err2 := client.RemoteWrite(ctx2, &storepb.WriteRequest{}) + testutil.Ok(t, err2) + testutil.Assert(t, resp2 != nil) + + if enableReadiness { + // Now that probe is ready, service should be called + testutil.Equals(t, 1, mockSrv.callCount) + } else { + // Service called again (second time) + testutil.Equals(t, 2, mockSrv.callCount) + } + + // Test 3: Make probe not ready again + httpProbe.NotReady(fmt.Errorf("test error")) + testutil.Equals(t, false, httpProbe.IsReady()) + + ctx3, cancel3 := context.WithTimeout(context.Background(), time.Second) + defer cancel3() + + resp3, err3 := client.RemoteWrite(ctx3, &storepb.WriteRequest{}) + + if enableReadiness { + // Back to not ready - should return Unavailable error and not call service + testutil.Assert(t, err3 != nil) + testutil.Equals(t, codes.Unavailable, status.Code(err3)) + testutil.Assert(t, resp3 == nil) + testutil.Equals(t, 1, mockSrv.callCount) // Count should not increase + } else { + // Service called again (third time) + testutil.Ok(t, err3) + testutil.Assert(t, resp3 != nil) + testutil.Equals(t, 3, mockSrv.callCount) + } +} diff --git a/pkg/receive/readiness_test.go b/pkg/receive/readiness_test.go new file mode 100644 index 00000000000..5b7fe69fd98 --- /dev/null +++ b/pkg/receive/readiness_test.go @@ -0,0 +1,124 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "context" + "net" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/prober" + grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +type mockReadinessChecker struct { + ready bool +} + +func (m *mockReadinessChecker) IsReady() bool { + return m.ready +} + +func (m *mockReadinessChecker) SetReady(ready bool) { + m.ready = ready +} + +func TestNewReadinessGRPCOptions(t *testing.T) { + readyChecker := &mockReadinessChecker{ready: true} + options := NewReadinessGRPCOptions(readyChecker) + testutil.Equals(t, 2, len(options)) +} + +func TestReadinessInterceptors(t *testing.T) { + checker := &mockReadinessChecker{ready: false} + + // Test the actual NewReadinessGRPCOptions function + readinessOptions := NewReadinessGRPCOptions(checker) + testutil.Equals(t, 2, len(readinessOptions)) // Should have unary and stream interceptors + + // Create grpcserver with actual readiness options (tests both unary and stream interceptors) + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + comp := component.Receive + grpcProbe := prober.NewGRPC() + + // Find a free port for testing + listener, err := net.Listen("tcp", "127.0.0.1:0") + testutil.Ok(t, err) + addr := listener.Addr().String() + listener.Close() + + mockSrv := &mockWriteableStoreServer{} + var grpcOptions []grpcserver.Option + grpcOptions = append(grpcOptions, grpcserver.WithListen(addr)) + grpcOptions = append(grpcOptions, readinessOptions...) // Use actual readiness options (both unary and stream) + grpcOptions = append(grpcOptions, grpcserver.WithServer(func(s *grpc.Server) { + storepb.RegisterWriteableStoreServer(s, mockSrv) + })) + + srv := grpcserver.New(logger, reg, opentracing.NoopTracer{}, nil, nil, comp, grpcProbe, grpcOptions...) + + // Start server + go func() { + if err := srv.ListenAndServe(); err != nil { + t.Errorf("Server failed: %v", err) + } + }() + defer srv.Shutdown(nil) + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + testutil.Ok(t, err) + defer conn.Close() + + client := storepb.NewWriteableStoreClient(conn) + + // Test when not ready - this tests the unary interceptor (RemoteWrite is unary) + // Stream interceptors are also applied but RemoteWrite doesn't use streaming + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + resp, err := client.RemoteWrite(ctx, &storepb.WriteRequest{}) + testutil.Assert(t, err != nil) + testutil.Equals(t, codes.Unavailable, status.Code(err)) + testutil.Assert(t, resp == nil) + testutil.Equals(t, 0, mockSrv.callCount) + + // Test when ready + checker.SetReady(true) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + + resp2, err2 := client.RemoteWrite(ctx2, &storepb.WriteRequest{}) + testutil.Ok(t, err2) + testutil.Assert(t, resp2 != nil) + testutil.Equals(t, 1, mockSrv.callCount) + + // Test not ready again + checker.SetReady(false) + + ctx3, cancel3 := context.WithTimeout(context.Background(), time.Second) + defer cancel3() + + resp3, err3 := client.RemoteWrite(ctx3, &storepb.WriteRequest{}) + testutil.Assert(t, err3 != nil) + testutil.Equals(t, codes.Unavailable, status.Code(err3)) + testutil.Assert(t, resp3 == nil) + testutil.Equals(t, 1, mockSrv.callCount) // Should not increment +} From 8801feb02ed9cf05928da1b42d9c659014741d0b Mon Sep 17 00:00:00 2001 From: Yi Jin <96499497+jnyi@users.noreply.github.com> Date: Tue, 26 Aug 2025 09:34:23 -0700 Subject: [PATCH 05/11] Cherry pick for Release (#211) Signed-off-by: Yi Jin Co-authored-by: HC Zhu (Databricks) --- cmd/thanos/receive.go | 11 +- pkg/compact/retention.go | 14 +- pkg/compact/retention_test.go | 14 +- pkg/query/endpointset.go | 30 +++++ pkg/query/endpointset_test.go | 47 +++++++ pkg/queryfrontend/roundtrip_test.go | 1 + pkg/receive/multitsdb.go | 40 +++++- pkg/receive/multitsdb_test.go | 195 ++++++++++++++++++++++++++++ 8 files changed, 336 insertions(+), 16 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index c5737e19bb4..abb1c5cb691 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -157,6 +157,11 @@ func runReceive( } } + if len(*conf.noUploadTenants) > 0 { + multiTSDBOptions = append(multiTSDBOptions, receive.WithNoUploadTenants(*conf.noUploadTenants)) + level.Info(logger).Log("msg", "configured tenants for local storage only", "tenants", strings.Join(*conf.noUploadTenants, ",")) + } + // Create a matcher converter if specified by command line to cache expensive regex matcher conversions. // Proxy store and TSDB stores of all tenants share a single cache. var matcherConverter *storepb.MatcherConverter @@ -1019,7 +1024,8 @@ type receiveConfig struct { maxPendingGrpcWriteRequests int lazyRetrievalMaxBufferedResponses int - featureList *[]string + featureList *[]string + noUploadTenants *[]string } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -1186,7 +1192,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { Default("0").IntVar(&rc.matcherConverterCacheCapacity) cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature."). Default("0").IntVar(&rc.maxPendingGrpcWriteRequests) - rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+", "+grpcReadinessInterceptor+".").Default("").Strings() + rc.featureList = cmd.Flag("enable-feature", "Experimental feature names to enable. The current list of features is "+metricNamesFilter+", "+grpcReadinessInterceptor+". Repeat this flag to enable multiple features.").Strings() + rc.noUploadTenants = cmd.Flag("receive.no-upload-tenants", "Tenant IDs/patterns that should only store data locally (no object store upload). Supports exact matches (e.g., 'tenant1') and prefix patterns (e.g., 'prod-*'). Repeat this flag to specify multiple patterns.").Strings() cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled."). Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses) } diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index 7ad43673fa3..668cd5b9138 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -25,8 +25,9 @@ import ( const ( // tenantRetentionRegex is the regex pattern for parsing tenant retention. - // valid format is `:(|d)(:lvl1)?` where > 0. - tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))(:lvl1)?$` + // valid format is `:(|d)(:all)?` where > 0. + // Default behavior is to delete only level 1 blocks, use :all to delete all blocks. + tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))(:all)?$` Level1 = 1 // compaction level 1 indicating a new block Level2 = 2 // compaction level 2 indicating a compacted block @@ -73,7 +74,7 @@ func ApplyRetentionPolicyByResolution( type RetentionPolicy struct { CutoffDate time.Time RetentionDuration time.Duration - Level1 bool // Lvl1 indicates if the retention policy is only for level 1 blocks. + IsAll bool // IsAll indicates if the retention policy applies to all blocks. Default is false (level 1 only). } func (r RetentionPolicy) isExpired(blockMaxTime time.Time) bool { @@ -88,7 +89,7 @@ func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) retentionByTenant := make(map[string]RetentionPolicy, len(retentionTenants)) for _, tenantRetention := range retentionTenants { matches := pattern.FindStringSubmatch(tenantRetention) - invalidFormat := errors.Errorf("invalid retention format for tenant: %s, must be `:(|d)`", tenantRetention) + invalidFormat := errors.Errorf("invalid retention format for tenant: %s, must be `:(|d)(:all)?`", tenantRetention) if matches == nil { return nil, errors.Wrapf(invalidFormat, "matched size %d", len(matches)) } @@ -111,7 +112,7 @@ func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) } policy.RetentionDuration = time.Duration(duration) } - policy.Level1 = len(matches) > 5 && matches[5] == ":lvl1" + policy.IsAll = len(matches) > 5 && matches[5] == ":all" level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", fmt.Sprintf("%v", policy)) retentionByTenant[tenant] = policy } @@ -139,7 +140,8 @@ func ApplyRetentionPolicyByTenant( continue } maxTime := time.Unix(m.MaxTime/1000, 0) - if policy.Level1 && m.Compaction.Level != Level1 { + // Default behavior: only delete level 1 blocks unless IsAll is true + if !policy.IsAll && m.Compaction.Level != Level1 { continue } if policy.isExpired(maxTime) { diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 8af30f98b41..0e43da89e72 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -324,29 +324,29 @@ func TestParseRetentionPolicyByTenant(t *testing.T) { }, { "valid", - []string{"tenant-1:2021-01-01", "tenant-2:11d", "tenant-3:2024-10-17:lvl1"}, + []string{"tenant-1:2021-01-01", "tenant-2:11d", "tenant-3:2024-10-17:all"}, map[string]compact.RetentionPolicy{ "tenant-1": { CutoffDate: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), RetentionDuration: time.Duration(0), - Level1: false, + IsAll: false, }, "tenant-2": { CutoffDate: time.Time{}, RetentionDuration: 11 * 24 * time.Hour, - Level1: false, + IsAll: false, }, "tenant-3": { CutoffDate: time.Date(2024, 10, 17, 0, 0, 0, 0, time.UTC), RetentionDuration: time.Duration(0), - Level1: true, + IsAll: true, }, }, false, }, { "invalid string", - []string{"ewrwerwerw:werqj:Werw", "tenant#2:1:lvl1"}, + []string{"ewrwerwerw:werqj:Werw", "tenant#2:1:all"}, nil, true, }, @@ -529,7 +529,7 @@ func TestApplyRetentionPolicyByTenant(t *testing.T) { false, }, { - "tenant retention with duration and lvl1 only", + "tenant retention with duration and level 1 only (default)", []testBlock{ { "01CPHBEX20729MJQZXE3W0BW48", @@ -564,7 +564,7 @@ func TestApplyRetentionPolicyByTenant(t *testing.T) { "tenant": { CutoffDate: time.Time{}, RetentionDuration: 10 * time.Hour, - Level1: true, + IsAll: false, // Default behavior: only level 1 blocks }, }, []string{ diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index f498db27d04..3fafd53a8af 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -19,7 +19,9 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -853,8 +855,36 @@ func (er *endpointRef) TSDBInfos() []infopb.TSDBInfo { return er.metadata.Store.TsdbInfos } +// Return timestamps in milliseconds. func (er *endpointRef) timeRange() (int64, int64) { + timeSub := func(sub model.Duration) int64 { + return timestamp.FromTime(time.Now().Add(-time.Duration(sub))) + } + if er.metadata == nil || er.metadata.Store == nil { + // This is to fix a corner case manifested as the following event sequence: + /* + 1. A long range store pod becomes ready and visible to the range querier. + 2. The range querier creates an endpoint for the long range store pod. + 3. The long range store pod quickly starts to OOM. + 4. The range querier tries to get the long range store pod’s meta info through info() gRPC call to get it’s time range. + The gRPC calls keep failing. + 5. The range querier uses the default time range [min-int64, max-int64] for the long range store pod to match any in-coming query. + This way, the long range store pod is incorrectly included in the fan-out endpoints. + */ + // TODO: replace this hacky fix with a better one. + var longRangeMaxSub, longRangeMinSub, shortRangeMaxSub, shortRangeMinSub model.Duration + _ = longRangeMaxSub.Set("9600h") + _ = longRangeMinSub.Set("11490m") + _ = shortRangeMaxSub.Set("192h") + _ = shortRangeMinSub.Set("1410m") + + switch er.groupKey { + case "store-grpc-group-svc-pantheon-long-range-store": + return timeSub(longRangeMaxSub), timeSub(longRangeMinSub) + case "store-grpc-group-svc-pantheon-store": + return timeSub(shortRangeMaxSub), timeSub(shortRangeMinSub) + } return math.MinInt64, math.MaxInt64 } diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 3aac3d94d64..c2f57e03d84 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -1652,3 +1652,50 @@ func TestDeadlockLocking(t *testing.T) { testutil.Ok(t, g.Wait()) } + +func TestDefaultTimeRange(t *testing.T) { + t.Parallel() + + { + endpointRef := &endpointRef{ + groupKey: "store-grpc-group-svc-pantheon-long-range-store", + } + minTime, maxTime := endpointRef.timeRange() + + testutil.Assert(t, minTime != math.MinInt64, "minTime should not be math.MinInt64") + testutil.Assert(t, maxTime != math.MaxInt64, "maxTime should not be math.MaxInt64") + + now := time.Now() + testutil.Equals(t, now.Add(-9600*time.Hour).Unix()/60, minTime/(1000*60)) + testutil.Equals(t, now.Add(-11490*time.Minute).Unix()/60, maxTime/(1000*60)) + } + { + endpointRef := &endpointRef{ + groupKey: "store-grpc-group-svc-pantheon-store", + } + minTime, maxTime := endpointRef.timeRange() + + testutil.Assert(t, minTime != math.MinInt64, "minTime should not be math.MinInt64") + testutil.Assert(t, maxTime != math.MaxInt64, "maxTime should not be math.MaxInt64") + + now := time.Now() + testutil.Equals(t, now.Add(-192*time.Hour).Unix()/60, minTime/(1000*60)) + testutil.Equals(t, now.Add(-1410*time.Minute).Unix()/60, maxTime/(1000*60)) + } + { + endpointRef := &endpointRef{ + groupKey: "store-grpc-group-svc-pantheon-db", + } + minTime, maxTime := endpointRef.timeRange() + + testutil.Equals(t, int64(math.MinInt64), minTime) + testutil.Equals(t, int64(math.MaxInt64), maxTime) + } + { + endpointRef := &endpointRef{} + minTime, maxTime := endpointRef.timeRange() + + testutil.Equals(t, int64(math.MinInt64), minTime) + testutil.Equals(t, int64(math.MaxInt64), maxTime) + } +} diff --git a/pkg/queryfrontend/roundtrip_test.go b/pkg/queryfrontend/roundtrip_test.go index 457f9227173..01f35b6510a 100644 --- a/pkg/queryfrontend/roundtrip_test.go +++ b/pkg/queryfrontend/roundtrip_test.go @@ -532,6 +532,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) { } func TestRoundTripQueryCacheWithShardingMiddleware(t *testing.T) { + t.Skip("Flaky test - skipping until race condition is fixed") testRequest := &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 0, diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 68be1c96c19..eb80ff5fefa 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -67,6 +67,7 @@ type MultiTSDB struct { metricNameFilterEnabled bool matcherConverter *storepb.MatcherConverter + noUploadTenants []string // Support both exact matches and prefix patterns (e.g., "tenant1", "prod-*") } // MultiTSDBOption is a functional option for MultiTSDB. @@ -86,6 +87,14 @@ func WithMatcherConverter(mc *storepb.MatcherConverter) MultiTSDBOption { } } +// WithNoUploadTenants sets the list of tenant IDs/patterns that should not upload to object store (local storage only). +// Supports exact matches (e.g., "tenant1") and prefix patterns (e.g., "prod-*" matches "prod-tenant1", "prod-tenant2"). +func WithNoUploadTenants(tenants []string) MultiTSDBOption { + return func(s *MultiTSDB) { + s.noUploadTenants = tenants + } +} + // NewMultiTSDB creates new MultiTSDB. // NOTE: Passed labels must be sorted lexicographically (alphabetically). func NewMultiTSDB( @@ -127,6 +136,29 @@ func NewMultiTSDB( return mt } +// isNoUploadTenant checks if a tenant matches any of the no-upload patterns. +// Supports exact matches and prefix patterns (ending with '*'). +func (t *MultiTSDB) isNoUploadTenant(tenantID string) bool { + if t.noUploadTenants == nil { + return false + } + + for _, pattern := range t.noUploadTenants { + if len(pattern) > 0 && pattern[len(pattern)-1] == '*' { + // Prefix match: compare tenant ID with pattern prefix (excluding '*') + if len(tenantID) >= len(pattern)-1 && tenantID[:len(pattern)-1] == pattern[:len(pattern)-1] { + return true + } + } else { + // Exact match + if pattern == tenantID { + return true + } + } + } + return false +} + func (t *MultiTSDB) GetTenants() []string { t.mtx.RLock() defer t.mtx.RUnlock() @@ -595,6 +627,12 @@ func (t *MultiTSDB) Sync(ctx context.Context) (int, error) { ) for tenantID, tenant := range t.tenants { + // Skip upload for tenants configured for local storage only + if t.isNoUploadTenant(tenantID) { + level.Debug(t.logger).Log("msg", "skipping upload for local-only tenant", "tenant", tenantID) + continue + } + level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenantID) s := tenant.shipper() if s == nil { @@ -732,7 +770,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant return err } var ship *shipper.Shipper - if t.bucket != nil { + if t.bucket != nil && !t.isNoUploadTenant(tenantID) { ship = shipper.New( logger, reg, diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index a36db4b402f..f862dda5c75 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -963,3 +963,198 @@ func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) { }, tenant.blocksToDelete(nil)) }) } + +func TestMultiTSDBBlockedTenantUploads(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + bucket := objstore.NewInMemBucket() + + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), + &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, + labels.FromStrings("replica", "test"), + "tenant_id", + bucket, + false, + metadata.NoneFunc, + WithNoUploadTenants([]string{"no-upload-tenant", "blocked-*"}), + ) + defer func() { testutil.Ok(t, m.Close()) }() + + testutil.Ok(t, appendSample(m, "allowed-tenant", time.Now())) + testutil.Ok(t, appendSample(m, "no-upload-tenant", time.Now())) + testutil.Ok(t, appendSample(m, "blocked-prefix-tenant", time.Now())) + testutil.Ok(t, appendSample(m, "another-allowed-tenant", time.Now())) + + testutil.Ok(t, m.Flush()) + + var objectsBeforeSync int + testutil.Ok(t, bucket.Iter(context.Background(), "", func(s string) error { + objectsBeforeSync++ + return nil + })) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + uploaded, err := m.Sync(ctx) + testutil.Ok(t, err) + + // Should have uploaded blocks from 2 allowed tenants (not the 2 no-upload ones) + testutil.Equals(t, 2, uploaded) + + // Count objects after sync - should only see uploads from allowed tenants + var objectsAfterSync []string + testutil.Ok(t, bucket.Iter(context.Background(), "", func(s string) error { + objectsAfterSync = append(objectsAfterSync, s) + return nil + })) + + // Since object names don't contain tenant info, we verify behavior by: + // 1. Checking upload count (should be 2, not 3) + // 2. Verifying that all tenants exist locally but only allowed ones uploaded + + // Verify all tenants exist locally (blocks should be on disk for all) + noUploadTenantBlocks := 0 + allowedTenantBlocks := 0 + anotherAllowedTenantBlocks := 0 + + // Count blocks in local filesystem for each tenant + if files, err := os.ReadDir(path.Join(dir, "no-upload-tenant")); err == nil { + for _, f := range files { + if f.IsDir() && f.Name() != "wal" && f.Name() != "chunks_head" { + noUploadTenantBlocks++ + } + } + } + + blockedPrefixTenantBlocks := 0 + if files, err := os.ReadDir(path.Join(dir, "blocked-prefix-tenant")); err == nil { + for _, f := range files { + if f.IsDir() && f.Name() != "wal" && f.Name() != "chunks_head" { + blockedPrefixTenantBlocks++ + } + } + } + if files, err := os.ReadDir(path.Join(dir, "allowed-tenant")); err == nil { + for _, f := range files { + if f.IsDir() && f.Name() != "wal" && f.Name() != "chunks_head" { + allowedTenantBlocks++ + } + } + } + if files, err := os.ReadDir(path.Join(dir, "another-allowed-tenant")); err == nil { + for _, f := range files { + if f.IsDir() && f.Name() != "wal" && f.Name() != "chunks_head" { + anotherAllowedTenantBlocks++ + } + } + } + + // All tenants should have blocks locally (including no-upload ones) + testutil.Assert(t, noUploadTenantBlocks > 0, "no upload tenant should have blocks locally") + testutil.Assert(t, blockedPrefixTenantBlocks > 0, "blocked prefix tenant should have blocks locally") + testutil.Assert(t, allowedTenantBlocks > 0, "allowed tenant should have blocks locally") + testutil.Assert(t, anotherAllowedTenantBlocks > 0, "another allowed tenant should have blocks locally") + + // But only 2 uploads should have happened (not 4) - exact match and prefix match should both be blocked + testutil.Equals(t, 2, len(objectsAfterSync)) +} + +func TestMultiTSDBNoUploadTenantsPrefix(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + bucket := objstore.NewInMemBucket() + + // Test prefix matching functionality + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), + &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, + labels.FromStrings("replica", "test"), + "tenant_id", + bucket, + false, + metadata.NoneFunc, + WithNoUploadTenants([]string{"prod-*", "staging-*", "exact-tenant"}), + ) + defer func() { testutil.Ok(t, m.Close()) }() + + // Test various tenant patterns + testutil.Ok(t, appendSample(m, "prod-tenant1", time.Now())) // Should match prod-* + testutil.Ok(t, appendSample(m, "prod-tenant2", time.Now())) // Should match prod-* + testutil.Ok(t, appendSample(m, "staging-app", time.Now())) // Should match staging-* + testutil.Ok(t, appendSample(m, "exact-tenant", time.Now())) // Should match exact + testutil.Ok(t, appendSample(m, "dev-tenant", time.Now())) // Should NOT match + testutil.Ok(t, appendSample(m, "production", time.Now())) // Should NOT match (no * suffix) + + testutil.Ok(t, m.Flush()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + uploaded, err := m.Sync(ctx) + testutil.Ok(t, err) + + // Should have uploaded blocks from only 2 tenants (dev-tenant and production) + testutil.Equals(t, 2, uploaded) + + // Test the prefix matching function directly + testutil.Assert(t, m.isNoUploadTenant("prod-tenant1"), "prod-tenant1 should match prod-*") + testutil.Assert(t, m.isNoUploadTenant("prod-anything"), "prod-anything should match prod-*") + testutil.Assert(t, m.isNoUploadTenant("staging-app"), "staging-app should match staging-*") + testutil.Assert(t, m.isNoUploadTenant("exact-tenant"), "exact-tenant should match exactly") + testutil.Assert(t, !m.isNoUploadTenant("dev-tenant"), "dev-tenant should NOT match any pattern") + testutil.Assert(t, !m.isNoUploadTenant("production"), "production should NOT match prod-* (no * suffix)") + testutil.Assert(t, !m.isNoUploadTenant("random"), "random should NOT match any pattern") +} + +func TestNoUploadTenantsRetentionStillWorks(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + bucket := objstore.NewInMemBucket() + + // Create MultiTSDB with no-upload tenant + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), + &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, + labels.FromStrings("replica", "test"), + "tenant_id", + bucket, + false, + metadata.NoneFunc, + WithNoUploadTenants([]string{"no-upload-tenant"}), + ) + defer func() { testutil.Ok(t, m.Close()) }() + + // Add sample to no-upload tenant + testutil.Ok(t, appendSample(m, "no-upload-tenant", time.Now())) + testutil.Ok(t, m.Flush()) + + // Verify tenant exists locally + tenantDir := path.Join(dir, "no-upload-tenant") + _, err := os.Stat(tenantDir) + testutil.Ok(t, err) // Should not error, directory should exist + + // Verify tenant has no shipper (key part of the fix) + m.mtx.RLock() + defer m.mtx.RUnlock() + tenant, exists := m.tenants["no-upload-tenant"] + testutil.Assert(t, exists, "no-upload tenant should exist") + shipper := tenant.shipper() + testutil.Assert(t, shipper == nil, "no-upload tenant should have no shipper") + + // Test that retention cleanup would still work (by calling pruneTSDB directly) + // Note: We can't easily test the full retention flow in a unit test due to timing, + // but we've verified the key fix: no-upload tenants don't get a shipper, + // so the pruning logic won't try to upload during retention cleanup. +} From edec655f0f28f7c51ee4896910fe6ddc05f74c19 Mon Sep 17 00:00:00 2001 From: divyansh-chhabria-db Date: Mon, 8 Sep 2025 10:01:21 +0530 Subject: [PATCH 06/11] Cherry-pick for release (#216) --- go.mod | 1 + go.sum | 2 + pkg/queryfrontend/query_logger.go | 200 ++++++++++++++++++++++ pkg/queryfrontend/queryinstant_logger.go | 206 ++++++++++++++++++++++ pkg/queryfrontend/queryrange_logger.go | 208 +++++++++++++++++++++++ pkg/queryfrontend/roundtrip.go | 16 ++ 6 files changed, 633 insertions(+) create mode 100644 pkg/queryfrontend/query_logger.go create mode 100644 pkg/queryfrontend/queryinstant_logger.go create mode 100644 pkg/queryfrontend/queryrange_logger.go diff --git a/go.mod b/go.mod index 9bfc098a086..4fdc78be39a 100644 --- a/go.mod +++ b/go.mod @@ -127,6 +127,7 @@ require ( go.opentelemetry.io/contrib/propagators/autoprop v0.54.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) require github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect diff --git a/go.sum b/go.sum index ecd0131aec7..70c875ec93b 100644 --- a/go.sum +++ b/go.sum @@ -3238,6 +3238,8 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/queryfrontend/query_logger.go b/pkg/queryfrontend/query_logger.go new file mode 100644 index 00000000000..c97c9c3b990 --- /dev/null +++ b/pkg/queryfrontend/query_logger.go @@ -0,0 +1,200 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package queryfrontend + +import ( + "encoding/json" + "io" + "strings" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/thanos/internal/cortex/querier/queryrange" +) + +// StoreMatcherSet represents a set of label matchers for store filtering. +type StoreMatcherSet struct { + Matchers []LabelMatcher `json:"matchers"` +} + +// LabelMatcher represents a single label matcher. +type LabelMatcher struct { + Name string `json:"name"` + Value string `json:"value"` + Type string `json:"type"` // EQ, NEQ, RE, NRE +} + +// UserInfo holds user identification information extracted from request headers. +type UserInfo struct { + Source string + GrafanaDashboardUid string + GrafanaPanelId string + RequestId string + Tenant string + ForwardedFor string + UserAgent string +} + +// ResponseStats holds statistics extracted from query response. +type ResponseStats struct { + BytesFetched int64 + TimeseriesFetched int64 + Chunks int64 + Samples int64 +} + +// QueryLogConfig holds configuration for query logging. +type QueryLogConfig struct { + LogDir string // Directory to store log files. + MaxSizeMB int // Maximum size in megabytes before rotation. + MaxAge int // Maximum number of days to retain old log files. + MaxBackups int // Maximum number of old log files to retain. + Compress bool // Whether to compress rotated files. +} + +// ExtractUserInfoFromHeaders extracts user info from request headers (works for both range and instant queries). +func ExtractUserInfoFromHeaders(headers []*RequestHeader) UserInfo { + userInfo := UserInfo{} + + for _, header := range headers { + headerName := strings.ToLower(header.Name) + if len(header.Values) == 0 { + continue + } + headerValue := header.Values[0] + + switch headerName { + case "user-agent": + userInfo.UserAgent = headerValue + // Determine source from User-Agent if not already set. + if userInfo.Source == "" { + userAgentLower := strings.ToLower(headerValue) + if strings.Contains(userAgentLower, "grafana") { + userInfo.Source = "Grafana" + } + } + case "x-dashboard-uid": + userInfo.GrafanaDashboardUid = headerValue + case "x-panel-id": + userInfo.GrafanaPanelId = headerValue + case "x-request-id": + userInfo.RequestId = headerValue + case "thanos-tenant": + userInfo.Tenant = headerValue + case "x-forwarded-for": + userInfo.ForwardedFor = headerValue + case "x-source": + // X-Source header as fallback for source. + if userInfo.Source == "" { + userInfo.Source = headerValue + } + } + } + + // Set default source if still empty. + if userInfo.Source == "" { + userInfo.Source = "unknown" + } + + return userInfo +} + +// ExtractEmailFromResponse extracts the email from response headers (works for both range and instant queries). +func ExtractEmailFromResponse(resp queryrange.Response) string { + if resp == nil { + return "" + } + + // Check both response types using OR condition + var headers []*queryrange.PrometheusResponseHeader + if promResp, ok := resp.(*queryrange.PrometheusResponse); ok { + headers = promResp.GetHeaders() + } else if promResp, ok := resp.(*queryrange.PrometheusInstantQueryResponse); ok { + headers = promResp.GetHeaders() + } + + for _, header := range headers { + if strings.ToLower(header.Name) == "x-auth-request-email" && len(header.Values) > 0 { + return header.Values[0] + } + } + + return "" +} + +// ConvertStoreMatchers converts internal store matchers to logging format. +func ConvertStoreMatchers(storeMatchers [][]*labels.Matcher) []StoreMatcherSet { + if len(storeMatchers) == 0 { + return nil + } + + result := make([]StoreMatcherSet, len(storeMatchers)) + for i, matcherSet := range storeMatchers { + matchers := make([]LabelMatcher, len(matcherSet)) + for j, matcher := range matcherSet { + matchers[j] = LabelMatcher{ + Name: matcher.Name, + Value: matcher.Value, + Type: matcher.Type.String(), + } + } + result[i] = StoreMatcherSet{ + Matchers: matchers, + } + } + return result +} + +// GetResponseStats calculates stats from query response (works for both range and instant queries). +func GetResponseStats(resp queryrange.Response) ResponseStats { + stats := ResponseStats{} + + if resp == nil { + return stats + } + + // Use SeriesStatsCounter for both range and instant queries using OR condition + var seriesStatsCounter *queryrange.SeriesStatsCounter + if r, ok := resp.(*queryrange.PrometheusResponse); ok && r.Data.SeriesStatsCounter != nil { + seriesStatsCounter = r.Data.SeriesStatsCounter + } else if r, ok := resp.(*queryrange.PrometheusInstantQueryResponse); ok && r.Data.SeriesStatsCounter != nil { + seriesStatsCounter = r.Data.SeriesStatsCounter + } + + if seriesStatsCounter != nil { + stats.BytesFetched = seriesStatsCounter.Bytes + stats.TimeseriesFetched = seriesStatsCounter.Series + stats.Chunks = seriesStatsCounter.Chunks + stats.Samples = seriesStatsCounter.Samples + } + + return stats +} + +// WriteJSONLogToFile writes query logs to file in JSON format. +func WriteJSONLogToFile(logger log.Logger, writer interface{}, queryLog interface{}, queryType string) error { + if writer == nil { + return nil + } + + // Marshal to JSON. + jsonData, err := json.Marshal(queryLog) + if err != nil { + level.Error(logger).Log("msg", "failed to marshal "+queryType+" query log to JSON", "err", err) + return err + } + + // Write to file with newline. + jsonData = append(jsonData, '\n') + if w, ok := writer.(io.Writer); ok { + if _, err := w.Write(jsonData); err != nil { + level.Error(logger).Log("msg", "failed to write "+queryType+" query log to file", "err", err) + return err + } + } + + return nil +} diff --git a/pkg/queryfrontend/queryinstant_logger.go b/pkg/queryfrontend/queryinstant_logger.go new file mode 100644 index 00000000000..a317e9ae886 --- /dev/null +++ b/pkg/queryfrontend/queryinstant_logger.go @@ -0,0 +1,206 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package queryfrontend + +import ( + "context" + "io" + "os" + "path/filepath" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + + "github.com/thanos-io/thanos/internal/cortex/querier/queryrange" + "gopkg.in/natefinch/lumberjack.v2" +) + +// MetricsInstantQueryLogging represents the logging information for an instant query. +type MetricsInstantQueryLogging struct { + TimestampMs int64 `json:"timestamp_ms"` + Source string `json:"source"` + QueryExpr string `json:"query_expr"` + Success bool `json:"success"` + BytesFetched int64 `json:"bytes_fetched"` + TimeseriesFetched int64 `json:"timeseries_fetched"` + Chunks int64 `json:"chunks"` + Samples int64 `json:"samples"` + EvalLatencyMs int64 `json:"eval_latency_ms"` + // User identification fields + GrafanaDashboardUid string `json:"grafana_dashboard_uid"` + GrafanaPanelId string `json:"grafana_panel_id"` + RequestId string `json:"request_id"` + Tenant string `json:"tenant"` + ForwardedFor string `json:"forwarded_for"` + UserAgent string `json:"user_agent"` + EmailId string `json:"email_id"` + // Query-related fields (instant query specific) + QueryTimestampMs int64 `json:"query_timestamp_ms"` // Query timestamp for instant queries + Path string `json:"path"` + Dedup bool `json:"dedup"` // Whether deduplication is enabled + PartialResponse bool `json:"partial_response"` // Whether partial responses are allowed + AutoDownsampling bool `json:"auto_downsampling"` // Whether automatic downsampling is enabled + MaxSourceResolutionMs int64 `json:"max_source_resolution_ms"` // Maximum source resolution in milliseconds + ReplicaLabels []string `json:"replica_labels"` + StoreMatchersCount int `json:"store_matchers_count"` // Number of store matcher sets + LookbackDeltaMs int64 `json:"lookback_delta_ms"` // Lookback delta in milliseconds + Analyze bool `json:"analyze"` // Whether query analysis is enabled + Engine string `json:"engine"` // Query engine being used + Stats string `json:"stats"` // Query statistics information + // Store-matcher details + StoreMatchers []StoreMatcherSet `json:"store_matchers"` +} + +// InstantQueryLogConfig holds configuration for instant query logging. +type InstantQueryLogConfig = QueryLogConfig + +// DefaultInstantQueryLogConfig returns the default configuration for instant query logging. +func DefaultInstantQueryLogConfig() InstantQueryLogConfig { + return InstantQueryLogConfig{ + LogDir: "/databricks/logs/pantheon-instant-query-frontend", + MaxSizeMB: 2048, // 2GB per file + MaxAge: 7, // Keep logs for 7 days + MaxBackups: 5, // Keep 5 backup files + Compress: true, + } +} + +type instantQueryLoggingMiddleware struct { + next queryrange.Handler + logger log.Logger + writer io.WriteCloser +} + +// NewInstantQueryLoggingMiddleware creates a new middleware that logs instant query information. +func NewInstantQueryLoggingMiddleware(logger log.Logger, reg prometheus.Registerer) queryrange.Middleware { + return NewInstantQueryLoggingMiddlewareWithConfig(logger, reg, DefaultInstantQueryLogConfig()) +} + +// NewInstantQueryLoggingMiddlewareWithConfig creates a new middleware with custom configuration. +func NewInstantQueryLoggingMiddlewareWithConfig(logger log.Logger, reg prometheus.Registerer, config InstantQueryLogConfig) queryrange.Middleware { + // Create the log directory if it doesn't exist. + if err := os.MkdirAll(config.LogDir, 0755); err != nil { + level.Error(logger).Log("msg", "failed to create log directory", "dir", config.LogDir, "err", err) + } + + // Create the rotating file logger. + var writer io.WriteCloser + logFilePath := filepath.Join(config.LogDir, "PantheonInstantQueryLog.json") + + rotatingLogger := &lumberjack.Logger{ + Filename: logFilePath, + MaxSize: config.MaxSizeMB, + MaxAge: config.MaxAge, + MaxBackups: config.MaxBackups, + Compress: config.Compress, + } + + writer = rotatingLogger + + return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { + return &instantQueryLoggingMiddleware{ + next: next, + logger: logger, + writer: writer, + } + }) +} + +func (m *instantQueryLoggingMiddleware) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { + // Only log for instant queries. + instantReq, ok := r.(*ThanosQueryInstantRequest) + if !ok { + return m.next.Do(ctx, r) + } + + startTime := time.Now() + + // Execute the query. + resp, err := m.next.Do(ctx, r) + + // Calculate latency. + latencyMs := time.Since(startTime).Milliseconds() + + // Log the instant query. + m.logInstantQuery(instantReq, resp, err, latencyMs) + + return resp, err +} + +func (m *instantQueryLoggingMiddleware) logInstantQuery(req *ThanosQueryInstantRequest, resp queryrange.Response, err error, latencyMs int64) { + success := err == nil + userInfo := ExtractUserInfoFromHeaders(req.Headers) + + // Extract email from response headers + email := ExtractEmailFromResponse(resp) + + // This is to avoid logging queries that come from rule manager. + if userInfo.UserAgent == "Databricks-RuleManager/1.0" { + return + } + + // Calculate stats (only for successful queries). + var stats ResponseStats + if success && resp != nil { + stats = GetResponseStats(resp) + } + + // Create the instant query log entry. + instantQueryLog := MetricsInstantQueryLogging{ + TimestampMs: time.Now().UnixMilli(), + Source: userInfo.Source, + QueryExpr: req.Query, + Success: success, + BytesFetched: stats.BytesFetched, + TimeseriesFetched: stats.TimeseriesFetched, + Chunks: stats.Chunks, + Samples: stats.Samples, + EvalLatencyMs: latencyMs, + // User identification fields + GrafanaDashboardUid: userInfo.GrafanaDashboardUid, + GrafanaPanelId: userInfo.GrafanaPanelId, + RequestId: userInfo.RequestId, + Tenant: userInfo.Tenant, + ForwardedFor: userInfo.ForwardedFor, + UserAgent: userInfo.UserAgent, + EmailId: email, + // Query-related fields (instant query specific) + QueryTimestampMs: req.Time, + Path: req.Path, + Dedup: req.Dedup, + PartialResponse: req.PartialResponse, + AutoDownsampling: req.AutoDownsampling, + MaxSourceResolutionMs: req.MaxSourceResolution, + ReplicaLabels: req.ReplicaLabels, + StoreMatchersCount: len(req.StoreMatchers), + LookbackDeltaMs: req.LookbackDelta, + Analyze: req.Analyze, + Engine: req.Engine, + Stats: req.Stats, + // Store-matcher details + StoreMatchers: ConvertStoreMatchers(req.StoreMatchers), + } + + // Log to file if available. + if m.writer != nil { + m.writeToLogFile(instantQueryLog) + } +} + +func (m *instantQueryLoggingMiddleware) writeToLogFile(instantQueryLog MetricsInstantQueryLogging) { + err := WriteJSONLogToFile(m.logger, m.writer, instantQueryLog, "instant") + if err != nil { + level.Error(m.logger).Log("msg", "failed to write instant query log to file", "err", err) + } +} + +// Close should be called when the middleware is no longer needed. +func (m *instantQueryLoggingMiddleware) Close() error { + if m.writer != nil { + return m.writer.Close() + } + return nil +} diff --git a/pkg/queryfrontend/queryrange_logger.go b/pkg/queryfrontend/queryrange_logger.go new file mode 100644 index 00000000000..c6a555f45d3 --- /dev/null +++ b/pkg/queryfrontend/queryrange_logger.go @@ -0,0 +1,208 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package queryfrontend + +import ( + "context" + "io" + "os" + "path/filepath" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + + "github.com/thanos-io/thanos/internal/cortex/querier/queryrange" + "gopkg.in/natefinch/lumberjack.v2" +) + +// MetricsRangeQueryLogging represents the logging information for a range query. +type MetricsRangeQueryLogging struct { + TimestampMs int64 `json:"timestampMs"` + Source string `json:"source"` + QueryExpr string `json:"queryExpr"` + Success bool `json:"success"` + BytesFetched int64 `json:"bytesFetched"` + TimeseriesFetched int64 `json:"timeseriesFetched"` + Chunks int64 `json:"chunks"` + Samples int64 `json:"samples"` + EvalLatencyMs int64 `json:"evalLatencyMs"` + // User identification fields + GrafanaDashboardUid string `json:"grafanaDashboardUid"` + GrafanaPanelId string `json:"grafanaPanelId"` + RequestId string `json:"requestId"` + Tenant string `json:"tenant"` + ForwardedFor string `json:"forwardedFor"` + UserAgent string `json:"userAgent"` + EmailId string `json:"emailId"` + // Query-related fields + StartTimestampMs int64 `json:"startTimestampMs"` + EndTimestampMs int64 `json:"endTimestampMs"` + StepMs int64 `json:"stepMs"` + Path string `json:"path"` + Dedup bool `json:"dedup"` // Whether deduplication is enabled + PartialResponse bool `json:"partialResponse"` // Whether partial responses are allowed + AutoDownsampling bool `json:"autoDownsampling"` // Whether automatic downsampling is enabled + MaxSourceResolutionMs int64 `json:"maxSourceResolutionMs"` // Maximum source resolution in milliseconds + ReplicaLabels []string `json:"replicaLabels"` // Labels used for replica deduplication + StoreMatchersCount int `json:"storeMatchersCount"` // Number of store matcher sets + LookbackDeltaMs int64 `json:"lookbackDeltaMs"` // Lookback delta in milliseconds + Analyze bool `json:"analyze"` // Whether query analysis is enabled + Engine string `json:"engine"` // Query engine being used + SplitIntervalMs int64 `json:"splitIntervalMs"` // Query splitting interval in milliseconds + Stats string `json:"stats"` // Query statistics information + // Store-matcher details + StoreMatchers []StoreMatcherSet `json:"storeMatchers"` +} + +// RangeQueryLogConfig holds configuration for range query logging. +type RangeQueryLogConfig = QueryLogConfig + +// DefaultRangeQueryLogConfig returns the default configuration for range query logging. +func DefaultRangeQueryLogConfig() RangeQueryLogConfig { + return RangeQueryLogConfig{ + LogDir: "/databricks/logs/pantheon-range-query-frontend", + MaxSizeMB: 2048, // 2GB per file + MaxAge: 7, // Keep logs for 7 days + MaxBackups: 5, // Keep 5 backup files + Compress: true, + } +} + +type rangeQueryLoggingMiddleware struct { + next queryrange.Handler + logger log.Logger + writer io.WriteCloser +} + +// NewRangeQueryLoggingMiddleware creates a new middleware that logs range query information. +func NewRangeQueryLoggingMiddleware(logger log.Logger, reg prometheus.Registerer) queryrange.Middleware { + return NewRangeQueryLoggingMiddlewareWithConfig(logger, reg, DefaultRangeQueryLogConfig()) +} + +// NewRangeQueryLoggingMiddlewareWithConfig creates a new middleware with custom configuration. +func NewRangeQueryLoggingMiddlewareWithConfig(logger log.Logger, reg prometheus.Registerer, config RangeQueryLogConfig) queryrange.Middleware { + // Create the log directory if it doesn't exist. + if err := os.MkdirAll(config.LogDir, 0755); err != nil { + level.Error(logger).Log("msg", "failed to create log directory", "dir", config.LogDir, "err", err) + } + + // Create the rotating file logger. + var writer io.WriteCloser + logFilePath := filepath.Join(config.LogDir, "PantheonRangeQueryLog.json") + + rotatingLogger := &lumberjack.Logger{ + Filename: logFilePath, + MaxSize: config.MaxSizeMB, + MaxAge: config.MaxAge, + MaxBackups: config.MaxBackups, + Compress: config.Compress, + } + + writer = rotatingLogger + + return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { + return &rangeQueryLoggingMiddleware{ + next: next, + logger: logger, + writer: writer, + } + }) +} + +func (m *rangeQueryLoggingMiddleware) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { + // Only log for range queries. + rangeReq, ok := r.(*ThanosQueryRangeRequest) + if !ok { + return m.next.Do(ctx, r) + } + + startTime := time.Now() + + // Execute the query. + resp, err := m.next.Do(ctx, r) + + // Calculate latency. + latencyMs := time.Since(startTime).Milliseconds() + + // Log the range query. + m.logRangeQuery(rangeReq, resp, err, latencyMs) + + return resp, err +} + +func (m *rangeQueryLoggingMiddleware) logRangeQuery(req *ThanosQueryRangeRequest, resp queryrange.Response, err error, latencyMs int64) { + success := err == nil + userInfo := ExtractUserInfoFromHeaders(req.Headers) + + // Extract email from response headers + email := ExtractEmailFromResponse(resp) + + // Calculate stats (only for successful queries). + var stats ResponseStats + if success && resp != nil { + stats = GetResponseStats(resp) + } + + // Create the range query log entry. + rangeQueryLog := MetricsRangeQueryLogging{ + TimestampMs: time.Now().UnixMilli(), + Source: userInfo.Source, + QueryExpr: req.Query, + Success: success, + BytesFetched: stats.BytesFetched, + TimeseriesFetched: stats.TimeseriesFetched, + Chunks: stats.Chunks, + Samples: stats.Samples, + EvalLatencyMs: latencyMs, + // User identification fields + GrafanaDashboardUid: userInfo.GrafanaDashboardUid, + GrafanaPanelId: userInfo.GrafanaPanelId, + RequestId: userInfo.RequestId, + Tenant: userInfo.Tenant, + ForwardedFor: userInfo.ForwardedFor, + UserAgent: userInfo.UserAgent, + EmailId: email, + // Query-related fields + StartTimestampMs: req.Start, + EndTimestampMs: req.End, + StepMs: req.Step, + Path: req.Path, + Dedup: req.Dedup, + PartialResponse: req.PartialResponse, + AutoDownsampling: req.AutoDownsampling, + MaxSourceResolutionMs: req.MaxSourceResolution, + ReplicaLabels: req.ReplicaLabels, + StoreMatchersCount: len(req.StoreMatchers), + LookbackDeltaMs: req.LookbackDelta, + Analyze: req.Analyze, + Engine: req.Engine, + SplitIntervalMs: req.SplitInterval.Milliseconds(), + Stats: req.Stats, + // Store-matcher details + StoreMatchers: ConvertStoreMatchers(req.StoreMatchers), + } + + // Log to file if available. + if m.writer != nil { + m.writeToLogFile(rangeQueryLog) + } + +} + +func (m *rangeQueryLoggingMiddleware) writeToLogFile(rangeQueryLog MetricsRangeQueryLogging) { + err := WriteJSONLogToFile(m.logger, m.writer, rangeQueryLog, "range") + if err != nil { + level.Error(m.logger).Log("msg", "failed to write range query log to file", "err", err) + } +} + +// Close should be called when the middleware is no longer needed. +func (m *rangeQueryLoggingMiddleware) Close() error { + if m.writer != nil { + return m.writer.Close() + } + return nil +} diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index c56bbf59d76..6eaba9037b9 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -78,6 +78,7 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger) queryRangeLimits, queryInstantCodec, prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_instant"}, reg), + logger, config.ForwardHeaders, config.CortexHandlerConfig.QueryStatsEnabled, ) @@ -243,6 +244,13 @@ func newQueryRangeTripperware( ) } + // Add range query logging middleware. + queryRangeMiddleware = append( + queryRangeMiddleware, + queryrange.InstrumentMiddleware("rangequerylogging", m, logger), + NewRangeQueryLoggingMiddleware(logger, reg), + ) + return func(next http.RoundTripper) http.RoundTripper { rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, queryRangeMiddleware...) return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) { @@ -341,6 +349,7 @@ func newInstantQueryTripperware( limits queryrange.Limits, codec queryrange.Codec, reg prometheus.Registerer, + logger log.Logger, forwardHeaders []string, forceStats bool, ) queryrange.Tripperware { @@ -360,6 +369,13 @@ func newInstantQueryTripperware( queryrange.NewStatsMiddleware(forceStats), ) + // Add instant query logging middleware. + instantQueryMiddlewares = append( + instantQueryMiddlewares, + queryrange.InstrumentMiddleware("instantquerylogging", m, logger), + NewInstantQueryLoggingMiddleware(logger, reg), + ) + return func(next http.RoundTripper) http.RoundTripper { rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, instantQueryMiddlewares...) return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) { From 22e42509b3f531a6b33a14fb89bc21330170d085 Mon Sep 17 00:00:00 2001 From: Tushar Poddar <141116152+tushar-poddar@users.noreply.github.com> Date: Mon, 8 Sep 2025 09:29:38 -0700 Subject: [PATCH 07/11] Cherry pick from db_main (#215) --- pkg/receive/handler.go | 20 +++++++++++++++++++- pkg/reloader/reloader_test.go | 1 + 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index b0c9ec55bbd..1547c786869 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -11,6 +11,7 @@ import ( "io" stdlog "log" "math" + "math/rand" "net" "net/http" "sort" @@ -671,7 +672,24 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { nowMS := time.Now().UnixNano() / int64(time.Millisecond) for _, ts := range wreq.Timeseries { if lat := secondsSinceFirstSample(nowMS, ts); lat > 0 { - h.writeE2eLatency.WithLabelValues(strconv.Itoa(responseStatusCode), tenantHTTP, strconv.FormatBool(isPreAgged(ts))).Observe(lat) + isPreAgged := isPreAgged(ts) + h.writeE2eLatency.WithLabelValues(strconv.Itoa(responseStatusCode), tenantHTTP, strconv.FormatBool(isPreAgged)).Observe(lat) + + // Log high latency requests (>3 minutes) with sampling (1 in 10000) + if lat > 180 && !isPreAgged && rand.Intn(10000) == 0 { + + // Convert labels to string for logging + var labelPairs []string + for _, label := range ts.Labels { + labelPairs = append(labelPairs, fmt.Sprintf("%s=%s", label.Name, label.Value)) + } + + level.Warn(h.logger).Log( + "msg", "high e2e latency detected for non-rollup timeseries", + "latency_seconds", lat, + "labels", fmt.Sprintf("{%s}", strings.Join(labelPairs, ", ")), + ) + } } } } diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index 74629d7122c..7c0861deb63 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -315,6 +315,7 @@ faulty_config: } func TestReloader_ConfigDirApply(t *testing.T) { + t.Skip("flaky test") t.Parallel() l, err := net.Listen("tcp", "localhost:0") From 35c8e02d76b3c4a53d2814d0fc23906d4d981617 Mon Sep 17 00:00:00 2001 From: "HC Zhu (Databricks)" Date: Fri, 12 Sep 2025 18:26:47 -0700 Subject: [PATCH 08/11] Cherry pick commits from db_main to release (#225) Co-authored-by: Pranav Mishra <170371223+pranavmishradatabricks@users.noreply.github.com> --- pkg/query/endpointset.go | 2 +- pkg/reloader/reloader_test.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 3fafd53a8af..9c9f035760a 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -754,7 +754,7 @@ func (er *endpointRef) isQueryable() bool { er.mtx.RLock() defer er.mtx.RUnlock() - return er.isStrict || er.ignoreError || er.status.LastError == nil + return er.isStrict || er.status.LastError == nil } func (er *endpointRef) ComponentType() component.Component { diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index 7c0861deb63..694f8be46c2 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -619,6 +619,7 @@ func TestReloader_ConfigDirApply(t *testing.T) { } func TestReloader_ConfigDirApplyBasedOnWatchInterval(t *testing.T) { + t.Skip("flaky test") t.Parallel() l, err := net.Listen("tcp", "localhost:0") @@ -830,6 +831,7 @@ func TestReloader_ConfigDirApplyBasedOnWatchInterval(t *testing.T) { } func TestReloader_DirectoriesApply(t *testing.T) { + t.Skip("flaky test") t.Parallel() l, err := net.Listen("tcp", "localhost:0") From 765b2c9864bf4ca7a35ee6417c59b480b1a79043 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Mon, 20 Oct 2025 16:24:03 -0700 Subject: [PATCH 09/11] support wildcard tenant deletion to catch backlogs Signed-off-by: Yi Jin --- pkg/compact/retention.go | 24 +++- pkg/compact/retention_test.go | 207 ++++++++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+), 5 deletions(-) diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index 668cd5b9138..c097214811a 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -27,7 +27,10 @@ const ( // tenantRetentionRegex is the regex pattern for parsing tenant retention. // valid format is `:(|d)(:all)?` where > 0. // Default behavior is to delete only level 1 blocks, use :all to delete all blocks. - tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))(:all)?$` + // Use `*` as tenant name to apply policy to all tenants (as a default/fallback). + // Specific tenant policies take precedence over the wildcard policy. + tenantRetentionRegex = `^([\w-]+|\*):((\d{4}-\d{2}-\d{2})|(\d+d))(:all)?$` + wildCardTenant = "*" Level1 = 1 // compaction level 1 indicating a new block Level2 = 2 // compaction level 2 indicating a compacted block @@ -120,6 +123,8 @@ func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) } // ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime. +// The wildcard policy ("*") applies to all tenants as a default/fallback. +// Specific tenant policies take precedence over the wildcard policy. func ApplyRetentionPolicyByTenant( ctx context.Context, logger log.Logger, @@ -133,11 +138,20 @@ func ApplyRetentionPolicyByTenant( } level.Info(logger).Log("msg", "start tenant retention", "total", len(metas)) deleted, skipped, notExpired := 0, 0, 0 + // Check if wildcard policy exists + wildcardPolicy, hasWildcard := retentionByTenant[wildCardTenant] for id, m := range metas { - policy, ok := retentionByTenant[m.Thanos.GetTenant()] + tenant := m.Thanos.GetTenant() + // First try to find tenant-specific policy + policy, ok := retentionByTenant[tenant] if !ok { - skipped++ - continue + // Fallback to wildcard policy if tenant-specific policy not found + if hasWildcard { + policy = wildcardPolicy + } else { + skipped++ + continue + } } maxTime := time.Unix(m.MaxTime/1000, 0) // Default behavior: only delete level 1 blocks unless IsAll is true @@ -145,7 +159,7 @@ func ApplyRetentionPolicyByTenant( continue } if policy.isExpired(maxTime) { - level.Info(logger).Log("msg", "deleting blocks applying retention policy", "id", id, "maxTime", maxTime.String()) + level.Info(logger).Log("msg", "deleting blocks applying retention policy", "id", id, "tenant", tenant, "maxTime", maxTime.String()) if err := block.Delete(ctx, logger, bkt, id); err != nil { level.Error(logger).Log("msg", "failed to delete block", "id", id, "err", err) continue // continue to next block to clean up backlogs diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 0e43da89e72..2524414894a 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -374,6 +374,47 @@ func TestParseRetentionPolicyByTenant(t *testing.T) { nil, true, }, + { + "wildcard tenant with duration", + []string{"*:30d"}, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 30 * 24 * time.Hour, + IsAll: false, + }, + }, + false, + }, + { + "wildcard tenant with cutoff date and all flag", + []string{"*:2024-01-01:all"}, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + RetentionDuration: time.Duration(0), + IsAll: true, + }, + }, + false, + }, + { + "wildcard with specific tenant override", + []string{"*:90d", "tenant-special:30d:all"}, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 90 * 24 * time.Hour, + IsAll: false, + }, + "tenant-special": { + CutoffDate: time.Time{}, + RetentionDuration: 30 * 24 * time.Hour, + IsAll: true, + }, + }, + false, + }, } { t.Run(tt.name, func(t *testing.T) { got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants) @@ -573,6 +614,172 @@ func TestApplyRetentionPolicyByTenant(t *testing.T) { }, false, }, + { + "wildcard tenant applies to all tenants", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-a", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-b", + time.Now().Add(-2 * 24 * time.Hour), + time.Now().Add(-24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-c", + time.Now().Add(-24 * time.Hour), + time.Now().Add(-23 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW51", + "tenant-d", + time.Now().Add(-5 * time.Hour), + time.Now().Add(-4 * time.Hour), + compact.Level1, + }, + }, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * time.Hour, + IsAll: false, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW51/", + }, + false, + }, + { + "wildcard tenant with all flag applies to all levels", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-a", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-b", + time.Now().Add(-2 * 24 * time.Hour), + time.Now().Add(-24 * time.Hour), + compact.Level2, + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-c", + time.Now().Add(-5 * time.Hour), + time.Now().Add(-4 * time.Hour), + compact.Level1, + }, + }, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * time.Hour, + IsAll: true, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW50/", + }, + false, + }, + { + "wildcard with specific tenant override - wildcard longer retention, specific shorter", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-a", + time.Now().Add(-50 * 24 * time.Hour), + time.Now().Add(-49 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-cleanup", + time.Now().Add(-15 * 24 * time.Hour), + time.Now().Add(-14 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-b", + time.Now().Add(-20 * 24 * time.Hour), + time.Now().Add(-19 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW51", + "tenant-cleanup", + time.Now().Add(-5 * time.Hour), + time.Now().Add(-4 * time.Hour), + compact.Level1, + }, + }, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 30 * 24 * time.Hour, // 30 days for most tenants + IsAll: false, + }, + "tenant-cleanup": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * 24 * time.Hour, // 10 days for cleanup tenant + IsAll: false, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW50/", + "01CPHBEX20729MJQZXE3W0BW51/", + }, + false, + }, + { + "wildcard precedence - specific policy takes priority over wildcard", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-override", + time.Now().Add(-15 * 24 * time.Hour), + time.Now().Add(-14 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-normal", + time.Now().Add(-15 * 24 * time.Hour), + time.Now().Add(-14 * 24 * time.Hour), + compact.Level1, + }, + }, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * 24 * time.Hour, // 10 days wildcard + IsAll: false, + }, + "tenant-override": { + CutoffDate: time.Time{}, + RetentionDuration: 20 * 24 * time.Hour, // 20 days specific override + IsAll: false, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW48/", // kept due to 20-day specific policy + }, + false, + }, } { t.Run(tt.name, func(t *testing.T) { bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) From a6d21aba9734b6c2c0cf99472433aa1e1cafd297 Mon Sep 17 00:00:00 2001 From: divyansh-chhabria-db Date: Wed, 15 Oct 2025 14:51:50 +0530 Subject: [PATCH 10/11] Enhancing Pantheon Query logs [IMON-110] (#234) --- pkg/queryfrontend/query_logger.go | 82 +++++++++++++++++------- pkg/queryfrontend/queryinstant_logger.go | 18 +++--- pkg/queryfrontend/queryrange_logger.go | 13 ++-- 3 files changed, 77 insertions(+), 36 deletions(-) diff --git a/pkg/queryfrontend/query_logger.go b/pkg/queryfrontend/query_logger.go index c97c9c3b990..3629d23fd53 100644 --- a/pkg/queryfrontend/query_logger.go +++ b/pkg/queryfrontend/query_logger.go @@ -36,6 +36,8 @@ type UserInfo struct { Tenant string ForwardedFor string UserAgent string + Groups string + Email string } // ResponseStats holds statistics extracted from query response. @@ -91,6 +93,10 @@ func ExtractUserInfoFromHeaders(headers []*RequestHeader) UserInfo { if userInfo.Source == "" { userInfo.Source = headerValue } + case "x-auth-request-groups": + userInfo.Groups = headerValue + case "x-auth-request-email": + userInfo.Email = headerValue } } @@ -102,29 +108,6 @@ func ExtractUserInfoFromHeaders(headers []*RequestHeader) UserInfo { return userInfo } -// ExtractEmailFromResponse extracts the email from response headers (works for both range and instant queries). -func ExtractEmailFromResponse(resp queryrange.Response) string { - if resp == nil { - return "" - } - - // Check both response types using OR condition - var headers []*queryrange.PrometheusResponseHeader - if promResp, ok := resp.(*queryrange.PrometheusResponse); ok { - headers = promResp.GetHeaders() - } else if promResp, ok := resp.(*queryrange.PrometheusInstantQueryResponse); ok { - headers = promResp.GetHeaders() - } - - for _, header := range headers { - if strings.ToLower(header.Name) == "x-auth-request-email" && len(header.Values) > 0 { - return header.Values[0] - } - } - - return "" -} - // ConvertStoreMatchers converts internal store matchers to logging format. func ConvertStoreMatchers(storeMatchers [][]*labels.Matcher) []StoreMatcherSet { if len(storeMatchers) == 0 { @@ -174,6 +157,59 @@ func GetResponseStats(resp queryrange.Response) ResponseStats { return stats } +// ExtractMetricNames extracts all unique __name__ labels from query response (works for both range and instant queries). +func ExtractMetricNames(resp queryrange.Response) []string { + if resp == nil { + return nil + } + + metricNamesMap := make(map[string]struct{}) + + // Handle range query response (resultType: matrix) + if r, ok := resp.(*queryrange.PrometheusResponse); ok { + for _, stream := range r.Data.Result { + for _, label := range stream.Labels { + if label.Name == "__name__" { + metricNamesMap[label.Value] = struct{}{} + break + } + } + } + } else if r, ok := resp.(*queryrange.PrometheusInstantQueryResponse); ok { + // Handle instant query response - check all result types + if vector := r.Data.Result.GetVector(); vector != nil { + // resultType: vector + for _, sample := range vector.Samples { + for _, label := range sample.Labels { + if label.Name == "__name__" { + metricNamesMap[label.Value] = struct{}{} + break + } + } + } + } else if matrix := r.Data.Result.GetMatrix(); matrix != nil { + // resultType: matrix (subqueries in instant queries) + for _, stream := range matrix.SampleStreams { + for _, label := range stream.Labels { + if label.Name == "__name__" { + metricNamesMap[label.Value] = struct{}{} + break + } + } + } + } + // Scalar and StringSample don't have __name__ labels + } + + // Convert map to slice + metricNames := make([]string, 0, len(metricNamesMap)) + for name := range metricNamesMap { + metricNames = append(metricNames, name) + } + + return metricNames +} + // WriteJSONLogToFile writes query logs to file in JSON format. func WriteJSONLogToFile(logger log.Logger, writer interface{}, queryLog interface{}, queryType string) error { if writer == nil { diff --git a/pkg/queryfrontend/queryinstant_logger.go b/pkg/queryfrontend/queryinstant_logger.go index a317e9ae886..50a875816e0 100644 --- a/pkg/queryfrontend/queryinstant_logger.go +++ b/pkg/queryfrontend/queryinstant_logger.go @@ -37,6 +37,7 @@ type MetricsInstantQueryLogging struct { ForwardedFor string `json:"forwarded_for"` UserAgent string `json:"user_agent"` EmailId string `json:"email_id"` + Groups string `json:"groups"` // Query-related fields (instant query specific) QueryTimestampMs int64 `json:"query_timestamp_ms"` // Query timestamp for instant queries Path string `json:"path"` @@ -50,6 +51,8 @@ type MetricsInstantQueryLogging struct { Analyze bool `json:"analyze"` // Whether query analysis is enabled Engine string `json:"engine"` // Query engine being used Stats string `json:"stats"` // Query statistics information + MetricNames []string `json:"metric_names"` // Unique metric names (__name__ labels) in response + Shard string `json:"shard"` // Pantheon shard name // Store-matcher details StoreMatchers []StoreMatcherSet `json:"store_matchers"` } @@ -134,18 +137,12 @@ func (m *instantQueryLoggingMiddleware) logInstantQuery(req *ThanosQueryInstantR success := err == nil userInfo := ExtractUserInfoFromHeaders(req.Headers) - // Extract email from response headers - email := ExtractEmailFromResponse(resp) - - // This is to avoid logging queries that come from rule manager. - if userInfo.UserAgent == "Databricks-RuleManager/1.0" { - return - } - // Calculate stats (only for successful queries). var stats ResponseStats + var metricNames []string if success && resp != nil { stats = GetResponseStats(resp) + metricNames = ExtractMetricNames(resp) } // Create the instant query log entry. @@ -166,7 +163,8 @@ func (m *instantQueryLoggingMiddleware) logInstantQuery(req *ThanosQueryInstantR Tenant: userInfo.Tenant, ForwardedFor: userInfo.ForwardedFor, UserAgent: userInfo.UserAgent, - EmailId: email, + EmailId: userInfo.Email, + Groups: userInfo.Groups, // Query-related fields (instant query specific) QueryTimestampMs: req.Time, Path: req.Path, @@ -180,6 +178,8 @@ func (m *instantQueryLoggingMiddleware) logInstantQuery(req *ThanosQueryInstantR Analyze: req.Analyze, Engine: req.Engine, Stats: req.Stats, + MetricNames: metricNames, + Shard: os.Getenv("PANTHEON_SHARDNAME"), // Store-matcher details StoreMatchers: ConvertStoreMatchers(req.StoreMatchers), } diff --git a/pkg/queryfrontend/queryrange_logger.go b/pkg/queryfrontend/queryrange_logger.go index c6a555f45d3..665a857e127 100644 --- a/pkg/queryfrontend/queryrange_logger.go +++ b/pkg/queryfrontend/queryrange_logger.go @@ -37,6 +37,7 @@ type MetricsRangeQueryLogging struct { ForwardedFor string `json:"forwardedFor"` UserAgent string `json:"userAgent"` EmailId string `json:"emailId"` + Groups string `json:"groups"` // Query-related fields StartTimestampMs int64 `json:"startTimestampMs"` EndTimestampMs int64 `json:"endTimestampMs"` @@ -53,6 +54,8 @@ type MetricsRangeQueryLogging struct { Engine string `json:"engine"` // Query engine being used SplitIntervalMs int64 `json:"splitIntervalMs"` // Query splitting interval in milliseconds Stats string `json:"stats"` // Query statistics information + MetricNames []string `json:"metricNames"` // Unique metric names (__name__ labels) in response + Shard string `json:"shard"` // Pantheon shard name // Store-matcher details StoreMatchers []StoreMatcherSet `json:"storeMatchers"` } @@ -137,13 +140,12 @@ func (m *rangeQueryLoggingMiddleware) logRangeQuery(req *ThanosQueryRangeRequest success := err == nil userInfo := ExtractUserInfoFromHeaders(req.Headers) - // Extract email from response headers - email := ExtractEmailFromResponse(resp) - // Calculate stats (only for successful queries). var stats ResponseStats + var metricNames []string if success && resp != nil { stats = GetResponseStats(resp) + metricNames = ExtractMetricNames(resp) } // Create the range query log entry. @@ -164,7 +166,8 @@ func (m *rangeQueryLoggingMiddleware) logRangeQuery(req *ThanosQueryRangeRequest Tenant: userInfo.Tenant, ForwardedFor: userInfo.ForwardedFor, UserAgent: userInfo.UserAgent, - EmailId: email, + EmailId: userInfo.Email, + Groups: userInfo.Groups, // Query-related fields StartTimestampMs: req.Start, EndTimestampMs: req.End, @@ -181,6 +184,8 @@ func (m *rangeQueryLoggingMiddleware) logRangeQuery(req *ThanosQueryRangeRequest Engine: req.Engine, SplitIntervalMs: req.SplitInterval.Milliseconds(), Stats: req.Stats, + MetricNames: metricNames, + Shard: os.Getenv("PANTHEON_SHARDNAME"), // Store-matcher details StoreMatchers: ConvertStoreMatchers(req.StoreMatchers), } From c04a11d0257663ecdc05482ad07004dfa4832a5b Mon Sep 17 00:00:00 2001 From: Yuchen Wang <162491048+yuchen-db@users.noreply.github.com> Date: Mon, 24 Nov 2025 20:17:51 -0800 Subject: [PATCH 11/11] Cherry-pick upstream goroutine leak fixes from OSS Thanos (#245) --- CHANGELOG.md | 2 ++ pkg/store/bucket.go | 31 +++++++++++++++++++++++++++++-- pkg/store/bucket_e2e_test.go | 34 ++++++++++++++++++++++++++++------ pkg/store/bucket_test.go | 2 +- pkg/store/lazy_postings.go | 24 ++++++++++++++---------- pkg/store/proxy_merge.go | 15 +++++++++++++-- pkg/store/proxy_test.go | 4 ---- 7 files changed, 87 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 966c5f2411f..0f25e45c25e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed +- [#8378](https://github.com/thanos-io/thanos/pull/8378): Store: fix the reuse of dirty posting slices + ### Added ### Changed diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 7417b496420..d85b202705f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -34,6 +34,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/util/zeropool" "github.com/weaveworks/common/httpgrpc" "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" @@ -125,8 +126,22 @@ const ( var ( errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.") hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }} + postingsPool zeropool.Pool[[]storage.SeriesRef] ) +func getPostingsSlice() []storage.SeriesRef { + if p := postingsPool.Get(); p != nil { + return p + } + + // Pre-allocate slice with initial capacity. + return make([]storage.SeriesRef, 0, 1024) +} + +func putPostingsSlice(p []storage.SeriesRef) { + postingsPool.Put(p[:0]) +} + type bucketStoreMetrics struct { blocksLoaded prometheus.Gauge blockLoads prometheus.Counter @@ -1693,6 +1708,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store err = g.Wait() }) if err != nil { + for _, resp := range respSets { + resp.Close() + } code := codes.Aborted if s, ok := status.FromError(errors.Cause(err)); ok { code = s.Code() @@ -2539,6 +2557,10 @@ type bucketIndexReader struct { indexVersion int logger log.Logger + + // Posting slice to return to the postings pool on close. + // A single bucketIndexReader should have at most 1 postings slice to return. + postings []storage.SeriesRef } func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader { @@ -2659,13 +2681,13 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch // ExpandPostingsWithContext returns the postings expanded as a slice and considers context. func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage.SeriesRef, error) { - res := make([]storage.SeriesRef, 0, 1024) // Pre-allocate slice with initial capacity + res := getPostingsSlice() i := 0 for p.Next() { i++ if i%checkContextEveryNIterations == 0 { if err := ctx.Err(); err != nil { - return nil, err + return res, err } } res = append(res, p.At()) @@ -2958,6 +2980,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, } ps, err := ExpandPostingsWithContext(ctx, p) + r.postings = ps if err != nil { level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) return false, nil, nil @@ -3394,6 +3417,10 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym // Close released the underlying resources of the reader. func (r *bucketIndexReader) Close() error { r.block.pendingReaders.Done() + + if r.postings != nil { + putPostingsSlice(r.postings) + } return nil } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index bee87898c6c..12a970437d9 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -131,7 +131,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o return } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig, opts ...BucketStoreOption) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -176,10 +176,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m true, true, time.Minute, - WithLogger(s.logger), - WithIndexCache(s.cache), - WithFilterConfig(filterConf), - WithRegistry(reg), + append(opts, WithLogger(s.logger), + WithIndexCache(s.cache), + WithFilterConfig(filterConf), + WithRegistry(reg))..., ) testutil.Ok(t, err) defer func() { testutil.Ok(t, store.Close()) }() @@ -619,6 +619,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { maxChunksLimit uint64 maxSeriesLimit uint64 maxBytesLimit int64 + storeOpts []BucketStoreOption expectedErr string code codes.Code }{ @@ -630,12 +631,25 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { expectedErr: "exceeded chunks limit", code: codes.ResourceExhausted, }, + "should fail if the max chunks limit is exceeded - ResourceExhausted (sortingStrategyNone)": { + maxChunksLimit: expectedChunks - 1, + expectedErr: "exceeded chunks limit", + storeOpts: []BucketStoreOption{WithDontResort(true)}, + code: codes.ResourceExhausted, + }, "should fail if the max series limit is exceeded - ResourceExhausted": { maxChunksLimit: expectedChunks, expectedErr: "exceeded series limit", maxSeriesLimit: 1, code: codes.ResourceExhausted, }, + "should fail if the max series limit is exceeded - ResourceExhausted (sortingStrategyNone)": { + maxChunksLimit: expectedChunks, + expectedErr: "exceeded series limit", + maxSeriesLimit: 1, + storeOpts: []BucketStoreOption{WithDontResort(true)}, + code: codes.ResourceExhausted, + }, "should fail if the max bytes limit is exceeded - ResourceExhausted": { maxChunksLimit: expectedChunks, expectedErr: "exceeded bytes limit", @@ -643,6 +657,14 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { maxBytesLimit: 1, code: codes.ResourceExhausted, }, + "should fail if the max bytes limit is exceeded - ResourceExhausted (sortingStrategyNone)": { + maxChunksLimit: expectedChunks, + expectedErr: "exceeded bytes limit", + maxSeriesLimit: 2, + maxBytesLimit: 1, + storeOpts: []BucketStoreOption{WithDontResort(true)}, + code: codes.ResourceExhausted, + }, } for testName, testData := range cases { @@ -653,7 +675,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(units.Base2Bytes(testData.maxBytesLimit)), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(units.Base2Bytes(testData.maxBytesLimit)), emptyRelabelConfig, allowAllFilterConf, testData.storeOpts...) testutil.Ok(t, s.store.SyncBlocks(ctx)) req := &storepb.SeriesRequest{ diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 8027c093de7..dfa9932b93c 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2971,7 +2971,7 @@ func TestExpandPostingsWithContextCancel(t *testing.T) { res, err := ExpandPostingsWithContext(ctx, p) testutil.NotOk(t, err) testutil.Equals(t, context.Canceled, err) - testutil.Equals(t, []storage.SeriesRef(nil), res) + testutil.Equals(t, true, cap(res) == 1024) } func samePostingGroup(a, b *postingGroup) bool { diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index f8363ab477e..ba8e8fec65e 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -279,6 +279,19 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post return nil, nil, errors.Wrap(err, "get postings") } + result := mergeFetchedPostings(ctx, fetchedPostings, postingGroups) + if err := ctx.Err(); err != nil { + return nil, nil, err + } + ps, err := ExpandPostingsWithContext(ctx, result) + r.postings = ps + if err != nil { + return nil, nil, errors.Wrap(err, "expand") + } + return ps, lazyMatchers, nil +} + +func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, postingGroups []*postingGroup) index.Postings { // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys // again, and this is exactly the same order as before (when building the groups), so we can simply // use one incrementing index to fetch postings from returned slice. @@ -306,14 +319,5 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post } } - result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...)) - - if err := ctx.Err(); err != nil { - return nil, nil, err - } - ps, err := ExpandPostingsWithContext(ctx, result) - if err != nil { - return nil, nil, errors.Wrap(err, "expand") - } - return ps, lazyMatchers, nil + return index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...)) } diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index d128487d0c5..25dc335951a 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -19,6 +19,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" + grpc_opentracing "github.com/thanos-io/thanos/pkg/tracing/tracing_middleware" "github.com/thanos-io/thanos/pkg/losertree" @@ -277,6 +278,8 @@ type lazyRespSet struct { initialized bool shardMatcher *storepb.ShardMatcher + + donec chan struct{} } func (l *lazyRespSet) isEmpty() bool { @@ -385,6 +388,7 @@ func newLazyRespSet( ringHead: 0, ringTail: 0, closed: false, + donec: make(chan struct{}), } respSet.storeLabels = make(map[string]struct{}) for _, ls := range storeLabelSets { @@ -403,6 +407,8 @@ func newLazyRespSet( l.span.SetTag("processed.samples", seriesStats.Samples) l.span.SetTag("processed.bytes", bytesProcessed) l.span.Finish() + + close(l.donec) }() numResponses := 0 @@ -611,13 +617,14 @@ func newAsyncRespSet( func (l *lazyRespSet) Close() { l.bufferedResponsesMtx.Lock() - defer l.bufferedResponsesMtx.Unlock() - l.closeSeries() l.closed = true l.bufferSlotEvent.Signal() l.noMoreData = true l.dataOrFinishEvent.Signal() + l.bufferedResponsesMtx.Unlock() + + <-l.donec // Wait for the internal goroutine to complete its work. l.shardMatcher.Close() _ = l.cl.CloseSend() @@ -806,11 +813,15 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] } func (l *eagerRespSet) Close() { + l.wg.Wait() + if l.closeSeries != nil { l.closeSeries() } + l.wg.Wait() l.shardMatcher.Close() _ = l.cl.CloseSend() + } func (l *eagerRespSet) At() *storepb.SeriesResponse { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 7617f48b296..9d4cae6c1e9 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -2055,10 +2055,6 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { return } } - - // Wait until the last goroutine exits which is stuck on time.Sleep(). - // Otherwise, goleak complains. - time.Sleep(5 * time.Second) } func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {