From 0884479691721e8b37db6cb0eba2ef37bf13f0fc Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Fri, 30 Jun 2023 12:19:57 +0100
Subject: [PATCH 01/17] Pull tempo sync

---
 pkg/phlaredb/block_querier.go |   2 +-
 pkg/phlaredb/query/iters.go   | 635 +++++++++++++++++++++++++++++++++-
 2 files changed, 633 insertions(+), 4 deletions(-)

diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go
index 989e95a1a..ed0e8346c 100644
--- a/pkg/phlaredb/block_querier.go
+++ b/pkg/phlaredb/block_querier.go
@@ -1206,7 +1206,7 @@ func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string,
 		return query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath()))
 	}
 	ctx = query.AddMetricsToContext(ctx, r.metrics.query)
-	return query.NewColumnIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
+	return query.NewSyncIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
 }
 
 func repeatedColumnIter[T any](ctx context.Context, source Source, columnName string, rows iter.Iterator[T]) iter.Iterator[*query.RepeatedRow[T]] {
diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go
index 28b2264cc..72e1f3f42 100644
--- a/pkg/phlaredb/query/iters.go
+++ b/pkg/phlaredb/query/iters.go
@@ -76,7 +76,7 @@ func TruncateRowNumber(t RowNumberWithDefinitionLevel) RowNumber {
 	return n
 }
 
-func (t RowNumber) Valid() bool {
+func (t *RowNumber) Valid() bool {
 	return t[0] >= 0
 }
 
@@ -96,7 +96,194 @@
 // gb      | 1 | 3 | { 0, 2, 0, 0 }
 // null    | 0 | 1 | { 1, 0, -1, -1 }
 func (t *RowNumber) Next(repetitionLevel, definitionLevel int) {
-	// Next row at this level
+	t[repetitionLevel]++
+
+	// the following is nextSlow() unrolled
+	switch repetitionLevel {
+	case 0:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[1] = 0
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[1] = 0
+			t[2] = 0
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[1] = 0
+			t[2] = 0
+			t[3] = 0
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[1] = 0
+			t[2] = 0
+			t[3] = 0
+			t[4] = 0
+			t[5] = -1
+		case 5:
+			t[1] = 0
+			t[2] = 0
+			t[3] = 0
+			t[4] = 0
+			t[5] = 0
+		}
+	case 1:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[2] = 0
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[2] = 0
+			t[3] = 0
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[2] = 0
+			t[3] = 0
+			t[4] = 0
+			t[5] = -1
+		case 5:
+			t[2] = 0
+			t[3] = 0
+			t[4] = 0
+			t[5] = 0
+		}
+	case 2:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[3] = 0
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[3] = 0
+			t[4] = 0
+			t[5] = -1
+		case 5:
+			t[3] = 0
+			t[4] = 0
+			t[5] = 0
+		}
+	case 3:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[4] = 0
+			t[5] = -1
+		case 5:
+			t[4] = 0
+			t[5] = 0
+		}
+	case 4:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			
t[5] = -1 + case 5: + t[5] = 0 + } + case 5: + switch definitionLevel { + case 0: + t[1] = -1 + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 1: + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 2: + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 3: + t[4] = -1 + t[5] = -1 + case 4: + t[5] = -1 + } + } +} + +// nextSlow is the original implementation of next. it is kept to test against +// the unrolled version above +func (t *RowNumber) nextSlow(repetitionLevel, definitionLevel int) { t[repetitionLevel]++ // New children up through the definition level @@ -104,7 +291,7 @@ func (t *RowNumber) Next(repetitionLevel, definitionLevel int) { t[i] = 0 } - // Children past the definition level are undefined + // // Children past the definition level are undefined for i := definitionLevel + 1; i < len(t); i++ { t[i] = -1 } @@ -118,6 +305,26 @@ func (t *RowNumber) Skip(numRows int64) { } } +// Preceding returns the largest representable row number that is immediately prior to this +// one. Think of it like math.NextAfter but for segmented row numbers. Examples: +// +// RowNumber 1000.0.0 (defined at 3 levels) is preceded by 999.max.max +// RowNumber 1000.-1.-1 (defined at 1 level) is preceded by 999.-1.-1 +func (t RowNumber) Preceding() RowNumber { + for i := len(t) - 1; i >= 0; i-- { + switch t[i] { + case -1: + continue + case 0: + t[i] = math.MaxInt64 + default: + t[i]-- + return t + } + } + return t +} + // IteratorResult is a row of data with a row number and named columns of data. // Internally it has an unstructured list for efficient collection. The ToMap() // function can be used to make inspection easier. @@ -941,3 +1148,425 @@ func (r *RowNumberIterator[T]) Seek(to RowNumberWithDefinitionLevel) bool { } return true } + +// SyncIterator is like ColumnIterator but synchronous. It scans through the given row +// groups and column, and applies the optional predicate to each chunk, page, and value. +// Results are read by calling Next() until it returns nil. +type SyncIterator struct { + // Config + column int + columnName string + rgs []parquet.RowGroup + rgsMin []RowNumber + rgsMax []RowNumber // Exclusive, row number of next one past the row group + readSize int + selectAs string + filter *InstrumentedPredicate + + // Status + span opentracing.Span + curr RowNumber + currRowGroup parquet.RowGroup + currRowGroupMin RowNumber + currRowGroupMax RowNumber + currChunk parquet.ColumnChunk + currPages parquet.Pages + currPage parquet.Page + currPageMax RowNumber + currValues parquet.ValueReader + currBuf []parquet.Value + currBufN int + + err error + res *IteratorResult +} + +var _ Iterator = (*SyncIterator)(nil) + +var syncIteratorPool = sync.Pool{ + New: func() interface{} { + return []parquet.Value{} + }, +} + +func syncIteratorPoolGet(capacity, len int) []parquet.Value { + res := syncIteratorPool.Get().([]parquet.Value) + if cap(res) < capacity { + res = make([]parquet.Value, capacity) + } + res = res[:len] + return res +} + +func syncIteratorPoolPut(b []parquet.Value) { + for i := range b { + b[i] = parquet.Value{} + } + syncIteratorPool.Put(b) // nolint: staticcheck +} + +func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *SyncIterator { + + // Assign row group bounds. 
+ // Lower bound is inclusive + // Upper bound is exclusive, points at the first row of the next group + rn := EmptyRowNumber() + rgsMin := make([]RowNumber, len(rgs)) + rgsMax := make([]RowNumber, len(rgs)) + for i, rg := range rgs { + rgsMin[i] = rn + rgsMax[i] = rn + rgsMax[i].Skip(rg.NumRows() + 1) + rn.Skip(rg.NumRows()) + } + + span, _ := opentracing.StartSpanFromContext(ctx, "syncIterator", opentracing.Tags{ + "columnIndex": column, + "column": columnName, + }) + + return &SyncIterator{ + span: span, + column: column, + columnName: columnName, + rgs: rgs, + readSize: readSize, + selectAs: selectAs, + rgsMin: rgsMin, + rgsMax: rgsMax, + filter: &InstrumentedPredicate{pred: filter}, + curr: EmptyRowNumber(), + } +} + +func (c *SyncIterator) At() *IteratorResult { + return c.res +} + +func (c *SyncIterator) Next() bool { + rn, v, err := c.next() + if err != nil { + c.res = nil + c.err = err + return false + } + if !rn.Valid() { + c.res = nil + c.err = nil + return false + } + c.res = c.makeResult(rn, v) + return true +} + +// SeekTo moves this iterator to the next result that is greater than +// or equal to the given row number (and based on the given definition level) +func (c *SyncIterator) Seek(to RowNumberWithDefinitionLevel) bool { + + if c.seekRowGroup(to.RowNumber, to.DefinitionLevel) { + c.res = nil + c.err = nil + return false + } + + done, err := c.seekPages(to.RowNumber, to.DefinitionLevel) + if err != nil { + c.res = nil + c.err = err + return false + } + if done { + c.res = nil + c.err = nil + return false + } + + // The row group and page have been selected to where this value is possibly + // located. Now scan through the page and look for it. + for { + rn, v, err := c.next() + if err != nil { + c.res = nil + c.err = err + return false + } + if !rn.Valid() { + c.res = nil + c.err = nil + return false + } + + if CompareRowNumbers(to.DefinitionLevel, rn, to.RowNumber) >= 0 { + c.res = c.makeResult(rn, v) + c.err = nil + return true + } + } +} + +func (c *SyncIterator) popRowGroup() (parquet.RowGroup, RowNumber, RowNumber) { + if len(c.rgs) == 0 { + return nil, EmptyRowNumber(), EmptyRowNumber() + } + + rg := c.rgs[0] + min := c.rgsMin[0] + max := c.rgsMax[0] + + c.rgs = c.rgs[1:] + c.rgsMin = c.rgsMin[1:] + c.rgsMax = c.rgsMax[1:] + + return rg, min, max +} + +// seekRowGroup skips ahead to the row group that could contain the value at the +// desired row number. Does nothing if the current row group is already the correct one. +func (c *SyncIterator) seekRowGroup(seekTo RowNumber, definitionLevel int) (done bool) { + if c.currRowGroup != nil && CompareRowNumbers(definitionLevel, seekTo, c.currRowGroupMax) >= 0 { + // Done with this row group + c.closeCurrRowGroup() + } + + for c.currRowGroup == nil { + + rg, min, max := c.popRowGroup() + if rg == nil { + return true + } + + if CompareRowNumbers(definitionLevel, seekTo, max) != -1 { + continue + } + + cc := rg.ColumnChunks()[c.column] + if c.filter != nil && !c.filter.KeepColumnChunk(cc) { + continue + } + + // This row group matches both row number and filter. + c.setRowGroup(rg, min, max) + } + + return c.currRowGroup == nil +} + +// seekPages skips ahead in the current row group to the page that could contain the value at +// the desired row number. Does nothing if the current page is already the correct one. 
+func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bool, err error) { + if c.currPage != nil && CompareRowNumbers(definitionLevel, seekTo, c.currPageMax) >= 0 { + // Value not in this page + c.setPage(nil) + } + + if c.currPage == nil { + + // TODO (mdisibio) :(((((((( + // pages.SeekToRow is more costly than expected. It doesn't reuse existing i/o + // so it can't be called naively every time we swap pages. We need to figure out + // a way to determine when it is worth calling here. + /* + // Seek into the pages. This is relative to the start of the row group + if seekTo[0] > 0 { + // Determine row delta. We subtract 1 because curr points at the previous row + skip := seekTo[0] - c.currRowGroupMin[0] - 1 + if skip > 0 { + if err := c.currPages.SeekToRow(skip); err != nil { + return true, err + } + c.curr.Skip(skip) + } + }*/ + + for c.currPage == nil { + pg, err := c.currPages.ReadPage() + if pg == nil || err != nil { + // No more pages in this column chunk, + // cleanup and exit. + if err == io.EOF { + err = nil + } + parquet.Release(pg) + c.closeCurrRowGroup() + return true, err + } + + // Skip based on row number? + newRN := c.curr + newRN.Skip(pg.NumRows() + 1) + if CompareRowNumbers(definitionLevel, seekTo, newRN) >= 0 { + c.curr.Skip(pg.NumRows()) + parquet.Release(pg) + continue + } + + // Skip based on filter? + if c.filter != nil && !c.filter.KeepPage(pg) { + c.curr.Skip(pg.NumRows()) + parquet.Release(pg) + continue + } + + c.setPage(pg) + } + } + + return false, nil +} + +// next is the core functionality of this iterator and returns the next matching result. This +// may involve inspecting multiple row groups, pages, and values until a match is found. When +// we run out of things to inspect, it returns nil. The reason this method is distinct from +// Next() is because it doesn't wrap the results in an IteratorResult, which is more efficient +// when being called multiple times and throwing away the results like in SeekTo(). +func (c *SyncIterator) next() (RowNumber, *parquet.Value, error) { + for { + if c.currRowGroup == nil { + rg, min, max := c.popRowGroup() + if rg == nil { + return EmptyRowNumber(), nil, nil + } + + cc := rg.ColumnChunks()[c.column] + if c.filter != nil && !c.filter.KeepColumnChunk(cc) { + continue + } + + c.setRowGroup(rg, min, max) + } + + if c.currPage == nil { + pg, err := c.currPages.ReadPage() + if pg == nil || err == io.EOF { + // This row group is exhausted + c.closeCurrRowGroup() + continue + } + if err != nil { + return EmptyRowNumber(), nil, err + } + if c.filter != nil && !c.filter.KeepPage(pg) { + // This page filtered out + c.curr.Skip(pg.NumRows()) + parquet.Release(pg) + continue + } + c.setPage(pg) + } + + // Read next batch of values if needed + if c.currBuf == nil { + c.currBuf = syncIteratorPoolGet(c.readSize, 0) + } + if c.currBufN >= len(c.currBuf) || len(c.currBuf) == 0 { + c.currBuf = c.currBuf[:cap(c.currBuf)] + n, err := c.currValues.ReadValues(c.currBuf) + if err != nil && err != io.EOF { + return EmptyRowNumber(), nil, err + } + c.currBuf = c.currBuf[:n] + c.currBufN = 0 + if n == 0 { + // This value reader and page are exhausted. + c.setPage(nil) + continue + } + } + + // Consume current buffer until empty + for c.currBufN < len(c.currBuf) { + v := &c.currBuf[c.currBufN] + + // Inspect all values to track the current row number, + // even if the value is filtered out next. 
+ c.curr.Next(v.RepetitionLevel(), v.DefinitionLevel()) + c.currBufN++ + + if c.filter != nil && !c.filter.KeepValue(*v) { + continue + } + + return c.curr, v, nil + } + } +} + +func (c *SyncIterator) setRowGroup(rg parquet.RowGroup, min, max RowNumber) { + c.closeCurrRowGroup() + c.curr = min + c.currRowGroup = rg + c.currRowGroupMin = min + c.currRowGroupMax = max + c.currChunk = rg.ColumnChunks()[c.column] + c.currPages = c.currChunk.Pages() +} + +func (c *SyncIterator) setPage(pg parquet.Page) { + + // Handle an outgoing page + if c.currPage != nil { + c.curr = c.currPageMax.Preceding() // Reposition current row number to end of this page. + parquet.Release(c.currPage) + c.currPage = nil + } + + // Reset value buffers + c.currValues = nil + c.currPageMax = EmptyRowNumber() + c.currBufN = 0 + + // If we don't immediately have a new incoming page + // then return the buffer to the pool. + if pg == nil && c.currBuf != nil { + syncIteratorPoolPut(c.currBuf) + c.currBuf = nil + } + + // Handle an incoming page + if pg != nil { + rn := c.curr + rn.Skip(pg.NumRows() + 1) // Exclusive upper bound, points at the first rownumber in the next page + c.currPage = pg + c.currPageMax = rn + c.currValues = pg.Values() + } +} + +func (c *SyncIterator) closeCurrRowGroup() { + if c.currPages != nil { + c.currPages.Close() + } + + c.currRowGroup = nil + c.currRowGroupMin = EmptyRowNumber() + c.currRowGroupMax = EmptyRowNumber() + c.currChunk = nil + c.currPages = nil + c.setPage(nil) +} + +func (c *SyncIterator) makeResult(t RowNumber, v *parquet.Value) *IteratorResult { + r := columnIteratorResultPoolGet() + r.RowNumber = t + if c.selectAs != "" { + r.AppendValue(c.selectAs, v.Clone()) + } + return r +} + +func (c *SyncIterator) Err() error { + return c.err +} + +func (c *SyncIterator) Close() error { + c.closeCurrRowGroup() + + c.span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks) + c.span.SetTag("inspectedPages", c.filter.InspectedPages) + c.span.SetTag("inspectedValues", c.filter.InspectedValues) + c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks) + c.span.SetTag("keptPages", c.filter.KeptPages) + c.span.SetTag("keptValues", c.filter.KeptValues) + c.span.Finish() + return nil +} From 664563b77a12a48790255eeb608e03c06df48164 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Fri, 30 Jun 2023 16:54:50 +0100 Subject: [PATCH 02/17] BinaryJoinIterator --- pkg/phlaredb/block_querier_test.go | 1 - pkg/phlaredb/profile_store.go | 2 +- pkg/phlaredb/query/iters.go | 287 +++++---------------------- pkg/phlaredb/query/iters_test.go | 4 +- pkg/phlaredb/query/predicate_test.go | 2 +- 5 files changed, 57 insertions(+), 239 deletions(-) diff --git a/pkg/phlaredb/block_querier_test.go b/pkg/phlaredb/block_querier_test.go index 828a8411e..495107d00 100644 --- a/pkg/phlaredb/block_querier_test.go +++ b/pkg/phlaredb/block_querier_test.go @@ -190,5 +190,4 @@ func TestBlockCompatability(t *testing.T) { }) } - } diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go index 5d21f7590..79e44467a 100644 --- a/pkg/phlaredb/profile_store.go +++ b/pkg/phlaredb/profile_store.go @@ -498,7 +498,7 @@ func (r *rowGroupOnDisk) columnIter(ctx context.Context, columnName string, pred if !found { return query.NewErrIterator(fmt.Errorf("column '%s' not found in head row group segment '%s'", columnName, r.file.Name())) } - return query.NewColumnIterator(ctx, []parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias) + return query.NewSyncIterator(ctx, 
[]parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias) } type seriesIDRowsRewriter struct { diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index 72e1f3f42..c3d23c774 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -6,13 +6,11 @@ import ( "fmt" "io" "math" - "strings" "sync" "sync/atomic" "github.com/grafana/dskit/multierror" "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/log" "github.com/segmentio/parquet-go" "github.com/grafana/phlare/pkg/iter" @@ -466,265 +464,86 @@ type ColumnIterator struct { err error } -var _ Iterator = (*ColumnIterator)(nil) - type columnIteratorBuffer struct { rowNumbers []RowNumber values []parquet.Value err error } -func NewColumnIterator(ctx context.Context, rgs []parquet.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *ColumnIterator { - c := &ColumnIterator{ - metrics: getMetricsFromContext(ctx), - table: strings.ToLower(rgs[0].Schema().Name()) + "s", - rgs: rgs, - col: column, - colName: columnName, - filter: &InstrumentedPredicate{pred: filter}, - selectAs: selectAs, - quit: make(chan struct{}), - ch: make(chan *columnIteratorBuffer, 1), - currN: -1, - } +type BinaryJoinIterator struct { + left Iterator + right Iterator + definitionLevel int - go c.iterate(ctx, readSize) - return c + err error + res *IteratorResult } -func (c *ColumnIterator) iterate(ctx context.Context, readSize int) { - defer close(c.ch) - - span, _ := opentracing.StartSpanFromContext(ctx, "columnIterator.iterate", opentracing.Tags{ - "columnIndex": c.col, - "column": c.colName, - }) - defer func() { - span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load()) - span.SetTag("inspectedPages", c.filter.InspectedPages.Load()) - span.SetTag("inspectedValues", c.filter.InspectedValues.Load()) - span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load()) - span.SetTag("keptPages", c.filter.KeptPages.Load()) - span.SetTag("keptValues", c.filter.KeptValues.Load()) - span.Finish() - }() - - rn := EmptyRowNumber() - buffer := make([]parquet.Value, readSize) - - checkSkip := func(numRows int64) bool { - seekTo := c.seekTo.Load() - if seekTo == nil { - return false - } - - seekToRN := seekTo.(RowNumber) - - rnNext := rn - rnNext.Skip(numRows) +var _ Iterator = (*BinaryJoinIterator)(nil) - return CompareRowNumbers(0, rnNext, seekToRN) == -1 +func NewBinaryJoinIterator(definitionLevel int, left, right Iterator) *BinaryJoinIterator { + return &BinaryJoinIterator{ + left: left, + right: right, + definitionLevel: definitionLevel, } +} - for _, rg := range c.rgs { - col := rg.ColumnChunks()[c.col] - - if checkSkip(rg.NumRows()) { - // Skip column chunk - rn.Skip(rg.NumRows()) - continue +func (bj *BinaryJoinIterator) Next() bool { + for { + if !bj.left.Next() { + bj.err = bj.left.Err() + return false } + resLeft := bj.left.At() - if c.filter != nil { - if !c.filter.KeepColumnChunk(col) { - // Skip column chunk - rn.Skip(rg.NumRows()) - continue - } + // now seek the right iterator to the left position + if !bj.right.Seek(RowNumberWithDefinitionLevel{resLeft.RowNumber, bj.definitionLevel}) { + bj.err = bj.right.Err() + return false } - - func(col parquet.ColumnChunk) { - pgs := col.Pages() - defer func() { - if err := pgs.Close(); err != nil { - span.LogKV("closing error", err) - } - }() - for { - pg, err := pgs.ReadPage() - if pg == nil || err == io.EOF { - break - } - c.metrics.pageReadsTotal.WithLabelValues(c.table, 
c.colName).Add(1) - span.LogFields( - log.String("msg", "reading page"), - log.Int64("page_num_values", pg.NumValues()), - log.Int64("page_size", pg.Size()), - ) - if err != nil { - return - } - - if checkSkip(pg.NumRows()) { - // Skip page - rn.Skip(pg.NumRows()) - continue - } - - if c.filter != nil { - if !c.filter.KeepPage(pg) { - // Skip page - rn.Skip(pg.NumRows()) - continue - } - } - - vr := pg.Values() - for { - count, err := vr.ReadValues(buffer) - if count > 0 { - - // Assign row numbers, filter values, and collect the results. - newBuffer := columnIteratorPoolGet(readSize, 0) - - for i := 0; i < count; i++ { - - v := buffer[i] - - // We have to do this for all values (even if the - // value is excluded by the predicate) - rn.Next(v.RepetitionLevel(), v.DefinitionLevel()) - - if c.filter != nil { - if !c.filter.KeepValue(v) { - continue - } - } - - newBuffer.rowNumbers = append(newBuffer.rowNumbers, rn) - newBuffer.values = append(newBuffer.values, v) - } - - if len(newBuffer.rowNumbers) > 0 { - select { - case c.ch <- newBuffer: - case <-c.quit: - return - case <-ctx.Done(): - return - } - } else { - // All values excluded, we go ahead and immediately - // return the buffer to the pool. - columnIteratorPoolPut(newBuffer) - } - } - - // Error checks MUST occur after processing any returned data - // following io.Reader behavior. - if err == io.EOF { - break - } - if err != nil { - c.ch <- &columnIteratorBuffer{err: err} - return - } - } - - } - }(col) - } -} - -// At returns the current value from the iterator. -func (c *ColumnIterator) At() *IteratorResult { - return c.result -} - -// Next returns the next matching value from the iterator. -// Returns nil when finished. -func (c *ColumnIterator) Next() bool { - t, v := c.next() - if t.Valid() { - c.result = c.makeResult(t, v) - return true - } - - c.result = nil - return false -} - -func (c *ColumnIterator) next() (RowNumber, parquet.Value) { - // Consume current buffer until exhausted - // then read another one from the channel. - if c.curr != nil { - for c.currN++; c.currN < len(c.curr.rowNumbers); { - t := c.curr.rowNumbers[c.currN] - if t.Valid() { - return t, c.curr.values[c.currN] + resRight := bj.right.At() + + if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 { + // we have a found an element + bj.res = columnIteratorResultPoolGet() + bj.res.RowNumber = resLeft.RowNumber + bj.res.Append(resLeft) + bj.res.Append(resRight) + // columnIteratorResultPoolPut(resLeft) + // columnIteratorResultPoolPut(resRight) + return true + } else if cmp < 0 { + if !bj.left.Seek(RowNumberWithDefinitionLevel{resRight.RowNumber, bj.definitionLevel}) { + bj.err = bj.left.Err() + return false } - } - // Done with this buffer - columnIteratorPoolPut(c.curr) - c.curr = nil - } - - if v, ok := <-c.ch; ok { - if v.err != nil { - c.err = v.err - return EmptyRowNumber(), parquet.Value{} + } else { + // the right value can't be smaller than the left one because we seeked beyond it + panic("not expected to happen") } - // Got next buffer, guaranteed to have at least 1 element - c.curr = v - c.currN = 0 - return c.curr.rowNumbers[0], c.curr.values[0] } - - // Failed to read from the channel, means iterator is exhausted. 
- return EmptyRowNumber(), parquet.Value{} } -// SeekTo moves this iterator to the next result that is greater than -// or equal to the given row number (and based on the given definition level) -func (c *ColumnIterator) Seek(to RowNumberWithDefinitionLevel) bool { - var at RowNumber - var v parquet.Value - - // Because iteration happens in the background, we signal the row - // to skip to, and then read until we are at the right spot. The - // seek is best-effort and may have no effect if the iteration - // already further ahead, and there may already be older data - // in the buffer. - c.seekTo.Store(to.RowNumber) - for at, v = c.next(); at.Valid() && CompareRowNumbers(to.DefinitionLevel, at, to.RowNumber) < 0; { - at, v = c.next() - } - - if at.Valid() { - c.result = c.makeResult(at, v) - return true - } - - c.result = nil - return false +func (bj *BinaryJoinIterator) At() *IteratorResult { + return bj.res } -func (c *ColumnIterator) makeResult(t RowNumber, v parquet.Value) *IteratorResult { - r := columnIteratorResultPoolGet() - r.RowNumber = t - if c.selectAs != "" { - r.AppendValue(c.selectAs, v) - } - return r +func (bj *BinaryJoinIterator) Seek(to RowNumberWithDefinitionLevel) bool { + bj.left.Seek(to) + bj.right.Seek(to) + return bj.Next() } -func (c *ColumnIterator) Close() error { - close(c.quit) - return nil +func (bj *BinaryJoinIterator) Close() error { + var merr multierror.MultiError + merr.Add(bj.left.Close()) + merr.Add(bj.right.Close()) + return merr.Err() } -func (c *ColumnIterator) Err() error { +func (c *BinaryJoinIterator) Err() error { return c.err } diff --git a/pkg/phlaredb/query/iters_test.go b/pkg/phlaredb/query/iters_test.go index ba7828d85..2211fe5fe 100644 --- a/pkg/phlaredb/query/iters_test.go +++ b/pkg/phlaredb/query/iters_test.go @@ -93,7 +93,7 @@ func newTestSet() []parquet.RowGroup { } } -func TestColumnIterator(t *testing.T) { +func TestSyncIterator(t *testing.T) { for _, tc := range []struct { name string result []parquet.Value @@ -121,7 +121,7 @@ func TestColumnIterator(t *testing.T) { buffer [][]parquet.Value ctx = context.Background() - i = NewColumnIterator(ctx, tc.rowGroups, 0, "id", 10, nil, "id") + i = NewSyncIterator(ctx, tc.rowGroups, 0, "id", 10, nil, "id") ) for i.Next() { require.Nil(t, i.Err()) diff --git a/pkg/phlaredb/query/predicate_test.go b/pkg/phlaredb/query/predicate_test.go index 798d84a96..734041f87 100644 --- a/pkg/phlaredb/query/predicate_test.go +++ b/pkg/phlaredb/query/predicate_test.go @@ -75,7 +75,7 @@ func testPredicate[T any](t *testing.T, tc predicateTestCase[T]) { p := InstrumentedPredicate{pred: tc.predicate} - i := NewColumnIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "") + i := NewSyncIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "") for i.Next() { } From 208bdc0b05100b6b0c0ee6f506dc500d963ef8cb Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Thu, 6 Jul 2023 10:06:46 +0100 Subject: [PATCH 03/17] Remove unused code --- pkg/phlaredb/query/iters.go | 112 ++++++++---------------------------- 1 file changed, 25 insertions(+), 87 deletions(-) diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index c3d23c774..64c3847ee 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -6,11 +6,12 @@ import ( "fmt" "io" "math" + "strings" "sync" - "sync/atomic" "github.com/grafana/dskit/multierror" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" "github.com/segmentio/parquet-go" "github.com/grafana/phlare/pkg/iter" @@ 
-279,22 +280,6 @@ func (t *RowNumber) Next(repetitionLevel, definitionLevel int) { } } -// nextSlow is the original implementation of next. it is kept to test against -// the unrolled version above -func (t *RowNumber) nextSlow(repetitionLevel, definitionLevel int) { - t[repetitionLevel]++ - - // New children up through the definition level - for i := repetitionLevel + 1; i <= definitionLevel; i++ { - t[i] = 0 - } - - // // Children past the definition level are undefined - for i := definitionLevel + 1; i < len(t); i++ { - t[i] = -1 - } -} - // Skip rows at the root-level. func (t *RowNumber) Skip(numRows int64) { t[0] += numRows @@ -391,34 +376,7 @@ func NewErrIterator(err error) Iterator { return iter.NewErrSeekIterator[*IteratorResult, RowNumberWithDefinitionLevel](err) } -var columnIteratorPool = sync.Pool{ - New: func() interface{} { - return &columnIteratorBuffer{} - }, -} - -func columnIteratorPoolGet(capacity, len int) *columnIteratorBuffer { - res := columnIteratorPool.Get().(*columnIteratorBuffer) - if cap(res.rowNumbers) < capacity { - res.rowNumbers = make([]RowNumber, capacity) - } - if cap(res.values) < capacity { - res.values = make([]parquet.Value, capacity) - } - res.rowNumbers = res.rowNumbers[:len] - res.values = res.values[:len] - return res -} - -func columnIteratorPoolPut(b *columnIteratorBuffer) { - b.values = b.values[:cap(b.values)] - for i := range b.values { - b.values[i] = parquet.Value{} - } - columnIteratorPool.Put(b) -} - -var columnIteratorResultPool = sync.Pool{ +var iteratorResultPool = sync.Pool{ New: func() interface{} { return &IteratorResult{Entries: make([]struct { k string @@ -428,48 +386,18 @@ var columnIteratorResultPool = sync.Pool{ }, } -func columnIteratorResultPoolGet() *IteratorResult { - res := columnIteratorResultPool.Get().(*IteratorResult) +func iteratorResultPoolGet() *IteratorResult { + res := iteratorResultPool.Get().(*IteratorResult) return res } -func columnIteratorResultPoolPut(r *IteratorResult) { +func iteratorResultPoolPut(r *IteratorResult) { if r != nil { r.Reset() - columnIteratorResultPool.Put(r) + iteratorResultPool.Put(r) } } -// ColumnIterator asynchronously iterates through the given row groups and column. Applies -// the optional predicate to each chunk, page, and value. Results are read by calling -// Next() until it returns nil. 
-type ColumnIterator struct { - rgs []parquet.RowGroup - col int - colName string - filter *InstrumentedPredicate - - selectAs string - seekTo atomic.Value - - metrics *Metrics - table string - quit chan struct{} - ch chan *columnIteratorBuffer - - curr *columnIteratorBuffer - currN int - - result *IteratorResult - err error -} - -type columnIteratorBuffer struct { - rowNumbers []RowNumber - values []parquet.Value - err error -} - type BinaryJoinIterator struct { left Iterator right Iterator @@ -506,7 +434,7 @@ func (bj *BinaryJoinIterator) Next() bool { if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 { // we have a found an element - bj.res = columnIteratorResultPoolGet() + bj.res = iteratorResultPoolGet() bj.res.RowNumber = resLeft.RowNumber bj.res.Append(resLeft) bj.res.Append(resRight) @@ -644,7 +572,7 @@ func (j *JoinIterator) seekAll(to RowNumberWithDefinitionLevel) { to.RowNumber = TruncateRowNumber(to) for iterNum, iter := range j.iters { if j.peeks[iterNum] == nil || CompareRowNumbers(to.DefinitionLevel, j.peeks[iterNum].RowNumber, to.RowNumber) == -1 { - columnIteratorResultPoolPut(j.peeks[iterNum]) + iteratorResultPoolPut(j.peeks[iterNum]) if iter.Seek(to) { j.peeks[iterNum] = iter.At() } else { @@ -667,7 +595,7 @@ func (j *JoinIterator) peek(iterNum int) *IteratorResult { // the next row (according to the configured definition level) // or are exhausted. func (j *JoinIterator) collect(rowNumber RowNumber) *IteratorResult { - result := columnIteratorResultPoolGet() + result := iteratorResultPoolGet() result.RowNumber = rowNumber for i := range j.iters { @@ -675,7 +603,7 @@ func (j *JoinIterator) collect(rowNumber RowNumber) *IteratorResult { result.Append(j.peeks[i]) - columnIteratorResultPoolPut(j.peeks[i]) + iteratorResultPoolPut(j.peeks[i]) if j.iters[i].Next() { j.peeks[i] = j.iters[i].At() @@ -809,7 +737,7 @@ func (u *UnionIterator) peek(iterNum int) *IteratorResult { // the next row (according to the configured definition level) // or are exhausted. 
func (u *UnionIterator) collect(iterNums []int, rowNumber RowNumber) *IteratorResult { - result := columnIteratorResultPoolGet() + result := iteratorResultPoolGet() result.RowNumber = rowNumber for _, iterNum := range iterNums { @@ -817,7 +745,7 @@ func (u *UnionIterator) collect(iterNums []int, rowNumber RowNumber) *IteratorRe result.Append(u.peeks[iterNum]) - columnIteratorResultPoolPut(u.peeks[iterNum]) + iteratorResultPoolPut(u.peeks[iterNum]) if u.iters[iterNum].Next() { u.peeks[iterNum] = u.iters[iterNum].At() @@ -928,7 +856,7 @@ func (r *RowNumberIterator[T]) Next() bool { if !r.Iterator.Next() { return false } - r.current = columnIteratorResultPoolGet() + r.current = iteratorResultPoolGet() r.current.Reset() rowGetter, ok := any(r.Iterator.At()).(RowGetter) if !ok { @@ -975,6 +903,7 @@ type SyncIterator struct { // Config column int columnName string + table string rgs []parquet.RowGroup rgsMin []RowNumber rgsMax []RowNumber // Exclusive, row number of next one past the row group @@ -984,6 +913,7 @@ type SyncIterator struct { // Status span opentracing.Span + metrics *Metrics curr RowNumber currRowGroup parquet.RowGroup currRowGroupMin RowNumber @@ -1045,6 +975,8 @@ func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, co }) return &SyncIterator{ + table: strings.ToLower(rgs[0].Schema().Name()) + "s", + metrics: getMetricsFromContext(ctx), span: span, column: column, columnName: columnName, @@ -1209,6 +1141,12 @@ func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bo c.closeCurrRowGroup() return true, err } + c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1) + c.span.LogFields( + log.String("msg", "reading page"), + log.Int64("page_num_values", pg.NumValues()), + log.Int64("page_size", pg.Size()), + ) // Skip based on row number? 
newRN := c.curr @@ -1365,7 +1303,7 @@ func (c *SyncIterator) closeCurrRowGroup() { } func (c *SyncIterator) makeResult(t RowNumber, v *parquet.Value) *IteratorResult { - r := columnIteratorResultPoolGet() + r := iteratorResultPoolGet() r.RowNumber = t if c.selectAs != "" { r.AppendValue(c.selectAs, v.Clone()) From 7d21d11ec237367c1d0f0dcf576b771f84a49342 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Thu, 6 Jul 2023 11:57:05 +0100 Subject: [PATCH 04/17] Make use of the BinaryJoinIterator and remove JoinIterator --- pkg/phlaredb/block_querier.go | 22 +++--- pkg/phlaredb/head_queriers.go | 11 ++- pkg/phlaredb/query/iters.go | 145 ---------------------------------- 3 files changed, 16 insertions(+), 162 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index ed0e8346c..3b11b6a93 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -988,26 +988,26 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params } var ( - buf [][]parquet.Value - joinIters []query.Iterator + buf [][]parquet.Value + ) + + pIt := query.NewBinaryJoinIterator( + 0, + b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"), + b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), ) if b.meta.Version >= 2 { - joinIters = []query.Iterator{ - b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"), - b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), + pIt = query.NewBinaryJoinIterator( + 0, + pIt, b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"), - } + ) buf = make([][]parquet.Value, 3) } else { - joinIters = []query.Iterator{ - b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"), - b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), - } buf = make([][]parquet.Value, 2) } - pIt := query.NewJoinIterator(0, joinIters, nil) iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef)) defer pIt.Close() diff --git a/pkg/phlaredb/head_queriers.go b/pkg/phlaredb/head_queriers.go index bf15d922e..73dc3bd38 100644 --- a/pkg/phlaredb/head_queriers.go +++ b/pkg/phlaredb/head_queriers.go @@ -48,14 +48,13 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params * start = model.Time(params.Start) end = model.Time(params.End) ) - pIt := query.NewJoinIterator( - 0, - []query.Iterator{ + pIt := query.NewBinaryJoinIterator(0, + query.NewBinaryJoinIterator( + 0, rowIter, q.rowGroup().columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(start.UnixNano(), end.UnixNano()), "TimeNanos"), - q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"), - }, - nil, + ), + q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"), ) defer pIt.Close() diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index 64c3847ee..83b23dcd1 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -487,151 +487,6 @@ type JoinIterator struct { result *IteratorResult } -var _ Iterator = (*JoinIterator)(nil) - -func NewJoinIterator(definitionLevel int, iters []Iterator, pred GroupPredicate) *JoinIterator { - j := JoinIterator{ - 
definitionLevel: definitionLevel, - iters: iters, - peeks: make([]*IteratorResult, len(iters)), - pred: pred, - } - return &j -} - -func (j *JoinIterator) At() *IteratorResult { - return j.result -} - -func (j *JoinIterator) Next() bool { - // Here is the algorithm for joins: On each pass of the iterators - // we remember which ones are pointing at the earliest rows. If all - // are the lowest (and therefore pointing at the same thing) then - // there is a successful join and return the result. - // Else we progress the iterators and try again. - // There is an optimization here in that we can seek to the highest - // row seen. It's impossible to have joins before that row. - for { - lowestRowNumber := MaxRowNumber() - highestRowNumber := EmptyRowNumber() - lowestIters := make([]int, 0, len(j.iters)) - - for iterNum := range j.iters { - res := j.peek(iterNum) - - if res == nil { - // Iterator exhausted, no more joins possible - j.result = nil - return false - } - - c := CompareRowNumbers(j.definitionLevel, res.RowNumber, lowestRowNumber) - switch c { - case -1: - // New lowest, reset - lowestIters = lowestIters[:0] - lowestRowNumber = res.RowNumber - fallthrough - - case 0: - // Same, append - lowestIters = append(lowestIters, iterNum) - } - - if CompareRowNumbers(j.definitionLevel, res.RowNumber, highestRowNumber) == 1 { - // New high water mark - highestRowNumber = res.RowNumber - } - } - - // All iterators pointing at same row? - if len(lowestIters) == len(j.iters) { - // Get the data - result := j.collect(lowestRowNumber) - - // Keep group? - if j.pred == nil || j.pred.KeepGroup(result) { - // Yes - j.result = result - return true - } - } - - // Skip all iterators to the highest row seen, it's impossible - // to find matches before that. - j.seekAll(RowNumberWithDefinitionLevel{RowNumber: highestRowNumber, DefinitionLevel: j.definitionLevel}) - } -} - -func (j *JoinIterator) Seek(to RowNumberWithDefinitionLevel) bool { - j.seekAll(to) - return j.Next() -} - -func (j *JoinIterator) seekAll(to RowNumberWithDefinitionLevel) { - to.RowNumber = TruncateRowNumber(to) - for iterNum, iter := range j.iters { - if j.peeks[iterNum] == nil || CompareRowNumbers(to.DefinitionLevel, j.peeks[iterNum].RowNumber, to.RowNumber) == -1 { - iteratorResultPoolPut(j.peeks[iterNum]) - if iter.Seek(to) { - j.peeks[iterNum] = iter.At() - } else { - j.peeks[iterNum] = nil - } - } - } -} - -func (j *JoinIterator) peek(iterNum int) *IteratorResult { - if j.peeks[iterNum] == nil { - if j.iters[iterNum].Next() { - j.peeks[iterNum] = j.iters[iterNum].At() - } - } - return j.peeks[iterNum] -} - -// Collect data from the given iterators until they point at -// the next row (according to the configured definition level) -// or are exhausted. 
-func (j *JoinIterator) collect(rowNumber RowNumber) *IteratorResult { - result := iteratorResultPoolGet() - result.RowNumber = rowNumber - - for i := range j.iters { - for j.peeks[i] != nil && CompareRowNumbers(j.definitionLevel, j.peeks[i].RowNumber, rowNumber) == 0 { - - result.Append(j.peeks[i]) - - iteratorResultPoolPut(j.peeks[i]) - - if j.iters[i].Next() { - j.peeks[i] = j.iters[i].At() - } else { - j.peeks[i] = nil - } - } - } - return result -} - -func (j *JoinIterator) Close() error { - var merr multierror.MultiError - for _, i := range j.iters { - merr.Add(i.Close()) - } - return merr.Err() -} - -func (j *JoinIterator) Err() error { - for _, i := range j.iters { - if err := i.Err(); err != nil { - return err - } - } - return nil -} - // UnionIterator produces all results for all given iterators. When iterators // align to the same row, based on the configured definition level, then the results // are returned together. Else the next matching iterator is returned. From cd841cc005c99d37115c67713ec98a46780799c2 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Thu, 6 Jul 2023 13:27:55 +0100 Subject: [PATCH 05/17] Correctly log the ReadPages on span --- pkg/phlaredb/query/iters.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index 83b23dcd1..943b7b144 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -998,7 +998,7 @@ func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bo } c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1) c.span.LogFields( - log.String("msg", "reading page"), + log.String("msg", "reading page (seekPages)"), log.Int64("page_num_values", pg.NumValues()), log.Int64("page_size", pg.Size()), ) @@ -1057,6 +1057,13 @@ func (c *SyncIterator) next() (RowNumber, *parquet.Value, error) { if err != nil { return EmptyRowNumber(), nil, err } + c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1) + c.span.LogFields( + log.String("msg", "reading page (next)"), + log.Int64("page_num_values", pg.NumValues()), + log.Int64("page_size", pg.Size()), + ) + if c.filter != nil && !c.filter.KeepPage(pg) { // This page filtered out c.curr.Skip(pg.NumRows()) From eed3d16a449f144df60001ebbb84a73dadca96c5 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Thu, 6 Jul 2023 14:29:48 +0100 Subject: [PATCH 06/17] Use load on the atomic page stats --- pkg/phlaredb/query/iters.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index 943b7b144..284e4a6bc 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -751,7 +751,7 @@ func (r *RowNumberIterator[T]) Seek(to RowNumberWithDefinitionLevel) bool { return true } -// SyncIterator is like ColumnIterator but synchronous. It scans through the given row +// SyncIterator is a synchronous column iterator. It scans through the given row // groups and column, and applies the optional predicate to each chunk, page, and value. // Results are read by calling Next() until it returns nil. 
type SyncIterator struct { @@ -1180,12 +1180,12 @@ func (c *SyncIterator) Err() error { func (c *SyncIterator) Close() error { c.closeCurrRowGroup() - c.span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks) - c.span.SetTag("inspectedPages", c.filter.InspectedPages) - c.span.SetTag("inspectedValues", c.filter.InspectedValues) - c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks) - c.span.SetTag("keptPages", c.filter.KeptPages) - c.span.SetTag("keptValues", c.filter.KeptValues) + c.span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load()) + c.span.SetTag("inspectedPages", c.filter.InspectedPages.Load()) + c.span.SetTag("inspectedValues", c.filter.InspectedValues.Load()) + c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load()) + c.span.SetTag("keptPages", c.filter.KeptPages.Load()) + c.span.SetTag("keptValues", c.filter.KeptValues.Load()) c.span.Finish() return nil } From 80ef21446186581b4ef22a567a1004016e438a04 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Thu, 6 Jul 2023 17:43:05 +0100 Subject: [PATCH 07/17] Get tests from tempo --- pkg/phlaredb/query/iters.go | 12 - pkg/phlaredb/query/iters_test.go | 387 ++++++++++++++++++++++--------- 2 files changed, 271 insertions(+), 128 deletions(-) diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index 284e4a6bc..bd912be6b 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -475,18 +475,6 @@ func (c *BinaryJoinIterator) Err() error { return c.err } -// JoinIterator joins two or more iterators for matches at the given definition level. -// I.e. joining at definitionLevel=0 means that each iterator must produce a result -// within the same root node. -type JoinIterator struct { - definitionLevel int - iters []Iterator - peeks []*IteratorResult - pred GroupPredicate - - result *IteratorResult -} - // UnionIterator produces all results for all given iterators. When iterators // align to the same row, based on the configured definition level, then the results // are returned together. Else the next matching iterator is returned. 
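Before the tests, it is worth spelling out the strategy BinaryJoinIterator implements: advance the left iterator, seek the right iterator to the left's row number, and on a mismatch seek the left iterator up to the right's position (patch 09 below fixes a bug in exactly this step). A minimal, self-contained sketch of the same leapfrog join over two sorted int slices — an illustration only, not code from this series; the real iterators compare RowNumber values at a configured definition level:

package main

import "fmt"

// join emits the values present in both sorted inputs by alternately
// seeking each side to the other's position -- the same leapfrog
// strategy BinaryJoinIterator uses at definition level 0.
func join(left, right []int) []int {
	var out []int
	i, j := 0, 0
	for i < len(left) && j < len(right) {
		switch {
		case left[i] == right[j]:
			// both sides point at the same row: emit a joined result
			out = append(out, left[i])
			i++
			j++
		case left[i] < right[j]:
			// seek left forward to right's position
			for i < len(left) && left[i] < right[j] {
				i++
			}
		default:
			// seek right forward to left's position
			for j < len(right) && right[j] < left[i] {
				j++
			}
		}
	}
	return out
}

func main() {
	fmt.Println(join([]int{1, 3, 5, 9}, []int{2, 3, 7, 9})) // [3 9]
}

Seeking rather than scanning is what makes the join cheap: SyncIterator.Seek can skip whole row groups and pages via the column index and the attached predicate before any values are read.
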
diff --git a/pkg/phlaredb/query/iters_test.go b/pkg/phlaredb/query/iters_test.go index 2211fe5fe..b54f573d8 100644 --- a/pkg/phlaredb/query/iters_test.go +++ b/pkg/phlaredb/query/iters_test.go @@ -2,136 +2,45 @@ package query import ( "context" - "errors" + "math" + "os" "testing" "github.com/segmentio/parquet-go" "github.com/stretchr/testify/require" ) -type testData struct { - ID int64 `parquet:"id"` - Name string `parquet:"name"` -} +const MaxDefinitionLevel = 5 -func newTestBuffer[A any](rows []A) parquet.RowGroup { - buffer := parquet.NewBuffer() - for i := range rows { - err := buffer.Write(rows[i]) - if err != nil { - panic(err.Error()) - } - } - return buffer -} +type makeTestIterFn func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator -type errRowGroup struct { - parquet.RowGroup +var iterTestCases = []struct { + name string + makeIter makeTestIterFn +}{ + {"sync", func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator { + return NewSyncIterator(context.TODO(), pf.RowGroups(), idx, selectAs, 1000, filter, selectAs) + }}, } -func (e *errRowGroup) ColumnChunks() []parquet.ColumnChunk { - chunks := e.RowGroup.ColumnChunks() - for pos := range chunks { - chunks[pos] = &errColumnChunk{chunks[pos]} - } - return chunks -} +/* +// TestNext compares the unrolled Next() with the original nextSlow() to +// prevent drift +func TestNext(t *testing.T) { + rn1 := RowNumber{0, 0, 0, 0, 0, 0} + rn2 := RowNumber{0, 0, 0, 0, 0, 0} -type errColumnChunk struct { - parquet.ColumnChunk -} + for i := 0; i < 1000; i++ { + r := rand.Intn(6) + d := rand.Intn(6) -func (e *errColumnChunk) Pages() parquet.Pages { - return &errPages{e.ColumnChunk.Pages()} -} + rn1.Next(r, d) + rn2.nextSlow(r, d) -type errPages struct { - parquet.Pages -} - -func (e *errPages) ReadPage() (parquet.Page, error) { - p, err := e.Pages.ReadPage() - return &errPage{p}, err -} - -type errPage struct { - parquet.Page -} - -func (e *errPage) Values() parquet.ValueReader { - return &errValueReader{e.Page.Values()} -} - -type errValueReader struct { - parquet.ValueReader -} - -func (e *errValueReader) ReadValues(vals []parquet.Value) (int, error) { - _, _ = e.ValueReader.ReadValues(vals) - return 0, errors.New("read error") -} - -func withReadValueError(rg []parquet.RowGroup) []parquet.RowGroup { - for pos := range rg { - rg[pos] = &errRowGroup{rg[pos]} - } - return rg -} - -func newTestSet() []parquet.RowGroup { - return []parquet.RowGroup{ - newTestBuffer( - []testData{ - {1, "one"}, - {2, "two"}, - }), - newTestBuffer( - []testData{ - {3, "three"}, - {5, "five"}, - }), - } -} - -func TestSyncIterator(t *testing.T) { - for _, tc := range []struct { - name string - result []parquet.Value - rowGroups []parquet.RowGroup - err error - }{ - { - name: "read-int-column", - rowGroups: newTestSet(), - result: []parquet.Value{ - parquet.ValueOf(1), - parquet.ValueOf(2), - parquet.ValueOf(3), - parquet.ValueOf(5), - }, - }, - { - name: "err-read-values", - rowGroups: withReadValueError(newTestSet()), - err: errors.New("read error"), - }, - } { - t.Run(tc.name, func(t *testing.T) { - var ( - buffer [][]parquet.Value - - ctx = context.Background() - i = NewSyncIterator(ctx, tc.rowGroups, 0, "id", 10, nil, "id") - ) - for i.Next() { - require.Nil(t, i.Err()) - buffer = i.At().Columns(buffer, "id") - } - - require.Equal(t, tc.err, i.Err()) - }) + require.Equal(t, rn1, rn2) } } +*/ func TestRowNumber(t *testing.T) { tr := EmptyRowNumber() @@ -170,6 +79,252 @@ func TestCompareRowNumbers(t *testing.T) { } for _, 
tc := range testCases { - require.Equal(t, tc.expected, CompareRowNumbers(5, tc.a, tc.b)) + require.Equal(t, tc.expected, CompareRowNumbers(MaxDefinitionLevel, tc.a, tc.b)) + } +} + +func TestRowNumberPreceding(t *testing.T) { + testCases := []struct { + start, preceding RowNumber + }{ + {RowNumber{1000, -1, -1, -1, -1, -1}, RowNumber{999, -1, -1, -1, -1, -1}}, + {RowNumber{1000, 0, 0, 0, 0, 0}, RowNumber{999, math.MaxInt64, math.MaxInt64, math.MaxInt64, math.MaxInt64, math.MaxInt64}}, + } + + for _, tc := range testCases { + require.Equal(t, tc.preceding, tc.start.Preceding()) + } +} + +func TestColumnIterator(t *testing.T) { + for _, tc := range iterTestCases { + t.Run(tc.name, func(t *testing.T) { + testColumnIterator(t, tc.makeIter) + }) + } +} + +func testColumnIterator(t *testing.T, makeIter makeTestIterFn) { + count := 100_000 + pf := createTestFile(t, count) + + idx, _ := GetColumnIndexByPath(pf, "A") + iter := makeIter(pf, idx, nil, "A") + defer iter.Close() + + for i := 0; i < count; i++ { + require.True(t, iter.Next()) + res := iter.At() + require.NotNil(t, res, "i=%d", i) + require.Equal(t, RowNumber{int64(i), -1, -1, -1, -1, -1}, res.RowNumber) + require.Equal(t, int64(i), res.ToMap()["A"][0].Int64()) + } + + require.False(t, iter.Next()) + require.NoError(t, iter.Err()) +} + +func TestColumnIteratorSeek(t *testing.T) { + for _, tc := range iterTestCases { + t.Run(tc.name, func(t *testing.T) { + testColumnIteratorSeek(t, tc.makeIter) + }) + } +} + +func testColumnIteratorSeek(t *testing.T, makeIter makeTestIterFn) { + count := 10_000 + pf := createTestFile(t, count) + + idx, _ := GetColumnIndexByPath(pf, "A") + iter := makeIter(pf, idx, nil, "A") + defer iter.Close() + + seekTos := []int64{ + 100, + 1234, + 4567, + 5000, + 7890, } + + for _, seekTo := range seekTos { + rn := EmptyRowNumber() + rn[0] = seekTo + require.True(t, iter.Seek(RowNumberWithDefinitionLevel{rn, 0})) + res := iter.At() + require.NotNil(t, res, "seekTo=%v", seekTo) + require.Equal(t, RowNumber{seekTo, -1, -1, -1, -1, -1}, res.RowNumber) + require.Equal(t, seekTo, res.ToMap()["A"][0].Int64()) + } +} + +func TestColumnIteratorPredicate(t *testing.T) { + for _, tc := range iterTestCases { + t.Run(tc.name, func(t *testing.T) { + testColumnIteratorPredicate(t, tc.makeIter) + }) + } +} + +func testColumnIteratorPredicate(t *testing.T, makeIter makeTestIterFn) { + count := 10_000 + pf := createTestFile(t, count) + + pred := NewIntBetweenPredicate(7001, 7003) + + idx, _ := GetColumnIndexByPath(pf, "A") + iter := makeIter(pf, idx, pred, "A") + defer iter.Close() + + expectedResults := []int64{ + 7001, + 7002, + 7003, + } + + for _, expectedResult := range expectedResults { + require.True(t, iter.Next()) + res := iter.At() + require.NotNil(t, res) + require.Equal(t, RowNumber{expectedResult, -1, -1, -1, -1, -1}, res.RowNumber) + require.Equal(t, expectedResult, res.ToMap()["A"][0].Int64()) + } +} + +func TestColumnIteratorExitEarly(t *testing.T) { + type T struct{ A int } + + rows := []T{} + count := 10_000 + for i := 0; i < count; i++ { + rows = append(rows, T{i}) + } + + pf := createFileWith(t, rows) + idx, _ := GetColumnIndexByPath(pf, "A") + readSize := 1000 + + readIter := func(iter Iterator) (int, error) { + received := 0 + for iter.Next() { + received++ + } + return received, iter.Err() + } + + t.Run("cancelledEarly", func(t *testing.T) { + // Cancel before iterating + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + iter := NewSyncIterator(ctx, pf.RowGroups(), idx, "", readSize, nil, "A") + 
count, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+		require.Equal(t, 0, count)
+	})
+
+	t.Run("cancelledPartial", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.TODO())
+		iter := NewSyncIterator(ctx, pf.RowGroups(), idx, "", readSize, nil, "A")
+
+		// Read some results
+		require.True(t, iter.Next())
+
+		// Then cancel
+		cancel()
+
+		// Read again = context cancelled
+		_, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+	})
+
+	t.Run("closedEarly", func(t *testing.T) {
+		// Close before iterating
+		iter := NewSyncIterator(context.TODO(), pf.RowGroups(), idx, "", readSize, nil, "A")
+		iter.Close()
+		count, err := readIter(iter)
+		require.NoError(t, err)
+		require.Equal(t, 0, count)
+	})
+
+	t.Run("closedPartial", func(t *testing.T) {
+		iter := NewSyncIterator(context.TODO(), pf.RowGroups(), idx, "", readSize, nil, "A")
+
+		// Read some results
+		require.True(t, iter.Next())
+
+		// Then close
+		iter.Close()
+
+		// Read again = should close early
+		res2, err := readIter(iter)
+		require.NoError(t, err)
+		require.Less(t, readSize+res2, count)
+	})
+}
+
+func BenchmarkColumnIterator(b *testing.B) {
+	for _, tc := range iterTestCases {
+		b.Run(tc.name, func(b *testing.B) {
+			benchmarkColumnIterator(b, tc.makeIter)
+		})
+	}
+}
+
+func benchmarkColumnIterator(b *testing.B, makeIter makeTestIterFn) {
+	count := 100_000
+	pf := createTestFile(b, count)
+
+	idx, _ := GetColumnIndexByPath(pf, "A")
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		iter := makeIter(pf, idx, nil, "A")
+		actualCount := 0
+		for iter.Next() {
+			actualCount++
+		}
+		iter.Close()
+		require.Equal(b, count, actualCount)
+		//fmt.Println(actualCount)
+	}
+}
+
+func createTestFile(t testing.TB, count int) *parquet.File {
+	type T struct{ A int }
+
+	rows := []T{}
+	for i := 0; i < count; i++ {
+		rows = append(rows, T{i})
+	}
+
+	pf := createFileWith(t, rows)
+	return pf
+}
+
+func createFileWith[T any](t testing.TB, rows []T) *parquet.File {
+	f, err := os.CreateTemp(t.TempDir(), "data.parquet")
+	require.NoError(t, err)
+
+	half := len(rows) / 2
+
+	w := parquet.NewGenericWriter[T](f)
+	_, err = w.Write(rows[0:half])
+	require.NoError(t, err)
+	require.NoError(t, w.Flush())
+
+	_, err = w.Write(rows[half:])
+	require.NoError(t, err)
+	require.NoError(t, w.Flush())
+
+	require.NoError(t, w.Close())
+
+	stat, err := f.Stat()
+	require.NoError(t, err)
+
+	pf, err := parquet.OpenFile(f, stat.Size())
+	require.NoError(t, err)
+
+	return pf
+}

From b59403d1fafad8384963575e401bee96f2729444 Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Fri, 7 Jul 2023 17:52:01 +0100
Subject: [PATCH 08/17] Move map predicate and base it off the min/max

This predicate still wouldn't work all that well with a fully SeriesID
sorted block (so across row groups).
But let's fix that separately --- pkg/phlaredb/block_querier.go | 55 +------------------------------- pkg/phlaredb/query/predicates.go | 40 +++++++++++++++++++++++ 2 files changed, 41 insertions(+), 54 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 3b11b6a93..85b9de469 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -28,7 +28,6 @@ import ( "github.com/samber/lo" "github.com/segmentio/parquet-go" "github.com/thanos-io/objstore" - "golang.org/x/exp/constraints" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" @@ -462,58 +461,6 @@ func (b *singleBlockQuerier) Bounds() (model.Time, model.Time) { return b.meta.MinTime, b.meta.MaxTime } -type mapPredicate[K constraints.Integer, V any] struct { - min K - max K - m map[K]V -} - -func newMapPredicate[K constraints.Integer, V any](m map[K]V) query.Predicate { - p := &mapPredicate[K, V]{ - m: m, - } - - first := true - for k := range m { - if first || p.max < k { - p.max = k - } - if first || p.min > k { - p.min = k - } - first = false - } - - return p -} - -func (m *mapPredicate[K, V]) KeepColumnChunk(c parquet.ColumnChunk) bool { - if ci := c.ColumnIndex(); ci != nil { - for i := 0; i < ci.NumPages(); i++ { - min := K(ci.MinValue(i).Int64()) - max := K(ci.MaxValue(i).Int64()) - if m.max >= min && m.min <= max { - return true - } - } - return false - } - - return true -} - -func (m *mapPredicate[K, V]) KeepPage(page parquet.Page) bool { - if min, max, ok := page.Bounds(); ok { - return m.max >= K(min.Int64()) && m.min <= K(max.Int64()) - } - return true -} - -func (m *mapPredicate[K, V]) KeepValue(v parquet.Value) bool { - _, exists := m.m[K(v.Int64())] - return exists -} - type labelsInfo struct { fp model.Fingerprint lbs phlaremodel.Labels @@ -993,7 +940,7 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params pIt := query.NewBinaryJoinIterator( 0, - b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"), + b.profiles.columnIter(ctx, "SeriesIndex", query.NewMapPredicate(lblsPerRef), "SeriesIndex"), b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), ) diff --git a/pkg/phlaredb/query/predicates.go b/pkg/phlaredb/query/predicates.go index bccdd95cc..5df379154 100644 --- a/pkg/phlaredb/query/predicates.go +++ b/pkg/phlaredb/query/predicates.go @@ -6,6 +6,7 @@ import ( pq "github.com/segmentio/parquet-go" "go.uber.org/atomic" + "golang.org/x/exp/constraints" ) // Predicate is a pushdown predicate that can be applied at @@ -254,3 +255,42 @@ func (p *InstrumentedPredicate) KeepValue(v pq.Value) bool { return false } + +type mapPredicate[K constraints.Integer, V any] struct { + inbetweenPred Predicate + m map[K]V +} + +func NewMapPredicate[K constraints.Integer, V any](m map[K]V) Predicate { + + var min, max int64 + + first := true + for k := range m { + if first || max < int64(k) { + max = int64(k) + } + if first || min > int64(k) { + min = int64(k) + } + first = false + } + + return &mapPredicate[K, V]{ + inbetweenPred: NewIntBetweenPredicate(min, max), + m: m, + } +} + +func (m *mapPredicate[K, V]) KeepColumnChunk(c pq.ColumnChunk) bool { + return m.inbetweenPred.KeepColumnChunk(c) +} + +func (m *mapPredicate[K, V]) KeepPage(page pq.Page) bool { + return m.inbetweenPred.KeepPage(page) +} + +func (m *mapPredicate[K, V]) KeepValue(v pq.Value) bool { + _, exists := m.m[K(v.Int64())] + return exists +} 
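The new NewMapPredicate delegates chunk- and page-level pruning to a NewIntBetweenPredicate built from the smallest and largest keys of the map, and only consults the map itself for per-value membership. A sketch of how a caller combines it with NewSyncIterator — the wanted IDs and column index are illustrative placeholders, and context, segmentio/parquet-go, and this query package are assumed to be imported:

// selectSeriesRows scans one column, keeping only rows whose value is in wanted.
func selectSeriesRows(ctx context.Context, rgs []parquet.RowGroup) error {
	wanted := map[uint32]struct{}{3: {}, 17: {}, 42: {}}

	// Chunk- and page-level pruning uses the bounds min=3, max=42;
	// each surviving value is then looked up in the map.
	pred := query.NewMapPredicate(wanted)

	// column index 0 stands in for the SeriesIndex column here
	it := query.NewSyncIterator(ctx, rgs, 0, "SeriesIndex", 1000, pred, "SeriesIndex")
	defer it.Close()

	for it.Next() {
		_ = it.At() // in the callers above this side feeds a NewBinaryJoinIterator
	}
	return it.Err()
}

As the commit message notes, the min/max bounds prune little when the wanted keys span almost the whole key range of a block; the per-value membership check still keeps the results correct in that case.
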
From fb3444042cd702ae14c7006850e9e470f39228a5 Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Fri, 7 Jul 2023 17:54:06 +0100
Subject: [PATCH 09/17] Fix bug when we iterate the left iterator based on a higher right result

---
 pkg/phlaredb/query/iters.go | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go
index bd912be6b..e9979d40a 100644
--- a/pkg/phlaredb/query/iters.go
+++ b/pkg/phlaredb/query/iters.go
@@ -432,20 +432,30 @@ func (bj *BinaryJoinIterator) Next() bool {
 		}
 		resRight := bj.right.At()

-		if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 {
-			// we have a found an element
+		makeResult := func() {
 			bj.res = iteratorResultPoolGet()
 			bj.res.RowNumber = resLeft.RowNumber
 			bj.res.Append(resLeft)
 			bj.res.Append(resRight)
-			// columnIteratorResultPoolPut(resLeft)
-			// columnIteratorResultPoolPut(resRight)
+			iteratorResultPoolPut(resLeft)
+			iteratorResultPoolPut(resRight)
+		}
+
+		if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 {
+			// we have found an element
+			makeResult()
 			return true
 		} else if cmp < 0 {
 			if !bj.left.Seek(RowNumberWithDefinitionLevel{resRight.RowNumber, bj.definitionLevel}) {
 				bj.err = bj.left.Err()
 				return false
 			}
+			resLeft = bj.left.At()
+
+			if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 {
+				makeResult()
+				return true
+			}
 		} else {
 			// the right value can't be smaller than the left one because we seeked beyond it

From 6e6a32610e7067289e971bf0914894f38256b4ab Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Fri, 7 Jul 2023 17:57:53 +0100
Subject: [PATCH 10/17] Test BinaryJoinIterator thoroughly

---
 pkg/phlaredb/query/iters_test.go | 106 +++++++++++++++++++++++++++++--
 1 file changed, 102 insertions(+), 4 deletions(-)

diff --git a/pkg/phlaredb/query/iters_test.go b/pkg/phlaredb/query/iters_test.go
index b54f573d8..8bb36ba10 100644
--- a/pkg/phlaredb/query/iters_test.go
+++ b/pkg/phlaredb/query/iters_test.go
@@ -201,7 +201,7 @@ func TestColumnIteratorExitEarly(t *testing.T) {
 		rows = append(rows, T{i})
 	}

-	pf := createFileWith(t, rows)
+	pf := createFileWith(t, rows, 2)
 	idx, _ := GetColumnIndexByPath(pf, "A")
 	readSize := 1000

@@ -299,15 +299,47 @@ func createTestFile(t testing.TB, count int) *parquet.File {
 		rows = append(rows, T{i})
 	}

-	pf := createFileWith(t, rows)
+	pf := createFileWith(t, rows, 2)
 	return pf
 }

+func createProfileLikeFile(t testing.TB, count int) *parquet.File {
+	type T struct {
+		SeriesID  uint32
+		TimeNanos int64
+	}
+
+	// every row group is ordered by SeriesID and then time nanos
+	// time is always increasing between row groups
+
+	rowGroups := 10
+	series := 8
+
+	rows := make([]T, count)
+	for i := range rows {
+
+		rowsPerRowGroup := count / rowGroups
+		seriesPerRowGroup := rowsPerRowGroup / series
+		rowGroupNum := i / rowsPerRowGroup
+
+		seriesID := uint32(i % (count / rowGroups) / (rowsPerRowGroup / series))
+		rows[i] = T{
+			SeriesID:  seriesID,
+			TimeNanos: int64(i%seriesPerRowGroup+rowGroupNum*seriesPerRowGroup) * 1000,
+		}
+
+	}
+
+	return createFileWith[T](t, rows, rowGroups)
+
+}
+
+func createFileWith[T any](t testing.TB, rows []T, rowGroups int) *parquet.File {
 	f, err := os.CreateTemp(t.TempDir(), "data.parquet")
 	require.NoError(t, err)
+	t.Logf("Created temp file %s", f.Name())

-	half := len(rows) / 2
+	half := len(rows) / rowGroups

 	w := parquet.NewGenericWriter[T](f)
 	_, err = w.Write(rows[0:half])
@@ -328,3 +360,69 @@ func createFileWith[T any](t testing.TB, rows []T) *parquet.File {

 	return pf
 }
+
+func TestBinaryJoinIterator(t *testing.T) {
+	rowCount := 1600
+	pf := createProfileLikeFile(t, rowCount)
+
+	for _, tc := range []struct {
+		name                string
+		seriesPredicate     Predicate
+		timePredicate       Predicate
+		expectedResultCount int
+	}{
+		{
+			name:                "no predicate",
+			expectedResultCount: rowCount, // expect everything
+		},
+		{
+			name:                "one series ID",
+			expectedResultCount: rowCount / 8, // expect an eighth of the rows
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}}),
+		},
+		{
+			name:                "two series IDs",
+			expectedResultCount: rowCount / 8 * 2, // expect two eighths of the rows
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
+		},
+		{
+			name:                "first two time stamps each",
+			expectedResultCount: 2 * 8, // expect an eighth of the rows
+			timePredicate:       NewIntBetweenPredicate(0, 1000),
+		},
+		{
+			name:                "time before results",
+			expectedResultCount: 0, // expect an eighth of the rows
+			timePredicate:       NewIntBetweenPredicate(-10, -1),
+		},
+		{
+			name:                "time after results",
+			expectedResultCount: 0, // expect an eighth of the rows
+			timePredicate:       NewIntBetweenPredicate(200000, 20001000),
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			seriesIt := NewSyncIterator(ctx, pf.RowGroups(), 0, "SeriesId", 1000, tc.seriesPredicate, "SeriesId")
+			timeIt := NewSyncIterator(ctx, pf.RowGroups(), 1, "TimeNanos", 1000, tc.timePredicate, "TimeNanos")
+
+			it := NewBinaryJoinIterator(
+				0,
+				seriesIt,
+				timeIt,
+			)
+			defer func() {
+				require.NoError(t, it.Close())
+			}()
+
+			results := 0
+			for it.Next() {
+				results++
+			}
+			require.NoError(t, it.Err())
+
+			require.Equal(t, tc.expectedResultCount, results)
+
+		})
+	}
+}

From 82e4f48a6530855cb918e8a1bd048b9fafd4fc71 Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Fri, 7 Jul 2023 17:59:48 +0100
Subject: [PATCH 11/17] Optimize when the Seek destination is only a single row away

---
 pkg/phlaredb/query/iters.go | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go
index e9979d40a..4f4b8732b 100644
--- a/pkg/phlaredb/query/iters.go
+++ b/pkg/phlaredb/query/iters.go
@@ -417,6 +417,15 @@ func NewBinaryJoinIterator(definitionLevel int, left, right Iterator) *BinaryJoi
 	}
 }

+// nextOrSeek will use next if the iterator is exactly one row away
+func (bj *BinaryJoinIterator) nextOrSeek(to RowNumberWithDefinitionLevel, it Iterator) bool {
+	// Seek when the definition level is higher than 0, there is no previous iteration, or when the difference between the current position and to is not 1
+	if to.DefinitionLevel != 0 || it.At() == nil || to.RowNumber.Preceding() != it.At().RowNumber {
+		return it.Seek(to)
+	}
+	return it.Next()
+}
+
 func (bj *BinaryJoinIterator) Next() bool {
 	for {
 		if !bj.left.Next() {
@@ -426,7 +435,7 @@ func (bj *BinaryJoinIterator) Next() bool {
 		resLeft := bj.left.At()

 		// now seek the right iterator to the left position
-		if !bj.right.Seek(RowNumberWithDefinitionLevel{resLeft.RowNumber, bj.definitionLevel}) {
+		if !bj.nextOrSeek(RowNumberWithDefinitionLevel{resLeft.RowNumber, bj.definitionLevel}, bj.right) {
 			bj.err = bj.right.Err()
 			return false
 		}
@@ -446,7 +455,7 @@ func (bj *BinaryJoinIterator) Next() bool {
 			makeResult()
 			return true
 		} else if cmp < 0 {
-			if !bj.left.Seek(RowNumberWithDefinitionLevel{resRight.RowNumber, bj.definitionLevel}) {
+			if !bj.nextOrSeek(RowNumberWithDefinitionLevel{resRight.RowNumber, bj.definitionLevel}, bj.left) {
 				bj.err = bj.left.Err()
 				return false
 			}

From f185e91533af0441a5e906dbd848cb17a9e80f47 Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Fri, 7 Jul 2023 18:29:21 +0100
Subject: [PATCH 12/17] Fix tests comments

---
 pkg/phlaredb/query/iters_test.go | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/pkg/phlaredb/query/iters_test.go b/pkg/phlaredb/query/iters_test.go
index 8bb36ba10..68a7406f5 100644
--- a/pkg/phlaredb/query/iters_test.go
+++ b/pkg/phlaredb/query/iters_test.go
@@ -385,19 +385,24 @@ func TestBinaryJoinIterator(t *testing.T) {
 			expectedResultCount: rowCount / 8 * 2, // expect two eighths of the rows
 			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
 		},
+		{
+			name:                "missing series",
+			expectedResultCount: 0,
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{10: {}}),
+		},
 		{
 			name:                "first two time stamps each",
-			expectedResultCount: 2 * 8, // expect an eighth of the rows
+			expectedResultCount: 2 * 8, // expect two profiles for each series
 			timePredicate:       NewIntBetweenPredicate(0, 1000),
 		},
 		{
 			name:                "time before results",
-			expectedResultCount: 0, // expect an eighth of the rows
+			expectedResultCount: 0,
 			timePredicate:       NewIntBetweenPredicate(-10, -1),
 		},
 		{
 			name:                "time after results",
-			expectedResultCount: 0, // expect an eighth of the rows
+			expectedResultCount: 0,
 			timePredicate:       NewIntBetweenPredicate(200000, 20001000),
 			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
 		},

From 706aa29d3e164988448b0a069af70bd7f39d3367 Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Fri, 7 Jul 2023 18:58:53 +0100
Subject: [PATCH 13/17] Validate that page reads are as expected

---
 pkg/phlaredb/query/iters_test.go | 57 ++++++++++++++++++++++++++------
 1 file changed, 47 insertions(+), 10 deletions(-)

diff --git a/pkg/phlaredb/query/iters_test.go b/pkg/phlaredb/query/iters_test.go
index 68a7406f5..411ad8f0d 100644
--- a/pkg/phlaredb/query/iters_test.go
+++ b/pkg/phlaredb/query/iters_test.go
@@ -1,11 +1,15 @@
 package query

 import (
+	"bytes"
 	"context"
+	"fmt"
 	"math"
 	"os"
 	"testing"

+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/segmentio/parquet-go"
 	"github.com/stretchr/testify/require"
 )
@@ -339,14 +343,17 @@ func createFileWith[T any](t testing.TB, rows []T, rowGroups int) *parquet.File
 	require.NoError(t, err)
 	t.Logf("Created temp file %s", f.Name())

-	half := len(rows) / rowGroups
+	perRG := len(rows) / rowGroups

 	w := parquet.NewGenericWriter[T](f)
-	_, err = w.Write(rows[0:half])
-	require.NoError(t, err)
-	require.NoError(t, w.Flush())
+	for i := 0; i < (rowGroups - 1); i++ {
+		_, err = w.Write(rows[0:perRG])
+		require.NoError(t, err)
+		require.NoError(t, w.Flush())
+		rows = rows[perRG:]
+	}

-	_, err = w.Write(rows[half:])
+	_, err = w.Write(rows)
 	require.NoError(t, err)
 	require.NoError(t, w.Flush())

@@ -368,22 +375,30 @@ func TestBinaryJoinIterator(t *testing.T) {
 	for _, tc := range []struct {
 		name                string
 		seriesPredicate     Predicate
+		seriesPageReads     int
 		timePredicate       Predicate
+		timePageReads       int
 		expectedResultCount int
 	}{
 		{
 			name:                "no predicate",
 			expectedResultCount: rowCount, // expect everything
+			seriesPageReads:     10,
+			timePageReads:       10,
 		},
 		{
 			name:                "one series ID",
 			expectedResultCount: rowCount / 8, // expect an eighth of the rows
 			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}}),
+			seriesPageReads:     10,
+			timePageReads:       10,
 		},
 		{
 			name:                "two series IDs",
 			expectedResultCount: rowCount / 8 * 2, // expect two eighths of the rows
 			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
+			seriesPageReads:     10,
+			timePageReads:       10,
 		},
 		{
 			name:                "missing series",
 			expectedResultCount: 0,
 			seriesPredicate:     NewMapPredicate(map[int64]struct{}{10: {}}),
 		},
 		{
 			name:                "first two time stamps each",
 			expectedResultCount: 2 * 8, // expect two profiles for each series
 			timePredicate:       NewIntBetweenPredicate(0, 1000),
+			seriesPageReads:     1,
+			timePageReads:       1,
 		},
 		{
 			name:                "time before results",
 			expectedResultCount: 0,
 			timePredicate:       NewIntBetweenPredicate(-10, -1),
+			seriesPageReads:     1,
+			timePageReads:       0,
 		},
 		{
 			name:                "time after results",
 			expectedResultCount: 0,
 			timePredicate:       NewIntBetweenPredicate(200000, 20001000),
 			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
+			seriesPageReads:     1,
+			timePageReads:       0,
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			reg := prometheus.NewRegistry()
+			metrics := NewMetrics(reg)
+			metrics.pageReadsTotal.WithLabelValues("ts", "SeriesId").Add(0)
+			metrics.pageReadsTotal.WithLabelValues("ts", "TimeNanos").Add(0)
+			ctx = AddMetricsToContext(ctx, metrics)
+
 			seriesIt := NewSyncIterator(ctx, pf.RowGroups(), 0, "SeriesId", 1000, tc.seriesPredicate, "SeriesId")
 			timeIt := NewSyncIterator(ctx, pf.RowGroups(), 1, "TimeNanos", 1000, tc.timePredicate, "TimeNanos")

@@ -416,9 +446,6 @@ func TestBinaryJoinIterator(t *testing.T) {
 				seriesIt,
 				timeIt,
 			)
-			defer func() {
-				require.NoError(t, it.Close())
-			}()

 			results := 0
 			for it.Next() {
@@ -426,8 +453,18 @@ func TestBinaryJoinIterator(t *testing.T) {
 			}
 			require.NoError(t, it.Err())

+			require.NoError(t, it.Close())
+
 			require.Equal(t, tc.expectedResultCount, results)

+			require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewReader([]byte(fmt.Sprintf(
+				`
+				# HELP pyroscopedb_page_reads_total Total number of pages read while querying
+				# TYPE pyroscopedb_page_reads_total counter
+				pyroscopedb_page_reads_total{column="SeriesId",table="ts"} %d
+				pyroscopedb_page_reads_total{column="TimeNanos",table="ts"} %d
+				`, tc.seriesPageReads, tc.timePageReads))), "pyroscopedb_page_reads_total"))
+
 		})
 	}
 }

From d34f635e53941d87225cb5058a08ccf97a557965 Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Mon, 10 Jul 2023 09:29:04 +0100
Subject: [PATCH 14/17] Fix closing and cancellation

A call to Next/Seek after a close should also yield a context cancelled
error.
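The fix wires a small, self-contained pattern into SyncIterator; a minimal standalone sketch of that pattern (hypothetical names, not the patch itself): the iterator derives its own cancellable context, Close cancels it, and every subsequent Next observes ctx.Err() before doing any work.

	package main

	import (
		"context"
		"fmt"
	)

	type cancellableIter struct {
		ctx    context.Context
		cancel context.CancelFunc
		n, max int
		err    error
	}

	func newCancellableIter(ctx context.Context, max int) *cancellableIter {
		// The iterator owns a derived context so that Close can cancel it
		// without affecting the caller's context.
		ctx, cancel := context.WithCancel(ctx)
		return &cancellableIter{ctx: ctx, cancel: cancel, max: max}
	}

	func (it *cancellableIter) Next() bool {
		// Check for cancellation before producing the next element.
		select {
		case <-it.ctx.Done():
			it.err = it.ctx.Err() // surfaces as "context canceled" after Close
			return false
		default:
		}
		it.n++
		return it.n <= it.max
	}

	func (it *cancellableIter) Err() error { return it.err }

	func (it *cancellableIter) Close() error {
		it.cancel()
		return nil
	}

	func main() {
		it := newCancellableIter(context.Background(), 10)
		fmt.Println(it.Next())           // true: still open
		_ = it.Close()                   // cancels the iterator-owned context
		fmt.Println(it.Next(), it.Err()) // false context canceled
	}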
--- pkg/phlaredb/query/iters.go | 17 ++++++++++++++++- pkg/phlaredb/query/iters_test.go | 4 ++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index 4f4b8732b..82513a0c3 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -774,6 +774,8 @@ type SyncIterator struct { filter *InstrumentedPredicate // Status + ctx context.Context + cancel func() span opentracing.Span metrics *Metrics curr RowNumber @@ -831,13 +833,17 @@ func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, co rn.Skip(rg.NumRows()) } - span, _ := opentracing.StartSpanFromContext(ctx, "syncIterator", opentracing.Tags{ + span, ctx := opentracing.StartSpanFromContext(ctx, "syncIterator", opentracing.Tags{ "columnIndex": column, "column": columnName, }) + ctx, cancel := context.WithCancel(ctx) + return &SyncIterator{ table: strings.ToLower(rgs[0].Schema().Name()) + "s", + ctx: ctx, + cancel: cancel, metrics: getMetricsFromContext(ctx), span: span, column: column, @@ -1040,6 +1046,14 @@ func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bo // when being called multiple times and throwing away the results like in SeekTo(). func (c *SyncIterator) next() (RowNumber, *parquet.Value, error) { for { + + // return if context is cancelled + select { + case <-c.ctx.Done(): + return EmptyRowNumber(), nil, c.ctx.Err() + default: + } + if c.currRowGroup == nil { rg, min, max := c.popRowGroup() if rg == nil { @@ -1159,6 +1173,7 @@ func (c *SyncIterator) setPage(pg parquet.Page) { } func (c *SyncIterator) closeCurrRowGroup() { + c.cancel() if c.currPages != nil { c.currPages.Close() } diff --git a/pkg/phlaredb/query/iters_test.go b/pkg/phlaredb/query/iters_test.go index 411ad8f0d..220cee7e8 100644 --- a/pkg/phlaredb/query/iters_test.go +++ b/pkg/phlaredb/query/iters_test.go @@ -247,7 +247,7 @@ func TestColumnIteratorExitEarly(t *testing.T) { iter := NewSyncIterator(context.TODO(), pf.RowGroups(), idx, "", readSize, nil, "A") iter.Close() count, err := readIter(iter) - require.NoError(t, err) + require.ErrorContains(t, err, "context canceled") require.Equal(t, 0, count) }) @@ -262,7 +262,7 @@ func TestColumnIteratorExitEarly(t *testing.T) { // Read again = should close early res2, err := readIter(iter) - require.NoError(t, err) + require.ErrorContains(t, err, "context canceled") require.Less(t, readSize+res2, count) }) } From 3869536b129c6a610f7ba1ef53e77c4b7afa664f Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Mon, 10 Jul 2023 13:29:53 +0100 Subject: [PATCH 15/17] Do cancel it in the right place --- pkg/phlaredb/query/iters.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index 82513a0c3..fc85ddbd0 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -1173,7 +1173,6 @@ func (c *SyncIterator) setPage(pg parquet.Page) { } func (c *SyncIterator) closeCurrRowGroup() { - c.cancel() if c.currPages != nil { c.currPages.Close() } @@ -1200,6 +1199,7 @@ func (c *SyncIterator) Err() error { } func (c *SyncIterator) Close() error { + c.cancel() c.closeCurrRowGroup() c.span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load()) From 3a9306c6b8bba307085be84a507759722221cdca Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Fri, 7 Jul 2023 18:00:25 +0100 Subject: [PATCH 16/17] Add otel traces to visualize tests better Switch to otel --- pkg/phlaredb/query/iters.go | 67 
++++++++++------- pkg/phlaredb/query/iters_test.go | 120 ++++++++++++++++++++++++++++++- 2 files changed, 161 insertions(+), 26 deletions(-) diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index fc85ddbd0..ac2013fba 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -10,9 +10,10 @@ import ( "sync" "github.com/grafana/dskit/multierror" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/log" "github.com/segmentio/parquet-go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/grafana/phlare/pkg/iter" ) @@ -39,6 +40,10 @@ type RowNumberWithDefinitionLevel struct { DefinitionLevel int } +func (r *RowNumberWithDefinitionLevel) String() string { + return fmt.Sprintf("%v:%v", r.RowNumber, r.DefinitionLevel) +} + // EmptyRowNumber creates an empty invalid row number. func EmptyRowNumber() RowNumber { return RowNumber{-1, -1, -1, -1, -1, -1} @@ -369,6 +374,14 @@ func (r *IteratorResult) Columns(buffer [][]parquet.Value, names ...string) [][] return buffer } +func (r *IteratorResult) String() string { + if r == nil { + return "nil" + } + return fmt.Sprintf("rowNum=%d entries=%+#v", r.RowNumber[0], r.ToMap()) + +} + // iterator - Every iterator follows this interface and can be composed. type Iterator = iter.SeekIterator[*IteratorResult, RowNumberWithDefinitionLevel] @@ -776,7 +789,7 @@ type SyncIterator struct { // Status ctx context.Context cancel func() - span opentracing.Span + span trace.Span metrics *Metrics curr RowNumber currRowGroup parquet.RowGroup @@ -833,10 +846,12 @@ func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, co rn.Skip(rg.NumRows()) } - span, ctx := opentracing.StartSpanFromContext(ctx, "syncIterator", opentracing.Tags{ - "columnIndex": column, - "column": columnName, - }) + tr := otel.Tracer("query") + + ctx, span := tr.Start(ctx, "syncIterator", trace.WithAttributes( + attribute.String("column", columnName), + attribute.Int("columnIndex", column), + )) ctx, cancel := context.WithCancel(ctx) @@ -1010,11 +1025,12 @@ func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bo return true, err } c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1) - c.span.LogFields( - log.String("msg", "reading page (seekPages)"), - log.Int64("page_num_values", pg.NumValues()), - log.Int64("page_size", pg.Size()), - ) + c.span.AddEvent( + "read page (seekPages)", + trace.WithAttributes( + attribute.Int64("page_num_values", pg.NumValues()), + attribute.Int64("page_size", pg.Size()), + )) // Skip based on row number? 
newRN := c.curr
@@ -1079,11 +1095,12 @@ func (c *SyncIterator) next() (RowNumber, *parquet.Value, error) {
 			return EmptyRowNumber(), nil, err
 		}
 		c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1)
-		c.span.LogFields(
-			log.String("msg", "reading page (next)"),
-			log.Int64("page_num_values", pg.NumValues()),
-			log.Int64("page_size", pg.Size()),
-		)
+		c.span.AddEvent(
+			"read page (next)",
+			trace.WithAttributes(
+				attribute.Int64("page_num_values", pg.NumValues()),
+				attribute.Int64("page_size", pg.Size()),
+			))

 		if c.filter != nil && !c.filter.KeepPage(pg) {
 			// This page filtered out
@@ -1202,12 +1219,14 @@ func (c *SyncIterator) Close() error {
 	c.cancel()
 	c.closeCurrRowGroup()

-	c.span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load())
-	c.span.SetTag("inspectedPages", c.filter.InspectedPages.Load())
-	c.span.SetTag("inspectedValues", c.filter.InspectedValues.Load())
-	c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load())
-	c.span.SetTag("keptPages", c.filter.KeptPages.Load())
-	c.span.SetTag("keptValues", c.filter.KeptValues.Load())
-	c.span.Finish()
+	c.span.SetAttributes(attribute.Int64("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load()))
+	/*
+		c.span.SetTag("inspectedPages", c.filter.InspectedPages.Load())
+		c.span.SetTag("inspectedValues", c.filter.InspectedValues.Load())
+		c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load())
+		c.span.SetTag("keptPages", c.filter.KeptPages.Load())
+		c.span.SetTag("keptValues", c.filter.KeptValues.Load())
+	*/
+	c.span.End()

 	return nil
 }
diff --git a/pkg/phlaredb/query/iters_test.go b/pkg/phlaredb/query/iters_test.go
index 220cee7e8..1b06b344f 100644
--- a/pkg/phlaredb/query/iters_test.go
+++ b/pkg/phlaredb/query/iters_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"log"
 	"math"
 	"os"
 	"testing"
@@ -12,6 +13,13 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/segmentio/parquet-go"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/exporters/jaeger"
+	"go.opentelemetry.io/otel/sdk/resource"
+	tracesdk "go.opentelemetry.io/otel/sdk/trace"
+	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
+	"go.opentelemetry.io/otel/trace"
 )

 const MaxDefinitionLevel = 5
@@ -368,7 +376,109 @@ func createFileWith[T any](t testing.TB, rows []T, rowGroups int) *parquet.File
 	return pf
 }

+type iteratorTracer struct {
+	it        Iterator
+	span      trace.Span
+	name      string
+	nextCount uint32
+	seekCount uint32
+}
+
+func (i *iteratorTracer) Next() bool {
+	i.nextCount++
+	posBefore := i.it.At()
+	result := i.it.Next()
+	posAfter := i.it.At()
+	i.span.AddEvent("next", trace.WithAttributes(
+		attribute.String("column", i.name),
+		attribute.Bool("result", result),
+		attribute.Stringer("posBefore", posBefore),
+		attribute.Stringer("posAfter", posAfter),
+	))
+	return result
+}
+
+func (i *iteratorTracer) At() *IteratorResult {
+	return i.it.At()
+}
+
+func (i *iteratorTracer) Err() error {
+	return i.it.Err()
+}
+
+func (i *iteratorTracer) Close() error {
+	return i.it.Close()
+}
+
+func (i *iteratorTracer) Seek(pos RowNumberWithDefinitionLevel) bool {
+	i.seekCount++
+	posBefore := i.it.At()
+	result := i.it.Seek(pos)
+	posAfter := i.it.At()
+	i.span.AddEvent("seek", trace.WithAttributes(
+		attribute.String("column", i.name),
+		attribute.Bool("result", result),
+		attribute.Stringer("seekTo", &pos),
+		attribute.Stringer("posBefore", posBefore),
+		attribute.Stringer("posAfter", posAfter),
+	))
+
return result +} + +func newIteratorTracer(span trace.Span, name string, it Iterator) Iterator { + return &iteratorTracer{ + span: span, + name: name, + it: it, + } +} + +// tracerProvider returns an OpenTelemetry TracerProvider configured to use +// the Jaeger exporter that will send spans to the provided url. The returned +// TracerProvider will also use a Resource configured with all the information +// about the application. +func tracerProvider(url string) (*tracesdk.TracerProvider, error) { + // Create the Jaeger exporter + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) + if err != nil { + return nil, err + } + tp := tracesdk.NewTracerProvider( + // Always be sure to batch in production. + tracesdk.WithBatcher(exp), + // Record information about this application in a Resource. + tracesdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("phlare-go-test"), + )), + ) + return tp, nil +} + +func TestMain(m *testing.M) { + tp, err := tracerProvider("http://localhost:14268/api/traces") + if err != nil { + log.Fatal(err) + } + + // Register our TracerProvider as the global so any imported + // instrumentation in the future will default to using it. + otel.SetTracerProvider(tp) + + result := m.Run() + + fmt.Println("shutting tracer down") + tp.Shutdown(context.Background()) + + os.Exit(result) +} + func TestBinaryJoinIterator(t *testing.T) { + tr := otel.Tracer("query") + + _, span := tr.Start(context.Background(), "TestBinaryJoinIterator") + defer span.End() + rowCount := 1600 pf := createProfileLikeFile(t, rowCount) @@ -438,8 +548,11 @@ func TestBinaryJoinIterator(t *testing.T) { metrics.pageReadsTotal.WithLabelValues("ts", "TimeNanos").Add(0) ctx = AddMetricsToContext(ctx, metrics) - seriesIt := NewSyncIterator(ctx, pf.RowGroups(), 0, "SeriesId", 1000, tc.seriesPredicate, "SeriesId") - timeIt := NewSyncIterator(ctx, pf.RowGroups(), 1, "TimeNanos", 1000, tc.timePredicate, "TimeNanos") + seriesIt := newIteratorTracer(span, "SeriesID", NewSyncIterator(ctx, pf.RowGroups(), 0, "SeriesId", 1000, tc.seriesPredicate, "SeriesId")) + timeIt := newIteratorTracer(span, "TimeNanos", NewSyncIterator(ctx, pf.RowGroups(), 1, "TimeNanos", 1000, tc.timePredicate, "TimeNanos")) + + ctx, span := tr.Start(ctx, t.Name()) + defer span.End() it := NewBinaryJoinIterator( 0, @@ -449,6 +562,9 @@ func TestBinaryJoinIterator(t *testing.T) { results := 0 for it.Next() { + span.AddEvent("match", trace.WithAttributes( + attribute.Stringer("element", it.At()), + )) results++ } require.NoError(t, it.Err()) From 8363ed739f8a4a6546b8f35ca69540b43cf3061d Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Tue, 11 Jul 2023 09:17:18 +0100 Subject: [PATCH 17/17] WIP: Write up gopatch --- migrate-to-otel.gopatch | 40 +++++++++++++++++ pkg/phlaredb/block_querier.go | 66 ++++++++++++++-------------- pkg/phlaredb/head.go | 14 +++--- pkg/phlaredb/head_queriers.go | 33 +++++++------- pkg/phlaredb/phlaredb.go | 20 ++++----- pkg/phlaredb/profiles.go | 10 +++-- pkg/phlaredb/query/repeated.go | 13 +++--- pkg/phlaredb/sample_merge.go | 58 ++++++++++++------------ pkg/querier/ingester_querier.go | 11 +++-- pkg/querier/querier.go | 65 ++++++++++++++------------- pkg/querier/select_merge.go | 31 ++++++------- pkg/querier/store_gateway_querier.go | 11 +++-- pkg/scheduler/scheduler.go | 3 +- 13 files changed, 217 insertions(+), 158 deletions(-) create mode 100644 migrate-to-otel.gopatch diff --git a/migrate-to-otel.gopatch b/migrate-to-otel.gopatch new file mode 100644 index 
000000000..81ae324c5 --- /dev/null +++ b/migrate-to-otel.gopatch @@ -0,0 +1,40 @@ +@@ +var a expression +var b expression +var s identifier +var t identifier +@@ +-s, t := opentracing.StartSpanFromContext(a,b) +-... +- defer s.Finish() ++import "go.opentelemetry.io/otel" ++t, s := otel.Tracer("github.com/grafana/pyroscope").Start(a,b) ++defer s.End() + +@@ +var foo,x identifier +@@ + +-import foo "github.com/opentracing/opentracing-go/log" ++import foo "go.opentelemetry.io/otel/attribute" +foo.x + +@@ +@@ +- otlog ++ attribute + +@@ +var span identifier +var x expression +@@ +- span.LogFields(...) ++import "go.opentelemetry.io/otel/trace" ++ span.AddEvent("TODO", trace.WithAttributes(...)) + + +@@ +@@ +-opentracing.Span ++import "go.opentelemetry.io/otel/trace" ++trace.Span diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 85b9de469..fd690d60f 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -21,13 +21,15 @@ import ( "github.com/grafana/dskit/runutil" "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" "github.com/samber/lo" "github.com/segmentio/parquet-go" "github.com/thanos-io/objstore" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" @@ -557,8 +559,8 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile } func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse], blockGetter BlockGetter) error { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesStacktraces") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesStacktraces") + defer sp.End() r, err := stream.Receive() if err != nil { @@ -572,12 +574,11 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request")) } request := r.Request - sp.LogFields( - otlog.String("start", model.Time(request.Start).Time().String()), - otlog.String("end", model.Time(request.End).Time().String()), - otlog.String("selector", request.LabelSelector), - otlog.String("profile_id", request.Type.ID), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("start", model.Time(request.Start).Time().String()), + attribute.String("end", model.Time(request.End).Time().String()), + attribute.String("selector", request.LabelSelector), + attribute.String("profile_id", request.Type.ID))) queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End)) if err != nil { @@ -621,7 +622,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in // Signals the end of the profile streaming by sending an empty response. // This allows the client to not block other streaming ingesters. 
- sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "signaling the end of the profile streaming"))) if err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{}); err != nil { return err } @@ -631,7 +632,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in } // sends the final result to the client. - sp.LogFields(otlog.String("msg", "sending the final result to the client")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending the final result to the client"))) err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{ Result: &ingestv1.MergeProfilesStacktracesResult{ Format: ingestv1.StacktracesMergeFormat_MERGE_FORMAT_TREE, @@ -649,8 +650,8 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in } func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse], blockGetter BlockGetter) error { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesLabels") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesLabels") + defer sp.End() r, err := stream.Receive() if err != nil { @@ -666,13 +667,12 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv request := r.Request by := r.By sort.Strings(by) - sp.LogFields( - otlog.String("start", model.Time(request.Start).Time().String()), - otlog.String("end", model.Time(request.End).Time().String()), - otlog.String("selector", request.LabelSelector), - otlog.String("profile_id", request.Type.ID), - otlog.String("by", strings.Join(by, ",")), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("start", model.Time(request.Start).Time().String()), + attribute.String("end", model.Time(request.End).Time().String()), + attribute.String("selector", request.LabelSelector), + attribute.String("profile_id", request.Type.ID), + attribute.String("by", strings.Join(by, ",")))) queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End)) if err != nil { @@ -743,8 +743,8 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv } func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse], blockGetter BlockGetter) error { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesPprof") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesPprof") + defer sp.End() r, err := stream.Receive() if err != nil { @@ -758,12 +758,11 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1 return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request")) } request := r.Request - sp.LogFields( - otlog.String("start", model.Time(request.Start).Time().String()), - otlog.String("end", model.Time(request.End).Time().String()), - otlog.String("selector", request.LabelSelector), - otlog.String("profile_id", request.Type.ID), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("start", model.Time(request.Start).Time().String()), + attribute.String("end", model.Time(request.End).Time().String()), + attribute.String("selector", request.LabelSelector), + attribute.String("profile_id", request.Type.ID))) queriers, err := blockGetter(ctx, 
model.Time(request.Start), model.Time(request.End)) if err != nil { @@ -889,8 +888,9 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 { } func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - Block") + defer sp.End() + if err := b.Open(ctx); err != nil { return nil, err } @@ -1045,9 +1045,9 @@ func (q *singleBlockQuerier) openFiles(ctx context.Context) error { sp, ctx := opentracing.StartSpanFromContext(ctx, "BlockQuerier - open") defer func() { q.metrics.blockOpeningLatency.Observe(time.Since(start).Seconds()) - sp.LogFields( - otlog.String("block_ulid", q.meta.ULID.String()), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("block_ulid", q.meta.ULID.String()))) + sp.Finish() }() g, ctx := errgroup.WithContext(ctx) diff --git a/pkg/phlaredb/head.go b/pkg/phlaredb/head.go index 3c131920d..b54a787e9 100644 --- a/pkg/phlaredb/head.go +++ b/pkg/phlaredb/head.go @@ -17,13 +17,15 @@ import ( "github.com/google/pprof/profile" "github.com/google/uuid" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/samber/lo" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" "google.golang.org/grpc/codes" @@ -533,8 +535,8 @@ func (h *Head) Queriers() Queriers { // add the location IDs to the stacktraces func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stacktracesByMapping) *ingestv1.MergeProfilesStacktracesResult { - sp, _ := opentracing.StartSpanFromContext(ctx, "resolveStacktraces - Head") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolveStacktraces - Head") + defer sp.End() names := []string{} functions := map[int64]int{} @@ -548,7 +550,7 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac h.strings.lock.RUnlock() }() - sp.LogFields(otlog.String("msg", "building MergeProfilesStacktracesResult")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building MergeProfilesStacktracesResult"))) _ = stacktracesByMapping.ForEach( func(mapping uint64, stacktraceSamples stacktraceSampleMap) error { mp, ok := h.symbolDB.MappingReader(mapping) @@ -595,8 +597,8 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac } func (h *Head) resolvePprof(ctx context.Context, stacktracesByMapping profileSampleByMapping) *profile.Profile { - sp, _ := opentracing.StartSpanFromContext(ctx, "resolvePprof - Head") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolvePprof - Head") + defer sp.End() locations := map[int32]*profile.Location{} functions := map[uint64]*profile.Function{} diff --git a/pkg/phlaredb/head_queriers.go b/pkg/phlaredb/head_queriers.go index 73dc3bd38..fb132e30f 100644 --- a/pkg/phlaredb/head_queriers.go +++ b/pkg/phlaredb/head_queriers.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/segmentio/parquet-go" + 
"go.opentelemetry.io/otel" ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1" typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1" @@ -34,8 +35,8 @@ func (q *headOnDiskQuerier) Open(_ context.Context) error { } func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadOnDisk") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadOnDisk") + defer sp.End() // query the index for rows rowIter, labelsPerFP, err := q.head.profiles.index.selectMatchingRowRanges(ctx, params, q.rowGroupIdx) @@ -106,8 +107,8 @@ func (q *headOnDiskQuerier) Bounds() (model.Time, model.Time) { } func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadOnDisk") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadOnDisk") + defer sp.End() stacktraceSamples := stacktracesByMapping{} @@ -120,8 +121,8 @@ func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.It } func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByPprof - HeadOnDisk") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByPprof - HeadOnDisk") + defer sp.End() stacktraceSamples := profileSampleByMapping{} @@ -133,8 +134,8 @@ func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[P } func (q *headOnDiskQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadOnDisk") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadOnDisk") + defer sp.End() seriesByLabels := make(seriesByLabels) @@ -168,8 +169,8 @@ func (q *headInMemoryQuerier) Open(_ context.Context) error { } func (q *headInMemoryQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadInMemory") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadInMemory") + defer sp.End() index := q.head.profiles.index @@ -215,8 +216,8 @@ func (q *headInMemoryQuerier) Bounds() (model.Time, model.Time) { } func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) { - sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadInMemory") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadInMemory") + defer sp.End() stacktraceSamples := stacktracesByMapping{} @@ -243,8 +244,8 @@ func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter. 
} func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) { - sp, _ := opentracing.StartSpanFromContext(ctx, "MergePprof - HeadInMemory") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergePprof - HeadInMemory") + defer sp.End() stacktraceSamples := profileSampleByMapping{} @@ -267,8 +268,8 @@ func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator } func (q *headInMemoryQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) { - sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadInMemory") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadInMemory") + defer sp.End() labelsByFingerprint := map[model.Fingerprint]string{} seriesByLabels := make(seriesByLabels) diff --git a/pkg/phlaredb/phlaredb.go b/pkg/phlaredb/phlaredb.go index 640410353..391f0755d 100644 --- a/pkg/phlaredb/phlaredb.go +++ b/pkg/phlaredb/phlaredb.go @@ -18,9 +18,11 @@ import ( "github.com/grafana/dskit/services" "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/common/model" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" profilev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1" ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1" @@ -380,12 +382,8 @@ func filterProfiles[B BidiServerMerge[Res, Req], Profile: maxBlockProfile, Index: 0, }, true, its...), batchProfileSize, func(ctx context.Context, batch []ProfileWithIndex) error { - sp, _ := opentracing.StartSpanFromContext(ctx, "filterProfiles - Filtering batch") - sp.LogFields( - otlog.Int("batch_len", len(batch)), - otlog.Int("batch_requested_size", batchProfileSize), - ) - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "filterProfiles - Filtering batch") + defer sp.End() seriesByFP := map[model.Fingerprint]labelWithIndex{} selectProfileResult.LabelsSets = selectProfileResult.LabelsSets[:0] @@ -409,7 +407,7 @@ func filterProfiles[B BidiServerMerge[Res, Req], }) } - sp.LogFields(otlog.String("msg", "sending batch to client")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending batch to client"))) var err error switch s := BidiServerMerge[Res, Req](stream).(type) { case BidiServerMerge[*ingestv1.MergeProfilesStacktracesResponse, *ingestv1.MergeProfilesStacktracesRequest]: @@ -433,9 +431,9 @@ func filterProfiles[B BidiServerMerge[Res, Req], } return err } - sp.LogFields(otlog.String("msg", "batch sent to client")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "batch sent to client"))) - sp.LogFields(otlog.String("msg", "reading selection from client")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "reading selection from client"))) // handle response for the batch. 
var selected []bool @@ -462,7 +460,7 @@ func filterProfiles[B BidiServerMerge[Res, Req], } return err } - sp.LogFields(otlog.String("msg", "selection received")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "selection received"))) for i, k := range selected { if k { selection[batch[i].Index] = append(selection[batch[i].Index], batch[i].Profile) diff --git a/pkg/phlaredb/profiles.go b/pkg/phlaredb/profiles.go index 5d28d4484..8e236b1aa 100644 --- a/pkg/phlaredb/profiles.go +++ b/pkg/phlaredb/profiles.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" "github.com/samber/lo" + "go.opentelemetry.io/otel" "go.uber.org/atomic" "google.golang.org/grpc/codes" @@ -195,8 +196,9 @@ func (pi *profilesIndex) Add(ps *schemav1.InMemoryProfile, lbs phlaremodel.Label } func (pi *profilesIndex) selectMatchingFPs(ctx context.Context, params *ingestv1.SelectProfilesRequest) ([]model.Fingerprint, error) { - sp, _ := opentracing.StartSpanFromContext(ctx, "selectMatchingFPs - Index") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMatchingFPs - Index") + defer sp.End() + selectors, err := parser.ParseMetricSelector(params.LabelSelector) if err != nil { return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error()) @@ -246,8 +248,8 @@ func (pi *profilesIndex) selectMatchingRowRanges(ctx context.Context, params *in map[model.Fingerprint]phlaremodel.Labels, error, ) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "selectMatchingRowRanges - Index") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMatchingRowRanges - Index") + defer sp.End() ids, err := pi.selectMatchingFPs(ctx, params) if err != nil { diff --git a/pkg/phlaredb/query/repeated.go b/pkg/phlaredb/query/repeated.go index d2264321c..7164d1a1c 100644 --- a/pkg/phlaredb/query/repeated.go +++ b/pkg/phlaredb/query/repeated.go @@ -7,9 +7,10 @@ import ( "github.com/grafana/dskit/multierror" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/samber/lo" "github.com/segmentio/parquet-go" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/grafana/phlare/pkg/iter" ) @@ -24,7 +25,7 @@ type repeatedPageIterator[T any] struct { column int readSize int ctx context.Context - span opentracing.Span + span trace.Span rgs []parquet.RowGroup startRowGroupRowNum int64 @@ -134,10 +135,10 @@ Outer: return false } it.span.LogFields( - otlog.String("msg", "Page read"), - otlog.Int64("startRowGroupRowNum", it.startRowGroupRowNum), - otlog.Int64("startPageRowNum", it.startPageRowNum), - otlog.Int64("pageRowNum", it.currentPage.NumRows()), + attribute.String("msg", "Page read"), + attribute.Int64("startRowGroupRowNum", it.startRowGroupRowNum), + attribute.Int64("startPageRowNum", it.startPageRowNum), + attribute.Int64("pageRowNum", it.currentPage.NumRows()), ) it.valueReader = it.currentPage.Values() } diff --git a/pkg/phlaredb/sample_merge.go b/pkg/phlaredb/sample_merge.go index 9b5681530..bde145544 100644 --- a/pkg/phlaredb/sample_merge.go +++ b/pkg/phlaredb/sample_merge.go @@ -6,10 +6,12 @@ import ( "github.com/google/pprof/profile" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" "github.com/samber/lo" "github.com/segmentio/parquet-go" + "go.opentelemetry.io/otel" + attribute 
"go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1" ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1" @@ -20,8 +22,8 @@ import ( ) func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - Block") + defer sp.End() stacktraceAggrValues := make(stacktracesByMapping) if err := mergeByStacktraces(ctx, b.profiles.file, rows, stacktraceAggrValues); err != nil { @@ -33,8 +35,8 @@ func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.I } func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - Block") + defer sp.End() stacktraceAggrValues := make(profileSampleByMapping) if err := mergeByStacktraces(ctx, b.profiles.file, rows, stacktraceAggrValues); err != nil { @@ -85,18 +87,18 @@ func (b *singleBlockQuerier) resolveLocations(ctx context.Context, mapping uint6 } func (b *singleBlockQuerier) resolvePprofSymbols(ctx context.Context, profileSampleByMapping profileSampleByMapping) (*profile.Profile, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "ResolvePprofSymbols - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ResolvePprofSymbols - Block") + defer sp.End() locationsIdsByStacktraceID := newLocationsIdsByStacktraceID(len(profileSampleByMapping) * 1024) // gather stacktraces if err := profileSampleByMapping.ForEach(func(mapping uint64, samples profileSampleMap) error { stacktraceIDs := samples.Ids() - sp.LogFields( - otlog.Int("stacktraces", len(stacktraceIDs)), - otlog.Uint64("mapping", mapping), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.Int("stacktraces", len(stacktraceIDs)), + attribute.Uint64("mapping", mapping))) + return b.resolveLocations(ctx, mapping, locationsIdsByStacktraceID, stacktraceIDs) }); err != nil { return nil, err @@ -245,26 +247,27 @@ func (b *singleBlockQuerier) resolvePprofSymbols(ctx context.Context, profileSam } func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMapping stacktracesByMapping) (*ingestv1.MergeProfilesStacktracesResult, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "ResolveSymbols - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ResolveSymbols - Block") + defer sp.End() + locationsIdsByStacktraceID := newLocationsIdsByStacktraceID(len(stacktracesByMapping) * 1024) // gather stacktraces if err := stacktracesByMapping.ForEach(func(mapping uint64, samples stacktraceSampleMap) error { stacktraceIDs := samples.Ids() - sp.LogFields( - otlog.Int("stacktraces", len(stacktraceIDs)), - otlog.Uint64("mapping", mapping), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.Int("stacktraces", len(stacktraceIDs)), + attribute.Uint64("mapping", mapping))) + return b.resolveLocations(ctx, mapping, locationsIdsByStacktraceID, stacktraceIDs) }); err != nil { return nil, err } - sp.LogFields(otlog.Int("locationIDs", 
len(locationsIdsByStacktraceID.locationIds()))) + sp.AddEvent("TODO", trace.WithAttributes(attribute.Int("locationIDs", len(locationsIdsByStacktraceID.locationIds())))) // gather locations - sp.LogFields(otlog.String("msg", "gather locations")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather locations"))) var ( locationIDsByFunctionID = newUniqueIDs[[]int64]() locations = b.locations.retrieveRows(ctx, locationsIdsByStacktraceID.locationIds().iterator()) @@ -279,10 +282,10 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa if err := locations.Err(); err != nil { return nil, err } - sp.LogFields(otlog.Int("functions", len(locationIDsByFunctionID))) + sp.AddEvent("TODO", trace.WithAttributes(attribute.Int("functions", len(locationIDsByFunctionID)))) // gather functions - sp.LogFields(otlog.String("msg", "gather functions")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather functions"))) var ( functionIDsByStringID = newUniqueIDs[[]int64]() functions = b.functions.retrieveRows(ctx, locationIDsByFunctionID.iterator()) @@ -297,7 +300,7 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa } // gather strings - sp.LogFields(otlog.String("msg", "gather strings")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather strings"))) var ( names = make([]string, len(functionIDsByStringID)) idSlice = make([][]int64, len(functionIDsByStringID)) @@ -314,7 +317,7 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa return nil, err } - sp.LogFields(otlog.String("msg", "build MergeProfilesStacktracesResult")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "build MergeProfilesStacktracesResult"))) // idSlice contains stringIDs and gets rewritten into functionIDs for nameID := range idSlice { var functionIDs []int64 @@ -361,8 +364,8 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa } func (b *singleBlockQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - Block") + defer sp.End() m := make(seriesByLabels) columnName := "TotalValue" @@ -469,8 +472,9 @@ type mapAdder interface { } func mergeByStacktraces(ctx context.Context, profileSource Source, rows iter.Iterator[Profile], m mapAdder) error { - sp, ctx := opentracing.StartSpanFromContext(ctx, "mergeByStacktraces") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "mergeByStacktraces") + defer sp.End() + // clone the rows to be able to iterate over them twice multiRows, err := iter.CloneN(rows, 2) if err != nil { diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go index e3361dc67..ab5c18e3f 100644 --- a/pkg/querier/ingester_querier.go +++ b/pkg/querier/ingester_querier.go @@ -8,6 +8,7 @@ import ( ring_client "github.com/grafana/dskit/ring/client" "github.com/opentracing/opentracing-go" "github.com/prometheus/prometheus/promql/parser" + "go.opentelemetry.io/otel" "golang.org/x/sync/errgroup" ingesterv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1" @@ -58,8 +59,9 @@ func forAllIngesters[T any](ctx context.Context, ingesterQuerier *IngesterQuerie } func (q *Querier) selectTreeFromIngesters(ctx context.Context, req 
*querierv1.SelectMergeStacktracesRequest) (*phlaremodel.Tree, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectTree Ingesters") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectTree Ingesters") + defer sp.End() + profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID) if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, err) @@ -103,8 +105,9 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se } func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries Ingesters") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries Ingesters") + defer sp.End() + responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesLabels, error) { return ic.MergeProfilesLabels(ctx), nil }) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 4d9ce48fd..edde0b6aa 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -15,13 +15,15 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" "github.com/samber/lo" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1" @@ -112,8 +114,8 @@ func (q *Querier) stopping(_ error) error { } func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querierv1.ProfileTypesRequest]) (*connect.Response[querierv1.ProfileTypesResponse], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "ProfileTypes") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ProfileTypes") + defer sp.End() responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]*typesv1.ProfileType, error) { res, err := ic.ProfileTypes(childCtx, connect.NewRequest(&ingestv1.ProfileTypesRequest{})) @@ -148,9 +150,9 @@ func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querier func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelValues") defer func() { - sp.LogFields( - otlog.String("name", req.Msg.Name), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("name", req.Msg.Name))) + sp.Finish() }() responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]string, error) { @@ -173,8 +175,9 @@ func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1. 
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 4d9ce48fd..edde0b6aa 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -15,13 +15,15 @@ import (
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 	"github.com/opentracing/opentracing-go"
-	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/samber/lo"
+	"go.opentelemetry.io/otel"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/sync/errgroup"
 
 	googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
@@ -112,8 +114,8 @@ func (q *Querier) stopping(_ error) error {
 }
 
 func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querierv1.ProfileTypesRequest]) (*connect.Response[querierv1.ProfileTypesResponse], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "ProfileTypes")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ProfileTypes")
+	defer sp.End()
 
 	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]*typesv1.ProfileType, error) {
 		res, err := ic.ProfileTypes(childCtx, connect.NewRequest(&ingestv1.ProfileTypesRequest{}))
@@ -148,9 +150,9 @@ func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querier
 
 func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) {
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelValues")
 	defer func() {
-		sp.LogFields(
-			otlog.String("name", req.Msg.Name),
-		)
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("name", req.Msg.Name)))
+
 		sp.Finish()
 	}()
 
 	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]string, error) {
@@ -173,8 +175,9 @@ func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.
 }
 
 func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelNames")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "LabelNames")
+	defer sp.End()
+
 	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]string, error) {
 		res, err := ic.LabelNames(childCtx, connect.NewRequest(&typesv1.LabelNamesRequest{
 			Matchers: req.Msg.Matchers,
@@ -196,9 +199,9 @@ func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.L
 }
 
 func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.SeriesRequest]) (*connect.Response[querierv1.SeriesResponse], error) {
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "Series")
 	defer func() {
-		sp.LogFields(
-			otlog.String("matchers", strings.Join(req.Msg.Matchers, ",")),
-		)
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("matchers", strings.Join(req.Msg.Matchers, ","))))
+
 		sp.Finish()
 	}()
 
 	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]*typesv1.Labels, error) {
@@ -227,13 +230,13 @@
 
 func (q *Querier) Diff(ctx context.Context, req *connect.Request[querierv1.DiffRequest]) (*connect.Response[querierv1.DiffResponse], error) {
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "Diff")
 	defer func() {
-		sp.LogFields(
-			otlog.String("leftStart", model.Time(req.Msg.Left.Start).Time().String()),
-			otlog.String("leftEnd", model.Time(req.Msg.Left.End).Time().String()),
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("leftStart", model.Time(req.Msg.Left.Start).Time().String()),
+			attribute.String("leftEnd", model.Time(req.Msg.Left.End).Time().String()),
 			// Assume are the same
-			otlog.String("selector", req.Msg.Left.LabelSelector),
-			otlog.String("profile_id", req.Msg.Left.ProfileTypeID),
-		)
+			attribute.String("selector", req.Msg.Left.LabelSelector),
+			attribute.String("profile_id", req.Msg.Left.ProfileTypeID)))
+
 		sp.Finish()
 	}()
 
@@ -409,12 +412,12 @@ func splitQueryToStores(start, end model.Time, now model.Time, queryStoreAfter t
 
 func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[querierv1.SelectMergeProfileRequest]) (*connect.Response[googlev1.Profile], error) {
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMergeProfile")
 	defer func() {
-		sp.LogFields(
-			otlog.String("start", model.Time(req.Msg.Start).Time().String()),
-			otlog.String("end", model.Time(req.Msg.End).Time().String()),
-			otlog.String("selector", req.Msg.LabelSelector),
-			otlog.String("profile_id", req.Msg.ProfileTypeID),
-		)
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("start", model.Time(req.Msg.Start).Time().String()),
+			attribute.String("end", model.Time(req.Msg.End).Time().String()),
+			attribute.String("selector", req.Msg.LabelSelector),
+			attribute.String("profile_id", req.Msg.ProfileTypeID)))
+
 		sp.Finish()
 	}()
 
@@ -467,14 +470,14 @@ func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[q
 
 func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) (*connect.Response[querierv1.SelectSeriesResponse], error) {
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries")
 	defer func() {
-		sp.LogFields(
-			otlog.String("start", model.Time(req.Msg.Start).Time().String()),
-			otlog.String("end", model.Time(req.Msg.End).Time().String()),
-			otlog.String("selector", req.Msg.LabelSelector),
-			otlog.String("profile_id", req.Msg.ProfileTypeID),
-			otlog.String("group_by", strings.Join(req.Msg.GroupBy, ",")),
-			otlog.Float64("step", req.Msg.Step),
-		)
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("start", model.Time(req.Msg.Start).Time().String()),
+			attribute.String("end", model.Time(req.Msg.End).Time().String()),
+			attribute.String("selector", req.Msg.LabelSelector),
+			attribute.String("profile_id", req.Msg.ProfileTypeID),
+			attribute.String("group_by", strings.Join(req.Msg.GroupBy, ",")),
+			attribute.Float64("step", req.Msg.Step)))
+
 		sp.Finish()
 	}()
 
diff --git a/pkg/querier/select_merge.go b/pkg/querier/select_merge.go
index 5f12d94e7..6a912cad4 100644
--- a/pkg/querier/select_merge.go
+++ b/pkg/querier/select_merge.go
@@ -8,13 +8,6 @@ import (
 
 	"github.com/google/pprof/profile"
 	"github.com/grafana/dskit/multierror"
-	"github.com/opentracing/opentracing-go"
-	"github.com/prometheus/common/model"
-	"github.com/samber/lo"
-	"golang.org/x/sync/errgroup"
-
-	otlog "github.com/opentracing/opentracing-go/log"
-
 	googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
 	ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
 	typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
@@ -24,6 +17,13 @@ import (
 	"github.com/grafana/phlare/pkg/pprof"
 	"github.com/grafana/phlare/pkg/util"
 	"github.com/grafana/phlare/pkg/util/loser"
+	"github.com/opentracing/opentracing-go"
+	"github.com/prometheus/common/model"
+	"github.com/samber/lo"
+	"go.opentelemetry.io/otel"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
+	"golang.org/x/sync/errgroup"
 )
 
 type ProfileWithLabels struct {
@@ -220,8 +220,9 @@ func (s *mergeIterator[R, Req, Res]) Close() error {
 
 // skipDuplicates iterates through the iterator and skip duplicates.
 func skipDuplicates(ctx context.Context, its []MergeIterator) error {
-	span, _ := opentracing.StartSpanFromContext(ctx, "skipDuplicates")
-	defer span.Finish()
+	_, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "skipDuplicates")
+	defer span.End()
+
 	var errors multierror.MultiError
 	tree := loser.New(its,
 		&ProfileWithLabels{
@@ -259,8 +260,8 @@ func skipDuplicates(ctx context.Context, its []MergeIterator) error {
 		}
 		duplicates++
 	}
-	span.LogFields(otlog.Int("duplicates", duplicates))
-	span.LogFields(otlog.Int("total", total))
+	span.AddEvent("TODO", trace.WithAttributes(attribute.Int("duplicates", duplicates)))
+	span.AddEvent("TODO", trace.WithAttributes(attribute.Int("total", total)))
 	return errors.Err()
 }
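skipDuplicates now emits the duplicates and total counters as two separate single-attribute events, a one-to-one translation of the two LogFields calls they replace. OTel events can carry several attributes, so a later cleanup could fold them into one named event; a sketch of that variant (the event name and helper are mine, not the patch's):

    package main

    import (
        "context"

        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
        "go.opentelemetry.io/otel/trace"
    )

    func reportDedup(ctx context.Context, duplicates, total int) {
        // As in skipDuplicates, the derived context is discarded because no
        // further calls are made under this span.
        _, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "skipDuplicates")
        defer span.End()

        // One event carries both counters instead of two "TODO" events.
        span.AddEvent("deduplicated profiles", trace.WithAttributes(
            attribute.Int("duplicates", duplicates),
            attribute.Int("total", total)))
    }

    func main() {
        reportDedup(context.Background(), 2, 10)
    }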
@@ -268,8 +269,8 @@ func skipDuplicates(ctx context.Context, its []MergeIterator) error {
 
 // selectMergeTree selects the profile from each ingester by deduping them and
 // returns merge of stacktrace samples represented as a tree.
 func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesStacktraces]) (*phlaremodel.Tree, error) {
-	span, ctx := opentracing.StartSpanFromContext(ctx, "selectMergeTree")
-	defer span.Finish()
+	ctx, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMergeTree")
+	defer span.End()
 
 	mergeResults := make([]MergeResult[*ingestv1.MergeProfilesStacktracesResult], len(responses))
 	iters := make([]MergeIterator, len(responses))
@@ -294,7 +295,7 @@ func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[client
 	}
 
 	// Collects the results in parallel.
-	span.LogFields(otlog.String("msg", "collecting merge results"))
+	span.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "collecting merge results")))
 	g, _ := errgroup.WithContext(ctx)
 	m := phlaremodel.NewTreeMerger()
 	sm := phlaremodel.NewStackTraceMerger()
@@ -327,7 +328,7 @@ func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[client
 		}
 	}
 
-	span.LogFields(otlog.String("msg", "building tree"))
+	span.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building tree")))
 	return m.Tree(), nil
 }
 
diff --git a/pkg/querier/store_gateway_querier.go b/pkg/querier/store_gateway_querier.go
index 2f333d95b..787616673 100644
--- a/pkg/querier/store_gateway_querier.go
+++ b/pkg/querier/store_gateway_querier.go
@@ -15,6 +15,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/promql/parser"
+	"go.opentelemetry.io/otel"
 	"golang.org/x/sync/errgroup"
 
 	ingesterv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -151,8 +152,9 @@ func GetShuffleShardingSubring(ring ring.ReadRing, userID string, limits StoreGa
 }
 
 func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1.SelectMergeStacktracesRequest) (*phlaremodel.Tree, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectTree StoreGateway")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectTree StoreGateway")
+	defer sp.End()
+
 	profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID)
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInvalidArgument, err)
@@ -200,8 +202,9 @@ func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1
 }
 
 func (q *Querier) selectSeriesFromStoreGateway(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries StoreGateway")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries StoreGateway")
+	defer sp.End()
+
 	tenantID, err := tenant.ExtractTenantIDFromContext(ctx)
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInvalidArgument, err)
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index f4d8d31aa..99a1f8a1f 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -27,6 +27,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/weaveworks/common/middleware"
 	"github.com/weaveworks/common/user"
+	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc"
 
 	"github.com/grafana/phlare/pkg/frontend/frontendpb"
@@ -200,7 +201,7 @@ type schedulerRequest struct {
 	ctx       context.Context
 	ctxCancel context.CancelFunc
 
-	queueSpan opentracing.Span
+	queueSpan trace.Span
 
 	// This is only used for testing.
 	parentSpanContext opentracing.SpanContext
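The scheduler keeps the queue span alive across the request's wait in the queue, which is why only the field's type changes here, from opentracing.Span to the OTel trace.Span interface. A sketch of that lifecycle under the new API (queuedRequest is a simplified stand-in for schedulerRequest):

    package main

    import (
        "context"

        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/trace"
    )

    // queuedRequest stands in for schedulerRequest: the span is started on
    // enqueue, parked on the struct, and ended on dequeue, so its duration
    // measures time spent waiting in the queue.
    type queuedRequest struct {
        ctx       context.Context
        queueSpan trace.Span
    }

    func enqueue(ctx context.Context) *queuedRequest {
        ctx, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "queued")
        return &queuedRequest{ctx: ctx, queueSpan: span}
    }

    func dequeue(r *queuedRequest) {
        r.queueSpan.End() // the queue wait ends when the request is picked up
    }

    func main() {
        dequeue(enqueue(context.Background()))
    }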