refactor: implment parse as a projection #19579
| case *physical.NamedLiteralExpr: | ||
| return &Scalar{ | ||
| value: expr.Literal, |
the literal being used here in the case of parse is actually an array, which I'm aware is technically not a scalar. the value being passed here though is a pointer to that array, which one could argue is a scalar. that being said, let me know if you'd prefer another type. my thought was to avoid that so we don't need to type check the incoming literal.
these were moved to the executor tests above
| return ExprTypeUnary | ||
| } | ||
| type NamedLiteralExpr struct { |
I'm open to push back on just using a Literal instead, but I added this as I think it makes the optimize code cleaner, where we can look specifically for the requestedKeys literal when pushing down projections, rather than looking for any literal of the right type.
Named literals do feel a little weird to me, especially just for an optimize pass, and when the position of the arguments does matter (the second argument must be the requested keys, not any argument that's a NamedLiteral of requestedKeys).
I think if we want to make the optimize pass cleaner, we could unpack argument slices into a struct instead:
type parseArguments struct {
columnToParse Expression
requestedKeys Expression
}
// Unpack unpacks the expression from src into args. Unpack returns
// an error if there are not exactly 1 or 2 arguments:
//
// - parse(columnToParse)
// - parse(columnToParse, requestedKeys)
func (args *parseArguments) Unpack(src []Expression) error { ... }
// Pack packs args into a dst slice. Returns a new slice if dst isn't
// large enough.
func (args *parseArguments) Pack(dst []Expression) []Expression { ... }
Then your optimization pass could use this:
func (r *projectionPushdown) handleParse(expr *FunctionExpr, ...) ([]ColumnExpression, bool) {
var args parseArguments
if err := args.Unpack(expr.Expressions); err != nil {
// Panic, I guess?
}
if args.requestedKeys == nil {
// Initialize args.requestedKeys
}
existingKeys, ok := args.requestedKeys.(types.StringListLiteral)
...
// Copy back over into the FunctionExpr.
expr.Expressions = args.Pack(expr.Expressions)
}
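To make the suggestion above concrete, here is a minimal, self-contained sketch of how Unpack and Pack might be filled in. The Expression interface and the lit type are simplified stand-ins invented for this example, not the planner's real types, and the error text is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Expression is a hypothetical stand-in for the planner's expression
// interface; the real one has more methods.
type Expression interface {
	String() string
}

// lit is a hypothetical literal expression used only in this sketch.
type lit string

func (l lit) String() string { return string(l) }

// parseArguments gives names to the positional arguments of parse.
type parseArguments struct {
	columnToParse Expression
	requestedKeys Expression // nil when only one argument was passed
}

// Unpack copies src into args and rejects any arity other than 1 or 2.
func (args *parseArguments) Unpack(src []Expression) error {
	switch len(src) {
	case 1:
		args.columnToParse, args.requestedKeys = src[0], nil
	case 2:
		args.columnToParse, args.requestedKeys = src[0], src[1]
	default:
		return errors.New("parse expects 1 or 2 arguments")
	}
	return nil
}

// Pack writes args back into dst, reusing dst's backing array when it
// has enough capacity and allocating a new slice otherwise.
func (args *parseArguments) Pack(dst []Expression) []Expression {
	n := 1
	if args.requestedKeys != nil {
		n = 2
	}
	if cap(dst) < n {
		dst = make([]Expression, n)
	}
	dst = dst[:n]
	dst[0] = args.columnToParse
	if n == 2 {
		dst[1] = args.requestedKeys
	}
	return dst
}

func main() {
	exprs := []Expression{lit("builtin.message")}

	var args parseArguments
	if err := args.Unpack(exprs); err != nil {
		panic(err)
	}
	if args.requestedKeys == nil {
		// Placeholder for the real requested-keys literal.
		args.requestedKeys = lit("[bar, request_duration]")
	}
	exprs = args.Pack(exprs)
	fmt.Println(len(exprs), exprs[0], exprs[1])
}
```

Because the arity check lives in Unpack, the optimization pass only ever touches named fields and never indexes into the argument slice directly.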
| case *UnaryOp: | ||
| return b.processUnaryOp(value) |
Did you mean to remove UnaryOp here?
no, I did not, thank you
| if u.reg == nil { | ||
| u.reg = make(map[types.FunctionOp]Function) | ||
| } | ||
| // TODO(twhitney): Should the function panic when duplicate keys are registered? |
Yeah probably, plus since it'd panic in the init we'd catch it immediately in unit tests rather than being confused about why we're not using the implementation of a function we expected.
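As a rough illustration of the panic-on-duplicate behaviour discussed here, the sketch below uses a simplified registry. The Function signature, the registry type, the FunctionOpParseLogfmt constant, and the panic message are assumptions made for this example; only the lazy map initialisation and the GetForSignature name mirror the diff.

```go
package main

import "fmt"

// FunctionOp is a simplified stand-in for types.FunctionOp.
type FunctionOp int

const (
	FunctionOpParseLogfmt FunctionOp = iota // hypothetical constant name
	FunctionOpParseJSON
)

// Function is a hypothetical, simplified function signature.
type Function func(args ...string) (string, error)

type registry struct {
	reg map[FunctionOp]Function
}

// register panics when op already has an implementation, so a duplicate
// registration fails during init instead of silently replacing a function.
func (r *registry) register(op FunctionOp, fn Function) {
	if r.reg == nil {
		r.reg = make(map[FunctionOp]Function)
	}
	if _, exists := r.reg[op]; exists {
		panic(fmt.Sprintf("duplicate function registration for op %d", op))
	}
	r.reg[op] = fn
}

// GetForSignature looks up the function registered for op.
func (r *registry) GetForSignature(op FunctionOp) (Function, error) {
	fn, ok := r.reg[op]
	if !ok {
		return nil, fmt.Errorf("no function registered for op %d", op)
	}
	return fn, nil
}

func main() {
	var r registry
	noop := func(args ...string) (string, error) { return "", nil }
	r.register(FunctionOpParseJSON, noop)

	// Registering the same op again panics; recover only to show the message.
	defer func() { fmt.Println("recovered:", recover()) }()
	r.register(FunctionOpParseJSON, noop)
}
```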
| } | ||
| if sourceColVec == nil { | ||
| return nil, nil, fmt.Errorf("parse function arguments did no include a source ColumnVector to parse") |
Suggested change:
- return nil, nil, fmt.Errorf("parse function arguments did no include a source ColumnVector to parse")
+ return nil, nil, fmt.Errorf("parse function arguments did not include a source ColumnVector to parse")
| }, input) | ||
| var requestedKeys []string | ||
| if requestedKeysColVec != nil { | ||
| reqKeysValue := requestedKeysColVec.Value(0) |
Can we assert that the requestedKeysColVec must be a scalar? Otherwise I think the behaviour will be a little confusing if you happen to pass in an actual vector but only the first row gets used.
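One possible shape for that assertion is sketched below. ColumnVector, Scalar, and Array here are simplified, hypothetical stand-ins for the executor's types, and requestedKeysFrom is a helper name invented for this example; the point is only that a non-scalar input is rejected instead of silently reading row 0.

```go
package main

import "fmt"

// ColumnVector is a simplified, hypothetical stand-in for the executor's
// column vector interface.
type ColumnVector interface {
	Value(i int) any
	Len() int
}

// Scalar repeats a single value for every row of a batch.
type Scalar struct {
	value any
	rows  int
}

func (s *Scalar) Value(int) any { return s.value }
func (s *Scalar) Len() int      { return s.rows }

// Array holds one value per row (hypothetical non-scalar vector).
type Array struct{ values []any }

func (a *Array) Value(i int) any { return a.values[i] }
func (a *Array) Len() int        { return len(a.values) }

// requestedKeysFrom returns the requested keys carried by vec. It returns
// an error when vec is not a scalar, so a real per-row vector cannot be
// passed by accident with only its first row being used.
func requestedKeysFrom(vec ColumnVector) ([]string, error) {
	if vec == nil {
		return nil, nil // no requested keys were passed
	}
	scalar, ok := vec.(*Scalar)
	if !ok {
		return nil, fmt.Errorf("requested keys must be a scalar, got %T", vec)
	}
	keys, ok := scalar.Value(0).([]string)
	if !ok {
		return nil, fmt.Errorf("requested keys must be a string list, got %T", scalar.Value(0))
	}
	return keys, nil
}

func main() {
	keys, err := requestedKeysFrom(&Scalar{value: []string{"bar"}, rows: 3})
	fmt.Println(keys, err)

	_, err = requestedKeysFrom(&Array{values: []any{[]string{"bar"}, []string{"baz"}}})
	fmt.Println(err)
}
```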
| // Clone returns a copy of the [FunctionExpr]. | ||
| func (e *FunctionExpr) Clone() Expression { | ||
| params := make([]Expression, len(e.Expressions)) |
You can use cloneExpressions(e.Expressions) here
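For readers without the repository open, a helper like this could look roughly as follows. The cloneExpressions name comes from the comment above, but its body, the trimmed Expression interface, and the ColumnExpr type are assumptions made for this sketch.

```go
package main

import "fmt"

// Expression is a trimmed-down, hypothetical version of the planner's
// expression interface.
type Expression interface {
	Clone() Expression
	String() string
}

// ColumnExpr is a hypothetical leaf expression.
type ColumnExpr struct{ Name string }

func (c *ColumnExpr) Clone() Expression { return &ColumnExpr{Name: c.Name} }
func (c *ColumnExpr) String() string    { return c.Name }

// FunctionExpr holds an operation and its argument expressions.
type FunctionExpr struct {
	Op          string
	Expressions []Expression
}

// cloneExpressions returns a new slice holding a deep copy of each element.
// This body is an assumption about what the real helper does.
func cloneExpressions(exprs []Expression) []Expression {
	out := make([]Expression, len(exprs))
	for i, e := range exprs {
		out[i] = e.Clone()
	}
	return out
}

// Clone returns a copy of the FunctionExpr that shares no state with e.
func (e *FunctionExpr) Clone() Expression {
	return &FunctionExpr{Op: e.Op, Expressions: cloneExpressions(e.Expressions)}
}

func (e *FunctionExpr) String() string { return e.Op }

func main() {
	orig := &FunctionExpr{Op: "PARSE_JSON", Expressions: []Expression{&ColumnExpr{Name: "message"}}}
	cp := orig.Clone().(*FunctionExpr)

	// Mutating the original must not affect the clone.
	orig.Expressions[0].(*ColumnExpr).Name = "changed"
	fmt.Println(cp.Expressions[0], orig.Expressions[0])
}
```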
| }, nil | ||
| case *physical.NamedLiteralExpr: | ||
| return &Scalar{ |
I merged #19549 earlier today, so Scalar won't be available any more. Use NewScalar(expr.Literal, input.NumRows()) instead
| GetForSignature(types.FunctionOp) (Function, error) | ||
| } | ||
| type Function interface { |
nit: Should we call it VariadicFunction?
haha, I had that in one iteration, naming was hard, I went through a few options, but I'm happy to use Variadic.
| args := make([]ColumnVector, len(expr.Expressions)) | ||
| for i, arg := range expr.Expressions { | ||
| p, err := e.eval(arg, input) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| args[i] = p | ||
| } |
I think we need to find a way to optimize this at some point.
Parsing the argument expressions every time for each batch is a lot of overhead, especially also because these are always string literals (aren't they) and therefore have a single value across all rows.
Ah wait, the function argument is the message column.
At least for now. Later, when we support | logfmt foo,bar this may become a problem.
| FLOAT64 = Type(arrow.FLOAT64) | ||
| TIMESTAMP = Type(arrow.TIMESTAMP) | ||
| STRUCT = Type(arrow.STRUCT) | ||
| LIST = Type(arrow.LIST) |
Please also add to Type.String() function
| return tStruct{arrowType: arrowType} | ||
| } | ||
| type tList struct { |
Please also add to the Loki->Arrow type mapping below
| case FunctionOpParseJSON: | ||
| return "PARSE_JSON" | ||
| default: | ||
| panic(fmt.Sprintf("unknown unary operator %d", t)) |
Suggested change:
- panic(fmt.Sprintf("unknown unary operator %d", t))
+ panic(fmt.Sprintf("unknown variadic function operator %d", t))
commit 4e5f95f
Author: Trevor Whitney <trevorjwhitney@gmail.com>
Date: Thu Oct 23 13:47:59 2025 -0600

    test: fix planner tests

commit dfbdcb7
Merge: c997112 68df3ef
Author: Trevor Whitney <trevorjwhitney@gmail.com>
Date: Thu Oct 23 13:26:39 2025 -0600

    Merge branch 'main' into twhitney/refactor-parse

commit c997112
Author: Trevor Whitney <trevorjwhitney@gmail.com>
Date: Thu Oct 23 13:24:03 2025 -0600

    chore: fix linting errors

commit 037e337
Author: Trevor Whitney <trevorjwhitney@gmail.com>
Date: Thu Oct 23 12:54:27 2025 -0600

    test: fix field names in expression test

commit ad6b101
Merge: 79f2cea d4c53e9
Author: Trevor Whitney <trevorjwhitney@gmail.com>
Date: Thu Oct 23 12:46:34 2025 -0600

    Merge branch 'main' into twhitney/refactor-parse

commit 79f2cea
Author: Trevor Whitney <trevorjwhitney@gmail.com>
Date: Thu Oct 23 12:44:12 2025 -0600

    test: fix workflow planner test

commit 40db5ef
Author: Trevor Whitney <trevorjwhitney@gmail.com>
Date: Thu Oct 23 11:38:25 2025 -0600

    chore: clena up a few comments

commit ad91fda
Author: Trevor Whitney <trevorjwhitney@gmail.com>
Date: Thu Oct 23 11:23:19 2025 -0600

    refactor: implment parse as a projection
Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>
| └── Projection all=true expand=(PARSE_JSON(builtin.message, [])) | ||
| └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [])) |
do we want to merge these projections? maybe in a later PR?
| └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar, request_duration])) | ||
| └── Compat src=metadata dst=metadata collision=label |
@chaudum does the Compat layer need to come before the Projection?
What this PR does / why we need it:
This PR refactors the parse operations (logfmt and json) to be implemented as an operation expression on an expand projection, similar to unwrap, rather than as a custom pipeline as was done previously. In doing so, it introduces a new operation type, FunctionOp, that can take any number of Values/Expressions as arguments. All arguments are evaluated before being passed to the registered function, which is registered on op type alone (not on argument types, since the arguments are variable).
I also introduced a NamedLiteralExpr, which is just a literal with a name. I thought this made adding the requested keys optimization a bit cleaner, but as it's just a literal under the hood, I'm happy to remove it if we think it's unnecessary.
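The evaluation flow described above (evaluate every argument, then dispatch on op type alone) can be sketched as follows. Everything in this example is a simplified assumption: columns are plain string slices rather than Arrow arrays, the Expression interface and the toy parseLogfmt function are invented for illustration, and none of it is the PR's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

// FunctionOp is a simplified stand-in for types.FunctionOp.
type FunctionOp int

const FunctionOpParseLogfmt FunctionOp = iota // hypothetical constant name

// Expression evaluates to a column of strings for one batch
// (a hypothetical simplification of the executor's expressions).
type Expression interface {
	Eval(batch []string) ([]string, error)
}

// columnRef returns the batch itself, standing in for builtin.message.
type columnRef struct{}

func (columnRef) Eval(batch []string) ([]string, error) { return batch, nil }

// listLiteral returns the same list regardless of the batch.
type listLiteral []string

func (l listLiteral) Eval([]string) ([]string, error) { return l, nil }

// FunctionExpr is an operation with a variable number of arguments.
type FunctionExpr struct {
	Op          FunctionOp
	Expressions []Expression
}

// VariadicFunction receives the already-evaluated arguments.
type VariadicFunction func(args ...[]string) ([]string, error)

// registry is keyed on op type only, not on argument types.
var registry = map[FunctionOp]VariadicFunction{
	FunctionOpParseLogfmt: parseLogfmt,
}

// parseLogfmt is a toy parser: it keeps key=value pairs from each line,
// restricted to the requested keys when a second argument is present.
func parseLogfmt(args ...[]string) ([]string, error) {
	if len(args) < 1 || len(args) > 2 {
		return nil, fmt.Errorf("parse expects 1 or 2 arguments, got %d", len(args))
	}
	wanted := map[string]bool{}
	if len(args) == 2 {
		for _, k := range args[1] {
			wanted[k] = true
		}
	}
	out := make([]string, len(args[0]))
	for i, line := range args[0] {
		var kept []string
		for _, pair := range strings.Fields(line) {
			key, _, found := strings.Cut(pair, "=")
			if found && (len(wanted) == 0 || wanted[key]) {
				kept = append(kept, pair)
			}
		}
		out[i] = strings.Join(kept, " ")
	}
	return out, nil
}

// evalFunction evaluates every argument first, then calls the function
// registered for the op.
func evalFunction(expr *FunctionExpr, batch []string) ([]string, error) {
	args := make([][]string, len(expr.Expressions))
	for i, arg := range expr.Expressions {
		v, err := arg.Eval(batch)
		if err != nil {
			return nil, err
		}
		args[i] = v
	}
	fn, ok := registry[expr.Op]
	if !ok {
		return nil, fmt.Errorf("no function registered for op %d", expr.Op)
	}
	return fn(args...)
}

func main() {
	expr := &FunctionExpr{
		Op:          FunctionOpParseLogfmt,
		Expressions: []Expression{columnRef{}, listLiteral{"bar"}},
	}
	out, err := evalFunction(expr, []string{"foo=1 bar=2", "bar=3 baz=4"})
	fmt.Println(out, err)
}
```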