Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/engine/internal/executor/column.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package executor

import (
"fmt"
"slices"

"github.com/apache/arrow-go/v18/arrow"
Expand Down Expand Up @@ -51,6 +52,20 @@ func NewScalar(value types.Literal, rows int) arrow.Array {
for range rows {
builder.Append(arrow.Timestamp(value))
}
case *array.ListBuilder:
//TODO(twhitney): currently only supporting string list, but we can add more types here as we need them
value, ok := value.Any().([]string)
if !ok {
panic(fmt.Errorf("unsupported list literal type: %T", value))
}

valueBuilder := builder.ValueBuilder().(*array.StringBuilder)
for range rows {
builder.Append(true)
for _, val := range value {
valueBuilder.Append(val)
}
}
}
return builder.NewArray()
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/engine/internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
return tracePipeline("physical.RangeAggregation", c.executeRangeAggregation(ctx, n, inputs))
case *physical.VectorAggregation:
return tracePipeline("physical.VectorAggregation", c.executeVectorAggregation(ctx, n, inputs))
case *physical.ParseNode:
return tracePipeline("physical.ParseNode", c.executeParse(ctx, n, inputs))
case *physical.ColumnCompat:
return tracePipeline("physical.ColumnCompat", c.executeColumnCompat(ctx, n, inputs))
case *physical.Parallelize:
Expand Down Expand Up @@ -367,18 +365,6 @@ func (c *Context) executeVectorAggregation(ctx context.Context, plan *physical.V
return pipeline
}

func (c *Context) executeParse(ctx context.Context, parse *physical.ParseNode, inputs []Pipeline) Pipeline {
if len(inputs) == 0 {
return emptyPipeline()
}

if len(inputs) > 1 {
return errorPipeline(ctx, fmt.Errorf("parse expects exactly one input, got %d", len(inputs)))
}

return NewParsePipeline(parse, inputs[0])
}

func (c *Context) executeColumnCompat(ctx context.Context, compat *physical.ColumnCompat, inputs []Pipeline) Pipeline {
if len(inputs) == 0 {
return emptyPipeline()
Expand Down
24 changes: 0 additions & 24 deletions pkg/engine/internal/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,3 @@ func TestExecutor_Projection(t *testing.T) {
require.ErrorContains(t, err, "projection expects exactly one input, got 2")
})
}

func TestExecutor_Parse(t *testing.T) {
t.Run("no inputs result in empty pipeline", func(t *testing.T) {
ctx := t.Context()
c := &Context{}
pipeline := c.executeParse(ctx, &physical.ParseNode{
Kind: physical.ParserLogfmt,
RequestedKeys: []string{"level", "status"},
}, nil)
_, err := pipeline.Read(ctx)
require.ErrorContains(t, err, EOF.Error())
})

t.Run("multiple inputs result in error", func(t *testing.T) {
ctx := t.Context()
c := &Context{}
pipeline := c.executeParse(ctx, &physical.ParseNode{
Kind: physical.ParserLogfmt,
RequestedKeys: []string{"level"},
}, []Pipeline{emptyPipeline(), emptyPipeline()})
_, err := pipeline.Read(ctx)
require.ErrorContains(t, err, "parse expects exactly one input, got 2")
})
}
16 changes: 16 additions & 0 deletions pkg/engine/internal/executor/expressions.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,22 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
_, lhsIsScalar := expr.Left.(*physical.LiteralExpr)
_, rhsIsScalar := expr.Right.(*physical.LiteralExpr)
return fn.Evaluate(lhs, rhs, lhsIsScalar, rhsIsScalar)

case *physical.VariadicExpr:
args := make([]arrow.Array, len(expr.Expressions))
for i, arg := range expr.Expressions {
p, err := e.eval(arg, input)
if err != nil {
return nil, err
}
args[i] = p
}

fn, err := variadicFunctions.GetForSignature(expr.Op)
if err != nil {
return nil, fmt.Errorf("failed to lookup unary function: %w", err)
}
return fn.Evaluate(args...)
}

return nil, fmt.Errorf("unknown expression: %v", expr)
Expand Down
Loading