diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index b2fdd382178..392ff49390d 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -87,6 +87,26 @@ var (
 	errNotReady = errors.New("target not ready")
 	errUnavailable = errors.New("target not available")
 	errInternal = errors.New("internal error")
+
+	// Buffer pools to reduce memory allocations in the hot path.
+	compressedBufPool = sync.Pool{
+		New: func() interface{} {
+			return bytes.NewBuffer(make([]byte, 0, 32*1024)) // 32KB default.
+		},
+	}
+
+	decompressedBufPool = sync.Pool{
+		New: func() interface{} {
+			buf := make([]byte, 0, 128*1024) // 128KB default.
+			return &buf
+		},
+	}
+
+	writeRequestPool = sync.Pool{
+		New: func() interface{} {
+			return &prompb.WriteRequest{}
+		},
+	}
 )
 
 type WriteableStoreAsyncClient interface {
@@ -564,7 +584,13 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 	requestLimiter := h.Limiter.RequestLimiter()
 	// io.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
 	// Since this is receive hot path, grow upfront saving allocations and CPU time.
-	compressed := bytes.Buffer{}
+	// Use a buffer pool to reduce allocations.
+	compressed := compressedBufPool.Get().(*bytes.Buffer)
+	defer func() {
+		compressed.Reset()
+		compressedBufPool.Put(compressed)
+	}()
+
 	if r.ContentLength >= 0 {
 		if !requestLimiter.AllowSizeBytes(tenantHTTP, r.ContentLength) {
 			http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
@@ -574,19 +600,23 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 	} else {
 		compressed.Grow(512)
 	}
-	_, err = io.Copy(&compressed, r.Body)
+	_, err = io.Copy(compressed, r.Body)
 	if err != nil {
 		http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
 		return
 	}
-	reqBuf, err := s2.Decode(nil, compressed.Bytes())
+
+	// Use a buffer pool for decompressed data.
+	reqBuf := decompressedBufPool.Get().(*[]byte)
+	defer decompressedBufPool.Put(reqBuf)
+	*reqBuf, err = s2.Decode((*reqBuf)[:0], compressed.Bytes())
 	if err != nil {
 		level.Error(tLogger).Log("msg", "snappy decode error", "err", err)
 		http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
 		return
 	}
 
-	if !requestLimiter.AllowSizeBytes(tenantHTTP, int64(len(reqBuf))) {
+	if !requestLimiter.AllowSizeBytes(tenantHTTP, int64(len(*reqBuf))) {
 		http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
 		return
 	}
@@ -594,8 +624,16 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 	// NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory
 	// from the whole request. Ensure that we always copy those when we want to
 	// store them for longer time.
-	var wreq prompb.WriteRequest
-	if err := proto.Unmarshal(reqBuf, &wreq); err != nil {
+	// Use a proto message pool to reduce allocations.
+	wreq := writeRequestPool.Get().(*prompb.WriteRequest)
+	wreq.Reset() // Reset immediately after getting it from the pool.
+	defer func() {
+		// Reset before returning to the pool to avoid retaining request memory.
+		wreq.Reset()
+		writeRequestPool.Put(wreq)
+	}()
+
+	if err := proto.Unmarshal(*reqBuf, wreq); err != nil {
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
@@ -637,14 +675,24 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 
 	// Apply relabeling configs.
-	h.relabel(&wreq)
+	h.relabel(wreq)
 	if len(wreq.Timeseries) == 0 {
 		level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.")
 		return
 	}
 
+	// Deep copy all labels to detach them from the pooled buffer.
+	// This is necessary because:
+	// 1. The pooled buffer is returned and reused when this function returns.
+	// 2. Async remote-write goroutines may still reference the labels.
+	// 3. Even local writes need detached labels for TSDB storage.
+	// Use the same intern setting as the writer to ensure consistent behavior.
+	for i := range wreq.Timeseries {
+		labelpb.ReAllocZLabelsStrings(&wreq.Timeseries[i].Labels, h.writer.opts.Intern)
+	}
+
 	responseStatusCode := http.StatusOK
-	tenantStats, err := h.handleRequest(ctx, rep, tenantHTTP, &wreq)
+	tenantStats, err := h.handleRequest(ctx, rep, tenantHTTP, wreq)
 	if err != nil {
 		level.Debug(tLogger).Log("msg", "failed to handle request", "err", err.Error())
 		switch errors.Cause(err) {
@@ -930,9 +978,8 @@ func (h *Handler) distributeTimeseriesToReplicas(
 			tenantLabel := lbls.Get(h.splitTenantLabelName)
 			if tenantLabel != "" {
 				tenant = h.splitTenantLabelName + ":" + tenantLabel
-			} else {
-				tenant = h.options.DefaultTenantID
 			}
+			// If the label is not found, keep using tenantHTTP (don't override it).
 		}
 
 		for _, rn := range replicas {
diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go
index c938a4f0407..f5a5609b389 100644
--- a/test/e2e/receive_test.go
+++ b/test/e2e/receive_test.go
@@ -1052,8 +1052,8 @@ func TestReceiveExtractsTenant(t *testing.T) {
 				Timeseries: []prompb.TimeSeries{
 					{
 						Labels: []prompb.Label{
-							{Name: tenantLabelName, Value: "tenant-1"},
 							{Name: "aa", Value: "bb"},
+							{Name: tenantLabelName, Value: "tenant-1"},
 						},
 						Samples: []prompb.Sample{
 							{Value: 1, Timestamp: time.Now().UnixMilli()},
@@ -1063,7 +1063,7 @@ func TestReceiveExtractsTenant(t *testing.T) {
 			})
 		}))
 
-		testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "tenant-1")), e2emon.WaitMissingMetrics()))
+		testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", tenantLabelName+":tenant-1")), e2emon.WaitMissingMetrics()))
 	})
 
 	t.Run("tenant label is extracted from one series, default is used for the other one", func(t *testing.T) {
@@ -1072,8 +1072,8 @@ func TestReceiveExtractsTenant(t *testing.T) {
 				Timeseries: []prompb.TimeSeries{
 					{
 						Labels: []prompb.Label{
-							{Name: tenantLabelName, Value: "tenant-2"},
 							{Name: "aa", Value: "bb"},
+							{Name: tenantLabelName, Value: "tenant-2"},
 						},
 						Samples: []prompb.Sample{
 							{Value: 1, Timestamp: time.Now().UnixMilli()},
@@ -1101,8 +1101,8 @@ func TestReceiveExtractsTenant(t *testing.T) {
 				Timeseries: []prompb.TimeSeries{
 					{
 						Labels: []prompb.Label{
-							{Name: tenantLabelName, Value: "tenant-3"},
 							{Name: "aa", Value: "bb"},
+							{Name: tenantLabelName, Value: "tenant-3"},
 						},
 						Samples: []prompb.Sample{
 							{Value: 1, Timestamp: time.Now().UnixMilli()},
@@ -1149,7 +1149,7 @@ func TestReceiveExtractsTenant(t *testing.T) {
 		}))
 
 		testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "http-tenant")), e2emon.WaitMissingMetrics()))
-		testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "tenant-3")), e2emon.WaitMissingMetrics()))
+		testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", tenantLabelName+":tenant-3")), e2emon.WaitMissingMetrics()))
 	})
 }
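
The handler changes above all follow the same sync.Pool round trip: Get a pre-grown object, use it for the duration of the request, Reset it, and Put it back. The standalone sketch below shows that pattern in isolation; it assumes nothing from the Thanos codebase (bufPool and handle are illustrative names), and the 32KB capacity simply mirrors the default chosen in the diff.

// Minimal sketch of the pooled-buffer pattern used in receiveHTTP.
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// bufPool hands out pre-grown buffers so the hot path avoids re-allocating
// from io.ReadAll's 512B starting size on every request.
var bufPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewBuffer(make([]byte, 0, 32*1024)) // 32KB default, as in the diff.
	},
}

func handle(body io.Reader) (int, error) {
	buf := bufPool.Get().(*bytes.Buffer)
	defer func() {
		// Reset before Put so the next Get sees an empty buffer; the backing
		// array is kept, which is the whole point of pooling.
		buf.Reset()
		bufPool.Put(buf)
	}()
	n, err := io.Copy(buf, body)
	return int(n), err
}

func main() {
	for i := 0; i < 3; i++ {
		n, err := handle(strings.NewReader("example payload"))
		fmt.Println(n, err) // 15 <nil> each time; the same buffer is reused.
	}
}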
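The deep-copy step added after relabeling exists because ZLabels are zero copy: they point into the decode buffer, and once that buffer is returned to the pool and reused, any label still referencing it silently changes. The sketch below reproduces the hazard with plain byte slices, using a slice view as a stand-in for a ZLabel; in the real diff the copying is done by labelpb.ReAllocZLabelsStrings.

// Demonstrates why labels must be detached before the decode buffer is pooled.
package main

import "fmt"

func main() {
	buf := make([]byte, 0, 64)

	// Request 1: decode "tenant-a" into the pooled buffer and keep a
	// zero-copy view of it, as a ZLabel would.
	buf = append(buf[:0], "tenant-a"...)
	label := buf // zero copy: label aliases the pooled buffer's backing array

	// The buffer goes back to the pool and is reused for request 2.
	buf = append(buf[:0], "tenant-b"...)
	fmt.Println(string(label)) // "tenant-b": the retained label was corrupted.

	// The fix mirrors labelpb.ReAllocZLabelsStrings in the diff: copy the
	// bytes out before the buffer can be recycled.
	buf = append(buf[:0], "tenant-a"...)
	detached := append([]byte(nil), buf...) // deep copy
	buf = append(buf[:0], "tenant-b"...)
	fmt.Println(string(detached)) // "tenant-a": safe.
}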
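The distributeTimeseriesToReplicas change also alters the tenant naming scheme, which is why the e2e assertions now expect tenantLabelName+":tenant-1" rather than the bare label value, and why a missing split label keeps the HTTP request tenant instead of falling back to DefaultTenantID. A minimal sketch of that selection logic, with splitTenant as a hypothetical stand-in for the handler code:

// Illustrative tenant-selection logic matching the diff's behavior.
package main

import "fmt"

func splitTenant(tenantHTTP, splitLabelName string, labels map[string]string) string {
	if v := labels[splitLabelName]; v != "" {
		// Tenant becomes "<label name>:<label value>".
		return splitLabelName + ":" + v
	}
	return tenantHTTP // Label not found: keep the request tenant, don't override.
}

func main() {
	fmt.Println(splitTenant("http-tenant", "tenant_id", map[string]string{"tenant_id": "tenant-1"}))
	// tenant_id:tenant-1
	fmt.Println(splitTenant("http-tenant", "tenant_id", map[string]string{"aa": "bb"}))
	// http-tenant
}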