Skip to content

Commit c280531

Browse files
authored
chore: Fix column name regression (#19468)
The temporary streamID column from the dataobj scan node caused an error when the expression evaluator tried to parse the FQN, which was not valid any more after the recent change that implemented the semantic naming conventions. Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
1 parent d5f1570 commit c280531

File tree

2 files changed

+11
-12
lines changed

2 files changed

+11
-12
lines changed

pkg/engine/internal/executor/dataobjscan.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -221,16 +221,9 @@ func (s *dataobjScan) initLogs() error {
221221
return fmt.Errorf("logs.Reader returned schema with %d fields, expected %d", got, want)
222222
}
223223

224+
// Convert the logs columns to engine-compatible fields.
224225
var desiredFields []arrow.Field
225-
for i, col := range columnsToRead {
226-
if col.Type == logs.ColumnTypeStreamID {
227-
// The stream ID field should be left as-is for use with the streams
228-
// injector.
229-
desiredFields = append(desiredFields, origSchema.Field(i))
230-
continue
231-
}
232-
233-
// Convert the logs column to an engine-compatible field.
226+
for _, col := range columnsToRead {
234227
field, err := logsColumnToEngineField(col)
235228
if err != nil {
236229
return err
@@ -255,6 +248,9 @@ func makeScalars[S ~[]E, E any](s S) []scalar.Scalar {
255248
// engine.
256249
func logsColumnToEngineField(col *logs.Column) (arrow.Field, error) {
257250
switch col.Type {
251+
case logs.ColumnTypeStreamID:
252+
return semconv.FieldFromIdent(streamInjectorColumnIdent, true), nil
253+
258254
case logs.ColumnTypeTimestamp:
259255
return semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, true), nil
260256

pkg/engine/internal/executor/stream_injector.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
"github.com/grafana/loki/v3/pkg/engine/internal/types"
1515
)
1616

17-
var streamInjectorColumnName = "stream_id.int64"
17+
var (
18+
streamInjectorColumnName = "int64.generated.stream_id"
19+
streamInjectorColumnIdent = semconv.MustParseFQN(streamInjectorColumnName)
20+
)
1821

1922
// streamInjector injects stream labels into a logs Arrow record, replacing the
2023
// streams ID column with columns for the labels composing those streams.
@@ -40,14 +43,14 @@ func newStreamInjector(alloc memory.Allocator, view *streamsView) *streamInjecto
4043
//
4144
// The returned record must be Release()d by the caller when no longer needed.
4245
func (si *streamInjector) Inject(ctx context.Context, in arrow.Record) (arrow.Record, error) {
43-
streamIDCol, streamIDIndex, err := columnForFQN(streamInjectorColumnName, in)
46+
streamIDCol, streamIDIndex, err := columnForIdent(streamInjectorColumnIdent, in)
4447
if err != nil {
4548
return nil, err
4649
}
4750

4851
streamIDValues, ok := streamIDCol.(*array.Int64)
4952
if !ok {
50-
return nil, fmt.Errorf("column %s must be of type int64, got %s", streamInjectorColumnName, in.Schema().Field(streamIDIndex))
53+
return nil, fmt.Errorf("column %s must be of type int64, got %s", streamInjectorColumnIdent.FQN(), in.Schema().Field(streamIDIndex))
5154
}
5255

5356
type labelColumn struct {

0 commit comments

Comments
 (0)