From f9f4eb6a56ec24184620451360c8c9d1b7d129b2 Mon Sep 17 00:00:00 2001 From: Kusum Madarasu Date: Tue, 27 Jan 2026 19:26:16 +0000 Subject: [PATCH 1/4] memory heap optimizations --- pkg/receive/handler.go | 54 +++++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index b2fdd38217..7a38301feb 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -87,6 +87,26 @@ var ( errNotReady = errors.New("target not ready") errUnavailable = errors.New("target not available") errInternal = errors.New("internal error") + + // Buffer pools to reduce memory allocations in hot path. + compressedBufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 32*1024)) // 32KB default + }, + } + + decompressedBufPool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 0, 128*1024) // 128KB default + return &buf + }, + } + + writeRequestPool = sync.Pool{ + New: func() interface{} { + return &prompb.WriteRequest{} + }, + } ) type WriteableStoreAsyncClient interface { @@ -564,7 +584,13 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { requestLimiter := h.Limiter.RequestLimiter() // io.ReadAll dynamically adjust the byte slice for read data, starting from 512B. // Since this is receive hot path, grow upfront saving allocations and CPU time. - compressed := bytes.Buffer{} + // Use buffer pool to reduce allocations. + compressed := compressedBufPool.Get().(*bytes.Buffer) + defer func() { + compressed.Reset() + compressedBufPool.Put(compressed) + }() + if r.ContentLength >= 0 { if !requestLimiter.AllowSizeBytes(tenantHTTP, r.ContentLength) { http.Error(w, "write request too large", http.StatusRequestEntityTooLarge) @@ -574,19 +600,23 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } else { compressed.Grow(512) } - _, err = io.Copy(&compressed, r.Body) + _, err = io.Copy(compressed, r.Body) if err != nil { http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError) return } - reqBuf, err := s2.Decode(nil, compressed.Bytes()) + + // Use buffer pool for decompressed data. + reqBuf := decompressedBufPool.Get().(*[]byte) + defer decompressedBufPool.Put(reqBuf) + *reqBuf, err = s2.Decode((*reqBuf)[:0], compressed.Bytes()) if err != nil { level.Error(tLogger).Log("msg", "snappy decode error", "err", err) http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest) return } - if !requestLimiter.AllowSizeBytes(tenantHTTP, int64(len(reqBuf))) { + if !requestLimiter.AllowSizeBytes(tenantHTTP, int64(len(*reqBuf))) { http.Error(w, "write request too large", http.StatusRequestEntityTooLarge) return } @@ -594,8 +624,16 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { // NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory // from the whole request. Ensure that we always copy those when we want to // store them for longer time. - var wreq prompb.WriteRequest - if err := proto.Unmarshal(reqBuf, &wreq); err != nil { + // Use proto message pool to reduce allocations. + wreq := writeRequestPool.Get().(*prompb.WriteRequest) + defer func() { + // Reset before returning to pool to avoid memory leaks. + wreq.Timeseries = wreq.Timeseries[:0] + wreq.Metadata = wreq.Metadata[:0] + writeRequestPool.Put(wreq) + }() + + if err := proto.Unmarshal(*reqBuf, wreq); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -637,14 +675,14 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } // Apply relabeling configs. - h.relabel(&wreq) + h.relabel(wreq) if len(wreq.Timeseries) == 0 { level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.") return } responseStatusCode := http.StatusOK - tenantStats, err := h.handleRequest(ctx, rep, tenantHTTP, &wreq) + tenantStats, err := h.handleRequest(ctx, rep, tenantHTTP, wreq) if err != nil { level.Debug(tLogger).Log("msg", "failed to handle request", "err", err.Error()) switch errors.Cause(err) { From 237f527bbe410c2c37bbb0ffd9aa252bc962d819 Mon Sep 17 00:00:00 2001 From: Kusum Madarasu Date: Wed, 28 Jan 2026 19:14:49 +0000 Subject: [PATCH 2/4] with unit test fixes --- pkg/receive/handler.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 7a38301feb..887f3e91c4 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -681,6 +681,16 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } + // Deep copy all labels to detach them from the pooled buffer. + // This is necessary because: + // 1. The pooled buffer will be returned and reused when this function returns + // 2. Async remote write goroutines may still reference the labels + // 3. Even local writes need detached labels for TSDB storage + // Use the same intern setting as the writer to ensure consistent behavior. + for i := range wreq.Timeseries { + labelpb.ReAllocZLabelsStrings(&wreq.Timeseries[i].Labels, h.writer.opts.Intern) + } + responseStatusCode := http.StatusOK tenantStats, err := h.handleRequest(ctx, rep, tenantHTTP, wreq) if err != nil { From da4deec4de169acd9c27007ab3b5d6800fc83ea7 Mon Sep 17 00:00:00 2001 From: mkusumdb Date: Fri, 30 Jan 2026 08:50:55 +0000 Subject: [PATCH 3/4] fix test --- pkg/receive/handler.go | 4 ++-- test/e2e/receive_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 887f3e91c4..6969cfe15c 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -626,10 +626,10 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { // store them for longer time. // Use proto message pool to reduce allocations. wreq := writeRequestPool.Get().(*prompb.WriteRequest) + wreq.Reset() // Reset immediately after getting from pool defer func() { // Reset before returning to pool to avoid memory leaks. - wreq.Timeseries = wreq.Timeseries[:0] - wreq.Metadata = wreq.Metadata[:0] + wreq.Reset() writeRequestPool.Put(wreq) }() diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index c938a4f040..efe04bb9b7 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -1052,8 +1052,8 @@ func TestReceiveExtractsTenant(t *testing.T) { Timeseries: []prompb.TimeSeries{ { Labels: []prompb.Label{ - {Name: tenantLabelName, Value: "tenant-1"}, {Name: "aa", Value: "bb"}, + {Name: tenantLabelName, Value: "tenant-1"}, }, Samples: []prompb.Sample{ {Value: 1, Timestamp: time.Now().UnixMilli()}, @@ -1072,8 +1072,8 @@ func TestReceiveExtractsTenant(t *testing.T) { Timeseries: []prompb.TimeSeries{ { Labels: []prompb.Label{ - {Name: tenantLabelName, Value: "tenant-2"}, {Name: "aa", Value: "bb"}, + {Name: tenantLabelName, Value: "tenant-2"}, }, Samples: []prompb.Sample{ {Value: 1, Timestamp: time.Now().UnixMilli()}, @@ -1101,8 +1101,8 @@ func TestReceiveExtractsTenant(t *testing.T) { Timeseries: []prompb.TimeSeries{ { Labels: []prompb.Label{ - {Name: tenantLabelName, Value: "tenant-3"}, {Name: "aa", Value: "bb"}, + {Name: tenantLabelName, Value: "tenant-3"}, }, Samples: []prompb.Sample{ {Value: 1, Timestamp: time.Now().UnixMilli()}, From 6081842fea67dd5569fd9881dc4df489e3a5f919 Mon Sep 17 00:00:00 2001 From: mkusumdb Date: Tue, 3 Feb 2026 08:02:37 +0000 Subject: [PATCH 4/4] Fix tenant extraction fallback and update test expectations - Fixed handler to use HTTP header tenant when split label is missing - Updated test expectations to use full tenant ID format - All e2e tests passing with memory heap optimizations --- pkg/receive/handler.go | 3 +-- test/e2e/receive_test.go | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 6969cfe15c..392ff49390 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -978,9 +978,8 @@ func (h *Handler) distributeTimeseriesToReplicas( tenantLabel := lbls.Get(h.splitTenantLabelName) if tenantLabel != "" { tenant = h.splitTenantLabelName + ":" + tenantLabel - } else { - tenant = h.options.DefaultTenantID } + // If label not found, keep using tenantHTTP (don't override) } for _, rn := range replicas { diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index efe04bb9b7..f5a5609b38 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -1063,7 +1063,7 @@ func TestReceiveExtractsTenant(t *testing.T) { }) })) - testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "tenant-1")), e2emon.WaitMissingMetrics())) + testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", tenantLabelName+":tenant-1")), e2emon.WaitMissingMetrics())) }) t.Run("tenant label is extracted from one series, default is used for the other one", func(t *testing.T) { @@ -1149,7 +1149,7 @@ func TestReceiveExtractsTenant(t *testing.T) { })) testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "http-tenant")), e2emon.WaitMissingMetrics())) - testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "tenant-3")), e2emon.WaitMissingMetrics())) + testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", tenantLabelName+":tenant-3")), e2emon.WaitMissingMetrics())) }) }