-
Notifications
You must be signed in to change notification settings - Fork 92
feat: add ExchangeRel support to core #602
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
458aef0 to
8c33967
Compare
isthmus/src/main/java/io/substrait/isthmus/SubstraitRelNodeConverter.java
Show resolved
Hide resolved
|
@nielspardon do you have any other comments ? or is the PR good to merge ? |
nielspardon
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the changes look good. I only have some minor remarks
| ProtoExpressionConverter converter = | ||
| new ProtoExpressionConverter(lookup, extensions, input.getRecordType(), this); | ||
| List<FieldReference> fieldReferences = | ||
| rel.getScatterByFields().getFieldsList().stream() | ||
| .map(converter::from) | ||
| .collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do realize most of the other usages of ProtoExpressionConverter in this class also just use converter as the variable name. Given that we have so many different converters in the code base it might make sense to use a more specific variable name to make it clear which one it is. We could use the approach from the newAggregate() method in this class which calls the variable protoExprConverter.
| ProtoExpressionConverter converter = | |
| new ProtoExpressionConverter(lookup, extensions, input.getRecordType(), this); | |
| List<FieldReference> fieldReferences = | |
| rel.getScatterByFields().getFieldsList().stream() | |
| .map(converter::from) | |
| .collect(Collectors.toList()); | |
| ProtoExpressionConverter protoExprConverter = | |
| new ProtoExpressionConverter(lookup, extensions, input.getRecordType(), this); | |
| List<FieldReference> fieldReferences = | |
| rel.getScatterByFields().getFieldsList().stream() | |
| .map(protoExprConverter::from) | |
| .collect(Collectors.toList()); |
| ProtoExpressionConverter converter = | ||
| new ProtoExpressionConverter(lookup, extensions, input.getRecordType(), this); | ||
|
|
||
| ImmutableSingleBucketExchange.Builder builder = | ||
| SingleBucketExchange.builder() | ||
| .input(input) | ||
| .partitionCount(rel.getPartitionCount()) | ||
| .targets(targets) | ||
| .expression(converter.from(rel.getSingleTarget().getExpression())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, we could use a more specific variable name
| ProtoExpressionConverter converter = | |
| new ProtoExpressionConverter(lookup, extensions, input.getRecordType(), this); | |
| ImmutableSingleBucketExchange.Builder builder = | |
| SingleBucketExchange.builder() | |
| .input(input) | |
| .partitionCount(rel.getPartitionCount()) | |
| .targets(targets) | |
| .expression(converter.from(rel.getSingleTarget().getExpression())); | |
| ProtoExpressionConverter protoExprConverter = | |
| new ProtoExpressionConverter(lookup, extensions, input.getRecordType(), this); | |
| ImmutableSingleBucketExchange.Builder builder = | |
| SingleBucketExchange.builder() | |
| .input(input) | |
| .partitionCount(rel.getPartitionCount()) | |
| .targets(targets) | |
| .expression(protoExprConverter.from(rel.getSingleTarget().getExpression())); |
| ProtoExpressionConverter converter = | ||
| new ProtoExpressionConverter(lookup, extensions, input.getRecordType(), this); | ||
|
|
||
| ImmutableMultiBucketExchange.Builder builder = | ||
| MultiBucketExchange.builder() | ||
| .input(input) | ||
| .partitionCount(rel.getPartitionCount()) | ||
| .targets(targets) | ||
| .expression(converter.from(rel.getMultiTarget().getExpression())) | ||
| .constrainedToCount(rel.getMultiTarget().getConstrainedToCount()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, we could use a more specific variable name
| ProtoExpressionConverter converter = | |
| new ProtoExpressionConverter(lookup, extensions, input.getRecordType(), this); | |
| ImmutableMultiBucketExchange.Builder builder = | |
| MultiBucketExchange.builder() | |
| .input(input) | |
| .partitionCount(rel.getPartitionCount()) | |
| .targets(targets) | |
| .expression(converter.from(rel.getMultiTarget().getExpression())) | |
| .constrainedToCount(rel.getMultiTarget().getConstrainedToCount()); | |
| ProtoExpressionConverter protoExprConverter = | |
| new ProtoExpressionConverter(lookup, extensions, input.getRecordType(), this); | |
| ImmutableMultiBucketExchange.Builder builder = | |
| MultiBucketExchange.builder() | |
| .input(input) | |
| .partitionCount(rel.getPartitionCount()) | |
| .targets(targets) | |
| .expression(protoExprConverter.from(rel.getMultiTarget().getExpression())) | |
| .constrainedToCount(rel.getMultiTarget().getConstrainedToCount()); |
| if (target.getType() instanceof TargetType.Uri) { | ||
| builder.setUri(((TargetType.Uri) target.getType()).getUri()); | ||
| } | ||
| if (target.getType() instanceof TargetType.Extended) { | ||
| builder.setExtended(((TargetType.Extended) target.getType()).getExtended()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given that TargetType.Uri and TargetType.Extended are classes target.getType() can not be both so I would write this with an else if:
| if (target.getType() instanceof TargetType.Uri) { | |
| builder.setUri(((TargetType.Uri) target.getType()).getUri()); | |
| } | |
| if (target.getType() instanceof TargetType.Extended) { | |
| builder.setExtended(((TargetType.Extended) target.getType()).getExtended()); | |
| } | |
| if (target.getType() instanceof TargetType.Uri) { | |
| builder.setUri(((TargetType.Uri) target.getType()).getUri()); | |
| } else if (target.getType() instanceof TargetType.Extended) { | |
| builder.setExtended(((TargetType.Extended) target.getType()).getExtended()); | |
| } |
| .append("=partitionCount") | ||
| .append(exchange.getPartitionCount) | ||
| .append("=targets") | ||
| .append(exchange.getTargets) | ||
| .append("fields=") | ||
| .append(exchange.getFields) | ||
| }) | ||
| } | ||
|
|
||
| override def visit(exchange: SingleBucketExchange, context: EmptyVisitationContext): String = { | ||
| withBuilder(exchange, 10)( | ||
| builder => { | ||
| builder | ||
| .append("=partitionCount") | ||
| .append(exchange.getPartitionCount) | ||
| .append("=targets") | ||
| .append(exchange.getTargets) | ||
| .append("=expression") | ||
| .append(exchange.getExpression) | ||
| }) | ||
| } | ||
|
|
||
| override def visit(exchange: MultiBucketExchange, context: EmptyVisitationContext): String = { | ||
| withBuilder(exchange, 10)( | ||
| builder => { | ||
| builder | ||
| .append("=partitionCount") | ||
| .append(exchange.getPartitionCount) | ||
| .append("=targets") | ||
| .append(exchange.getTargets) | ||
| .append("=expression") | ||
| .append(exchange.getExpression) | ||
| .append("=constrainedToCount") | ||
| .append(exchange.getConstrainedToCount) | ||
| }) | ||
| } | ||
|
|
||
| override def visit(exchange: RoundRobinExchange, context: EmptyVisitationContext): String = { | ||
| withBuilder(exchange, 10)( | ||
| builder => { | ||
| builder | ||
| .append("=partitionCount") | ||
| .append(exchange.getPartitionCount) | ||
| .append("=targets") | ||
| .append(exchange.getTargets) | ||
| .append("=exact") | ||
| .append(exchange.getExact) | ||
| }) | ||
| } | ||
|
|
||
| override def visit(exchange: BroadcastExchange, context: EmptyVisitationContext): String = { | ||
| withBuilder(exchange, 10)( | ||
| builder => { | ||
| builder | ||
| .append("=partitionCount") | ||
| .append(exchange.getPartitionCount) | ||
| .append("=targets") | ||
| .append(exchange.getTargets) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't the equals sign always be the suffix of these string literals?
|
Thanks for addressing the feedback. the one thing I didn't notice yesterday which would be great if you can take care of is to also add some unit tests for the changes in the core. We have relation roundtrip test here which you can mirror: https://github.com/substrait-io/substrait-java/tree/main/core/src/test/java/io/substrait/type/proto |
Implement ExchangeRel in core. the spec https://github.com/substrait-io/substrait/blob/main/proto/substrait/algebra.proto#L470