Commit b35d3e7

feat: improve batch processing by grouping files by exporter and selecting oldest for efficient sending
1 parent 38950f4 commit b35d3e7

File tree

1 file changed: +25 -69 lines changed

internal/report/sender.go

Lines changed: 25 additions & 69 deletions
@@ -10,7 +10,6 @@ import (
 	"net/http"
 	"net/url"
 	"path/filepath"
-	"sort"
 	"strings"
 	"time"

@@ -149,24 +148,15 @@ func (s *Sender) drainLoop() {
 			continue
 		}

-		// Group files by time window (5s buckets) for efficient batching
-		timeWindows := s.groupFilesByTimeWindow(files, 5*time.Second)
+		// NEW APPROACH: Group files by exporter, pick oldest from each
+		// This ensures all exporters are represented in each batch
+		batch := s.selectOldestFromEachExporter(files, s.config.Buffer.BatchSize)

-		// Process first time window (oldest files first)
-		if len(timeWindows) > 0 {
-			firstWindow := timeWindows[0]
-
-			// Limit batch size to configured batch_size
-			batchSize := len(firstWindow)
-			if batchSize > s.config.Buffer.BatchSize {
-				batchSize = s.config.Buffer.BatchSize
-			}
-
-			batch := firstWindow[:batchSize]
+		if len(batch) > 0 {
 			if err := s.processBatch(batch); err != nil {
 				// Failed to send - keep files and retry after delay
 				logger.Debug("Failed to process batch, will retry",
-					logger.Int("batch_size", batchSize),
+					logger.Int("batch_size", len(batch)),
 					logger.Err(err))
 			}
 		}
@@ -314,71 +304,37 @@ func (s *Sender) processBatch(filePaths []string) error {
 	return nil
 }

-// groupFilesByTimeWindow groups files into time buckets (e.g., 5s windows)
-// This allows batching multiple exporters that scraped at similar times
-// Returns a list of time windows (oldest first), each containing file paths
-func (s *Sender) groupFilesByTimeWindow(filePaths []string, windowSize time.Duration) [][]string {
-	// Map: timestamp bucket -> file paths
-	windows := make(map[int64][]string)
+// selectOldestFromEachExporter picks the oldest file from each exporter directory
+// This ensures all exporters are represented in each batch, preventing one exporter
+// from blocking others if it has a backlog
+func (s *Sender) selectOldestFromEachExporter(filePaths []string, maxBatch int) []string {
+	// Group files by exporter (directory name)
+	byExporter := make(map[string][]string)

 	for _, filePath := range filePaths {
-		// Parse timestamp from filename: YYYYMMDD-HHMMSS-...
-		timestamp, err := parseTimestampFromFilename(filePath)
-		if err != nil {
-			logger.Warn("Failed to parse timestamp from filename, skipping",
-				logger.String("file", filePath),
-				logger.Err(err))
-			continue
-		}
+		// Extract exporter name from path: buffer/<exporter>/file.prom
+		dir := filepath.Dir(filePath)
+		exporterName := filepath.Base(dir)

-		// Bucket by time window (e.g., 5s buckets)
-		bucket := timestamp.Unix() / int64(windowSize.Seconds())
-		windows[bucket] = append(windows[bucket], filePath)
+		byExporter[exporterName] = append(byExporter[exporterName], filePath)
 	}

-	// Convert to sorted list of windows (oldest first)
-	buckets := make([]int64, 0, len(windows))
-	for bucket := range windows {
-		buckets = append(buckets, bucket)
+	// Pick oldest file from each exporter (files are already sorted chronologically)
+	batch := make([]string, 0, len(byExporter))
+	for _, files := range byExporter {
+		if len(files) > 0 {
+			batch = append(batch, files[0]) // First file is oldest
+		}
 	}
-	sort.Slice(buckets, func(i, j int) bool {
-		return buckets[i] < buckets[j]
-	})

-	result := make([][]string, 0, len(buckets))
-	for _, bucket := range buckets {
-		result = append(result, windows[bucket])
+	// Limit to maxBatch if needed
+	if len(batch) > maxBatch {
+		batch = batch[:maxBatch]
 	}

-	return result
+	return batch
 }

-// parseTimestampFromFilename extracts timestamp from buffer filename
-// Format: buffer/<exporter>/YYYYMMDD-HHMMSS-<server_id>.prom
-func parseTimestampFromFilename(filePath string) (time.Time, error) {
-	filename := filepath.Base(filePath)
-
-	// Remove .prom extension
-	if !strings.HasSuffix(filename, ".prom") {
-		return time.Time{}, fmt.Errorf("invalid file extension")
-	}
-
-	// Extract timestamp part (first two segments: YYYYMMDD-HHMMSS)
-	parts := strings.SplitN(strings.TrimSuffix(filename, ".prom"), "-", 3)
-	if len(parts) < 2 {
-		return time.Time{}, fmt.Errorf("invalid filename format")
-	}
-
-	timeStr := parts[0] + "-" + parts[1]
-
-	// Parse timestamp
-	timestamp, err := time.Parse("20060102-150405", timeStr)
-	if err != nil {
-		return time.Time{}, fmt.Errorf("failed to parse timestamp: %w", err)
-	}
-
-	return timestamp, nil
-}

 // randomDelay waits for a random duration between 0 and the configured interval
 // This distributes load across the interval window
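For quick reference, below is a minimal, self-contained sketch of the selection behaviour this commit introduces. The helper name selectOldest, the example file paths, and the explicit sorts are illustrative only and not part of the production code, which assumes the incoming file list is already sorted chronologically.

package main

import (
	"fmt"
	"path/filepath"
	"sort"
)

// selectOldest mirrors the logic of selectOldestFromEachExporter from the diff:
// group buffer files by their exporter directory, keep the oldest file per
// exporter, and cap the batch at maxBatch.
func selectOldest(filePaths []string, maxBatch int) []string {
	byExporter := make(map[string][]string)
	for _, p := range filePaths {
		exporter := filepath.Base(filepath.Dir(p)) // buffer/<exporter>/file.prom
		byExporter[exporter] = append(byExporter[exporter], p)
	}

	batch := make([]string, 0, len(byExporter))
	for _, files := range byExporter {
		sort.Strings(files) // timestamped names sort chronologically
		batch = append(batch, files[0])
	}
	sort.Strings(batch) // deterministic output for the example only

	if len(batch) > maxBatch {
		batch = batch[:maxBatch]
	}
	return batch
}

func main() {
	// Hypothetical buffer contents: "node" has a backlog, "mysql" has one file.
	files := []string{
		"buffer/node/20240101-120000-srv1.prom",
		"buffer/node/20240101-120500-srv1.prom",
		"buffer/mysql/20240101-120200-srv1.prom",
	}
	fmt.Println(selectOldest(files, 10))
	// Prints the oldest file per exporter:
	// [buffer/mysql/20240101-120200-srv1.prom buffer/node/20240101-120000-srv1.prom]
}

With a backlog on one exporter, each drain cycle still sends at most one file per exporter, so a lagging exporter cannot starve the others.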
