Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit 2a758fe

Commit message: Make row ranges generic
1 parent: 2fab9c1; commit: 2a758fe

File tree

4 files changed: 74 additions (+74) and 106 deletions (-106).

pkg/phlaredb/block_querier.go

Lines changed: 56 additions & 88 deletions
Original file line number | Diff line number | Diff line change
@@ -960,7 +960,7 @@ type profileMatchIterator struct {
960960
lbls map[int64]labelsInfo
961961
}
962962

963-
func newProfileMatchIterator(file *parquet.File, lbls map[int64]labelsInfo) *profileMatchIterator {
963+
func newProfileMatchIterator(file *parquet.File, lbls map[int64]labelsInfo) query.Iterator {
964964
it := &profileMatchIterator{file: file, lbls: lbls}
965965
timeNanosIndex, _ := query.GetColumnIndexByPath(file, "TimeNanos")
966966
it.rowGroups = file.RowGroups()
@@ -993,6 +993,11 @@ type parquetUint32Reader interface {
993993
ReadUint32s(values []uint32) (int, error)
994994
}
995995

996+
func (it *profileMatchIterator) Seek(to query.RowNumberWithDefinitionLevel) bool {
997+
panic("not seeking for anyone")
998+
return false
999+
}
1000+
9961001
func (it *profileMatchIterator) readNextRowGroup() error {
9971002
rg := it.rowGroups[it.currentRowGroup]
9981003
//it.currentRowGroup++
@@ -1093,8 +1098,7 @@ func (it *profileMatchIterator) Close() error {
10931098
return nil
10941099
}
10951100

1096-
func (it *profileMatchIterator) At() Profile {
1097-
1101+
func (it *profileMatchIterator) At() *query.IteratorResult {
10981102
panic("xx")
10991103
return nil
11001104
}
@@ -1145,97 +1149,61 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
11451149
}
11461150
}
11471151

1148-
/*
1149-
var (
1150-
buf [][]parquet.Value
1151-
joinIters []query.Iterator
1152-
)
1153-
pgIdx := 0
1154-
1155-
pages := cc.Pages()
1156-
1157-
for {
1158-
pg, err := pages.ReadPage()
1159-
if err == io.EOF {
1160-
break
1161-
}
1162-
if err != nil {
1163-
return nil, err
1164-
}
1165-
1166-
valueReader := pg.Values()
1167-
vR, ok := valueReader.(parquet.Int64Reader)
1168-
if !ok {
1169-
return nil, fmt.Errorf("unexpected type for value reader %T", valueReader)
1170-
}
1171-
1172-
values := make([]int64, pg.NumValues())
1173-
_, err = vR.ReadInt64s(values)
1174-
if err != nil && err != io.EOF {
1175-
return nil, err
1176-
}
1177-
1178-
fmt.Printf("numValues=%+#v\n ", pg.NumValues())
1179-
1180-
pgIdx++
1181-
1182-
}
1183-
1184-
}
1185-
*/
1186-
1187-
return newProfileMatchIterator(b.profiles.file, lblsPerRef), nil
1152+
var (
1153+
buf [][]parquet.Value
1154+
joinIters []query.Iterator
1155+
)
11881156

1189-
/*
1157+
seriesIndexIter := newProfileMatchIterator(b.profiles.file, lblsPerRef)
11901158

1191-
if b.meta.Version >= 2 {
1192-
joinIters = []query.Iterator{
1193-
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
1194-
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
1195-
b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
1196-
}
1197-
buf = make([][]parquet.Value, 3)
1198-
} else {
1199-
joinIters = []query.Iterator{
1200-
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
1201-
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
1202-
}
1203-
buf = make([][]parquet.Value, 2)
1159+
if b.meta.Version >= 2 {
1160+
joinIters = []query.Iterator{
1161+
seriesIndexIter,
1162+
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
1163+
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
1164+
b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
12041165
}
1205-
1206-
pIt := query.NewJoinIterator(0, joinIters, nil)
1207-
iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef))
1208-
defer pIt.Close()
1209-
1210-
currSeriesIndex := int64(-1)
1211-
var currentSeriesSlice []Profile
1212-
for pIt.Next() {
1213-
res := pIt.At()
1214-
buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition")
1215-
seriesIndex := buf[0][0].Int64()
1216-
if seriesIndex != currSeriesIndex {
1217-
currSeriesIndex = seriesIndex
1218-
if len(currentSeriesSlice) > 0 {
1219-
fmt.Println(len(currentSeriesSlice))
1220-
iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
1221-
}
1222-
currentSeriesSlice = make([]Profile, 0, 100)
1223-
}
1224-
1225-
currentSeriesSlice = append(currentSeriesSlice, BlockProfile{
1226-
labels: lblsPerRef[seriesIndex].lbs,
1227-
fp: lblsPerRef[seriesIndex].fp,
1228-
ts: model.TimeFromUnixNano(buf[1][0].Int64()),
1229-
stacktracePartition: retrieveStacktracePartition(buf, 2),
1230-
RowNum: res.RowNumber[0],
1231-
})
1166+
buf = make([][]parquet.Value, 3)
1167+
} else {
1168+
joinIters = []query.Iterator{
1169+
seriesIndexIter,
1170+
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
12321171
}
1233-
if len(currentSeriesSlice) > 0 {
1234-
iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
1172+
buf = make([][]parquet.Value, 2)
1173+
}
1174+
1175+
pIt := query.NewJoinIterator(0, joinIters, nil)
1176+
iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef))
1177+
defer pIt.Close()
1178+
1179+
currSeriesIndex := int64(-1)
1180+
var currentSeriesSlice []Profile
1181+
for pIt.Next() {
1182+
res := pIt.At()
1183+
buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition")
1184+
seriesIndex := buf[0][0].Int64()
1185+
if seriesIndex != currSeriesIndex {
1186+
currSeriesIndex = seriesIndex
1187+
if len(currentSeriesSlice) > 0 {
1188+
fmt.Println(len(currentSeriesSlice))
1189+
iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
1190+
}
1191+
currentSeriesSlice = make([]Profile, 0, 100)
12351192
}
12361193

1237-
return iter.NewMergeIterator(maxBlockProfile, false, iters...), nil
1238-
*/
1194+
currentSeriesSlice = append(currentSeriesSlice, BlockProfile{
1195+
labels: lblsPerRef[seriesIndex].lbs,
1196+
fp: lblsPerRef[seriesIndex].fp,
1197+
ts: model.TimeFromUnixNano(buf[1][0].Int64()),
1198+
stacktracePartition: retrieveStacktracePartition(buf, 2),
1199+
RowNum: res.RowNumber[0],
1200+
})
1201+
}
1202+
if len(currentSeriesSlice) > 0 {
1203+
iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
1204+
}
1205+
1206+
return iter.NewMergeIterator(maxBlockProfile, false, iters...), nil
12391207
}
12401208

12411209
func (b *singleBlockQuerier) Sort(in []Profile) []Profile {

pkg/phlaredb/head_queriers.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -66,12 +66,12 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *
6666
for pIt.Next() {
6767
res := pIt.At()
6868

69-
v, ok := res.Entries[0].RowValue.(fingerprintWithRowNum)
69+
v, ok := res.Entries[0].RowValue.(rowNumWithSomething[model.Fingerprint])
7070
if !ok {
7171
panic("no fingerprint information found")
7272
}
7373

74-
lbls, ok := labelsPerFP[v.fp]
74+
lbls, ok := labelsPerFP[v.elem]
7575
if !ok {
7676
panic("no profile series labels with matching fingerprint found")
7777
}
@@ -83,7 +83,7 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *
8383
}
8484
profiles = append(profiles, BlockProfile{
8585
labels: lbls,
86-
fp: v.fp,
86+
fp: v.elem,
8787
ts: model.TimeFromUnixNano(buf[0][0].Int64()),
8888
stacktracePartition: retrieveStacktracePartition(buf, 1),
8989
RowNum: res.RowNumber[0],

pkg/phlaredb/profile_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -168,7 +168,7 @@ func Test_rowRangeIter(t *testing.T) {
168168
result := []int64{}
169169
for it.Next() {
170170
result = append(result, it.At().RowNumber())
171-
assert.Equal(t, model.Fingerprint(0xff), it.At().fp)
171+
assert.Equal(t, model.Fingerprint(0xff), it.At().elem)
172172
}
173173
assert.Equal(t, tc.expected, result)
174174
})
@@ -238,7 +238,7 @@ func Test_rowRangesIter(t *testing.T) {
238238

239239
for it.Next() {
240240
rows = append(rows, it.At().RowNumber())
241-
fingerprints = append(fingerprints, it.At().fp)
241+
fingerprints = append(fingerprints, it.At().elem)
242242
}
243243
assert.Equal(t, tc.expRows, rows)
244244
assert.Equal(t, tc.expFingerprints, fingerprints)

pkg/phlaredb/profiles.go

Lines changed: 13 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ func (s rowRangesWithSeriesIndex) getSeriesIndex(rowNum int64) uint32 {
5656

5757
type rowRanges map[rowRange]model.Fingerprint
5858

59-
func (rR rowRanges) iter() iter.Iterator[fingerprintWithRowNum] {
59+
func (rR rowRanges) iter() iter.Iterator[rowNumWithSomething[model.Fingerprint]] {
6060
// ensure row ranges is sorted
6161
rRSlice := lo.Keys(rR)
6262
sort.Slice(rRSlice, func(i, j int) bool {
@@ -68,40 +68,40 @@ func (rR rowRanges) iter() iter.Iterator[fingerprintWithRowNum] {
6868
fps = append(fps, rR[elem])
6969
}
7070

71-
return &rowRangesIter{
71+
return &rowRangesIter[model.Fingerprint]{
7272
r: rRSlice,
7373
fps: fps,
7474
pos: 0,
7575
}
7676
}
7777

78-
type fingerprintWithRowNum struct {
79-
fp model.Fingerprint
78+
type rowNumWithSomething[A any] struct {
79+
elem A
8080
rowNum int64
8181
}
8282

83-
func (f fingerprintWithRowNum) RowNumber() int64 {
83+
func (f rowNumWithSomething[A]) RowNumber() int64 {
8484
return f.rowNum
8585
}
8686

8787
func (r rowRanges) fingerprintsWithRowNum() query.Iterator {
8888
return query.NewRowNumberIterator(r.iter())
8989
}
9090

91-
type rowRangesIter struct {
91+
type rowRangesIter[A any] struct {
9292
r []rowRange
93-
fps []model.Fingerprint
93+
fps []A
9494
pos int64
9595
}
9696

97-
func (i *rowRangesIter) At() fingerprintWithRowNum {
98-
return fingerprintWithRowNum{
97+
func (i *rowRangesIter[A]) At() rowNumWithSomething[A] {
98+
return rowNumWithSomething[A]{
99+
elem: i.fps[0],
99100
rowNum: i.pos - 1,
100-
fp: i.fps[0],
101101
}
102102
}
103103

104-
func (i *rowRangesIter) Next() bool {
104+
func (i *rowRangesIter[A]) Next() bool {
105105
if len(i.r) == 0 {
106106
return false
107107
}
@@ -118,9 +118,9 @@ func (i *rowRangesIter) Next() bool {
118118
return true
119119
}
120120

121-
func (i *rowRangesIter) Close() error { return nil }
121+
func (i *rowRangesIter[A]) Close() error { return nil }
122122

123-
func (i *rowRangesIter) Err() error { return nil }
123+
func (i *rowRangesIter[A]) Err() error { return nil }
124124

125125
type profileSeries struct {
126126
lbs phlaremodel.Labels

0 commit comments

Comments (0)