From fb152c7de7d18edcb52c26f070de6c4e36e14688 Mon Sep 17 00:00:00 2001
From: Yi Jin
Date: Tue, 27 Jan 2026 12:36:01 -0800
Subject: [PATCH] heap optimization for pantheon db

Pool the compressed request buffer, the decompressed byte slice, and the
prompb.WriteRequest used on the receiveHTTP hot path, so that repeated
remote-write requests reuse allocations instead of growing fresh buffers
on every request.

Signed-off-by: Yi Jin
---
 pkg/receive/handler.go | 54 +++++++++++++++++++++++++++++++++++-------
 1 file changed, 46 insertions(+), 8 deletions(-)

diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index b2fdd38217..7a38301feb 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -87,6 +87,26 @@ var (
 	errNotReady    = errors.New("target not ready")
 	errUnavailable = errors.New("target not available")
 	errInternal    = errors.New("internal error")
+
+	// Buffer pools to reduce memory allocations in the hot path.
+	compressedBufPool = sync.Pool{
+		New: func() interface{} {
+			return bytes.NewBuffer(make([]byte, 0, 32*1024)) // 32KB default
+		},
+	}
+
+	decompressedBufPool = sync.Pool{
+		New: func() interface{} {
+			buf := make([]byte, 0, 128*1024) // 128KB default
+			return &buf
+		},
+	}
+
+	writeRequestPool = sync.Pool{
+		New: func() interface{} {
+			return &prompb.WriteRequest{}
+		},
+	}
 )
 
 type WriteableStoreAsyncClient interface {
@@ -564,7 +584,13 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 	requestLimiter := h.Limiter.RequestLimiter()
 	// io.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
 	// Since this is receive hot path, grow upfront saving allocations and CPU time.
-	compressed := bytes.Buffer{}
+	// Use a pooled buffer to reduce allocations.
+	compressed := compressedBufPool.Get().(*bytes.Buffer)
+	defer func() {
+		compressed.Reset()
+		compressedBufPool.Put(compressed)
+	}()
+
 	if r.ContentLength >= 0 {
 		if !requestLimiter.AllowSizeBytes(tenantHTTP, r.ContentLength) {
 			http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
@@ -574,19 +600,23 @@
 	} else {
 		compressed.Grow(512)
 	}
-	_, err = io.Copy(&compressed, r.Body)
+	_, err = io.Copy(compressed, r.Body)
 	if err != nil {
 		http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
 		return
 	}
-	reqBuf, err := s2.Decode(nil, compressed.Bytes())
+
+	// Use a pooled buffer for the decompressed data.
+	reqBuf := decompressedBufPool.Get().(*[]byte)
+	defer decompressedBufPool.Put(reqBuf)
+	*reqBuf, err = s2.Decode((*reqBuf)[:0], compressed.Bytes())
 	if err != nil {
 		level.Error(tLogger).Log("msg", "snappy decode error", "err", err)
 		http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
 		return
 	}
-	if !requestLimiter.AllowSizeBytes(tenantHTTP, int64(len(reqBuf))) {
+	if !requestLimiter.AllowSizeBytes(tenantHTTP, int64(len(*reqBuf))) {
 		http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
 		return
 	}
 
@@ -594,8 +624,16 @@
 	// NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory
 	// from the whole request. Ensure that we always copy those when we want to
 	// store them for longer time.
-	var wreq prompb.WriteRequest
-	if err := proto.Unmarshal(reqBuf, &wreq); err != nil {
+	// Use a pooled proto message to reduce allocations.
+	wreq := writeRequestPool.Get().(*prompb.WriteRequest)
+	defer func() {
+		// Reset before returning to the pool to avoid memory leaks.
+		wreq.Timeseries = wreq.Timeseries[:0]
+		wreq.Metadata = wreq.Metadata[:0]
+		writeRequestPool.Put(wreq)
+	}()
+
+	if err := proto.Unmarshal(*reqBuf, wreq); err != nil {
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
@@ -637,14 +675,14 @@
 	}
 
 	// Apply relabeling configs.
-	h.relabel(&wreq)
+	h.relabel(wreq)
 	if len(wreq.Timeseries) == 0 {
 		level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.")
 		return
 	}
 
 	responseStatusCode := http.StatusOK
-	tenantStats, err := h.handleRequest(ctx, rep, tenantHTTP, &wreq)
+	tenantStats, err := h.handleRequest(ctx, rep, tenantHTTP, wreq)
 	if err != nil {
 		level.Debug(tLogger).Log("msg", "failed to handle request", "err", err.Error())
 		switch errors.Cause(err) {
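
Note (illustration only, not part of the patch): the sketch below reduces the
Get/defer-Put pooling pattern used above to a self-contained Go program. The
names bufPool, slicePool, and readBody are hypothetical and do not exist in
the Thanos codebase; only the 32KB/128KB pool sizes mirror the defaults
chosen in the diff.

// Illustration of the Get/defer-Put pooling pattern applied in receiveHTTP.
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// bufPool mirrors compressedBufPool: each pooled buffer starts with a 32KB
// backing array so typical request bodies are read without reallocation.
var bufPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewBuffer(make([]byte, 0, 32*1024))
	},
}

// slicePool mirrors decompressedBufPool: a *[]byte is pooled instead of a
// []byte so that Put does not allocate when boxing the slice header.
var slicePool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 0, 128*1024)
		return &b
	},
}

// readBody copies a request body into a pooled buffer and returns the bytes
// plus a release func, echoing the Get/defer-Put shape used in the diff.
func readBody(r io.Reader) ([]byte, func(), error) {
	buf := bufPool.Get().(*bytes.Buffer)
	release := func() {
		buf.Reset() // keep the backing array, drop the contents
		bufPool.Put(buf)
	}
	if _, err := io.Copy(buf, r); err != nil {
		release()
		return nil, nil, err
	}
	return buf.Bytes(), release, nil
}

func main() {
	body, release, err := readBody(strings.NewReader("example payload"))
	if err != nil {
		panic(err)
	}
	defer release() // body must not be used after release

	// Reuse a pooled slice the way the diff reuses reqBuf for s2.Decode:
	// append into the zero-length view of the pooled backing array.
	dst := slicePool.Get().(*[]byte)
	*dst = append((*dst)[:0], body...)
	fmt.Printf("read %d bytes, copied %d bytes\n", len(body), len(*dst))
	slicePool.Put(dst)
}

The *[]byte indirection matters because storing a slice header directly in an
interface{} allocates on every Put; pooling a pointer to the slice avoids
that per-call allocation, which is why decompressedBufPool returns &buf.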