Skip to content

Commit 3763894

Browse files
committed
[enhancement](load)add LogicalPostProject to cast outputs according to dest table's schema
1 parent 0a1905f commit 3763894

File tree

5 files changed

+308
-60
lines changed

5 files changed

+308
-60
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java

Lines changed: 24 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.doris.common.ErrorCode;
3838
import org.apache.doris.common.ErrorReport;
3939
import org.apache.doris.common.UserException;
40-
import org.apache.doris.common.util.FileFormatConstants;
4140
import org.apache.doris.info.PartitionNamesInfo;
4241
import org.apache.doris.nereids.CascadesContext;
4342
import org.apache.doris.nereids.StatementContext;
@@ -64,6 +63,7 @@
6463
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
6564
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
6665
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
66+
import org.apache.doris.nereids.trees.plans.logical.LogicalPostProject;
6767
import org.apache.doris.nereids.trees.plans.logical.LogicalPreFilter;
6868
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
6969
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
@@ -221,16 +221,6 @@ public TFileScanRangeParams toFileScanRangeParams(TUniqueId loadId, NereidsFileG
221221

222222
return params;
223223
}
224-
225-
private String getHeaderType(String formatType) {
226-
if (formatType != null) {
227-
if (formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
228-
|| formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
229-
return formatType;
230-
}
231-
}
232-
return "";
233-
}
234224
}
235225

236226
private LoadPlanInfo loadPlanInfo;
@@ -348,38 +338,6 @@ public Void visitLogicalProject(LogicalProject<? extends Plan> logicalProject, P
348338
}
349339
}
350340

351-
// For Broker load with multiple file groups, all file groups share the same destTuple.
352-
// Create slots for destTuple only when processing the first file group (when slots are empty).
353-
// Subsequent file groups will reuse the slots created by the first file group.
354-
if (loadPlanInfo.destTuple.getSlots().isEmpty()) {
355-
List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());
356-
357-
// ignore projectList's nullability and set the expr's nullable info same as
358-
// dest table column
359-
// why do this? it looks like BE works this way...
360-
// and we have to do some extra work in visitLogicalFilter because of this odd
361-
// behavior
362-
int size = slotList.size();
363-
List<Slot> newSlotList = new ArrayList<>(size);
364-
for (int i = 0; i < size; ++i) {
365-
SlotReference slot = (SlotReference) slotList.get(i);
366-
Column col = destTable.getColumn(slot.getName());
367-
if (col != null) {
368-
slot = slot.withColumn(col);
369-
if (col.isAutoInc()) {
370-
newSlotList.add(slot.withNullable(true));
371-
} else {
372-
newSlotList.add(slot.withNullable(col.isAllowNull()));
373-
}
374-
} else {
375-
newSlotList.add(slot);
376-
}
377-
}
378-
379-
for (Slot slot : newSlotList) {
380-
context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable);
381-
}
382-
}
383341
List<SlotDescriptor> slotDescriptorList = loadPlanInfo.destTuple.getSlots();
384342
loadPlanInfo.destSlotIdToExprMap = Maps.newHashMap();
385343
for (int i = 0; i < slotDescriptorList.size(); ++i) {
@@ -401,16 +359,35 @@ public Void visitLogicalProject(LogicalProject<? extends Plan> logicalProject, P
401359
return null;
402360
}
403361

362+
@Override
363+
public Void visitLogicalPostProject(LogicalPostProject<? extends Plan> logicalPostProject,
364+
PlanTranslatorContext context) {
365+
List<NamedExpression> outputs = logicalPostProject.getOutputs();
366+
for (NamedExpression expr : outputs) {
367+
if (expr.containsType(AggregateFunction.class)) {
368+
throw new AnalysisException("Don't support aggregation function in load expression");
369+
}
370+
}
371+
372+
// For Broker load with multiple file groups, all file groups share the same destTuple.
373+
// Create slots for destTuple only when processing the first file group (when slots are empty).
374+
// Subsequent file groups will reuse the slots created by the first file group.
375+
if (loadPlanInfo.destTuple.getSlots().isEmpty()) {
376+
List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());
377+
for (Slot slot : slotList) {
378+
context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable);
379+
}
380+
}
381+
logicalPostProject.child().accept(this, context);
382+
return null;
383+
}
384+
404385
@Override
405386
public Void visitLogicalFilter(LogicalFilter<? extends Plan> logicalFilter, PlanTranslatorContext context) {
406387
logicalFilter.child().accept(this, context);
407388
loadPlanInfo.postFilterExprList = new ArrayList<>(logicalFilter.getConjuncts().size());
408389
for (Expression conjunct : logicalFilter.getConjuncts()) {
409390
Expr expr = ExpressionTranslator.translate(conjunct, context);
410-
// in visitLogicalProject, we set project exprs nullability same as dest table columns
411-
// the conjunct's nullability is based on project exprs, so we need clear the nullable info
412-
// and let conjunct calculate the nullability by itself to get the correct nullable info
413-
clearNullableFromNereidsRecursively(expr);
414391
loadPlanInfo.postFilterExprList.add(expr);
415392
}
416393
filterPredicate = logicalFilter.getPredicate();
@@ -429,19 +406,6 @@ public Void visitLogicalFilter(LogicalFilter<? extends Plan> logicalFilter, Plan
429406
return null;
430407
}
431408

432-
/**
433-
* Recursively clear nullable info from expression and all its children
434-
*/
435-
private void clearNullableFromNereidsRecursively(Expr expr) {
436-
if (expr == null) {
437-
return;
438-
}
439-
expr.clearNullableFromNereids();
440-
for (Expr child : expr.getChildren()) {
441-
clearNullableFromNereidsRecursively(child);
442-
}
443-
}
444-
445409
@Override
446410
public Void visitLogicalPreFilter(LogicalPreFilter<? extends Plan> logicalPreFilter,
447411
PlanTranslatorContext context) {

fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.catalog.AggregateType;
2121
import org.apache.doris.catalog.Column;
22+
import org.apache.doris.catalog.OlapTable;
2223
import org.apache.doris.catalog.Table;
2324
import org.apache.doris.common.UserException;
2425
import org.apache.doris.info.PartitionNamesInfo;
@@ -42,6 +43,7 @@
4243
import org.apache.doris.nereids.trees.expressions.Alias;
4344
import org.apache.doris.nereids.trees.expressions.Expression;
4445
import org.apache.doris.nereids.trees.expressions.NamedExpression;
46+
import org.apache.doris.nereids.trees.expressions.Slot;
4547
import org.apache.doris.nereids.trees.expressions.SlotReference;
4648
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
4749
import org.apache.doris.nereids.trees.expressions.functions.scalar.JsonbParseErrorToNull;
@@ -54,6 +56,7 @@
5456
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
5557
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
5658
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
59+
import org.apache.doris.nereids.trees.plans.logical.LogicalPostProject;
5760
import org.apache.doris.nereids.trees.plans.logical.LogicalPreFilter;
5861
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
5962
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
@@ -206,6 +209,7 @@ public static LogicalPlan createLoadPlan(NereidsFileGroupInfo fileGroupInfo, Par
206209
new BindExpression(),
207210
new LoadProjectRewrite(fileGroupInfo.getTargetTable()),
208211
new BindSink(false),
212+
new AddPostProject(),
209213
new AddPostFilter(
210214
context.fileGroup.getWhereExpr()
211215
),
@@ -324,4 +328,43 @@ public Rule build() {
324328
}).toRule(RuleType.ADD_POST_FILTER_FOR_LOAD);
325329
}
326330
}
331+
332+
/** AddPostProject
333+
* The BindSink rule will produce the final project list for load, we need cast the outputs according to
334+
* dest table's schema
335+
* */
336+
private static class AddPostProject extends OneRewriteRuleFactory {
337+
public AddPostProject() {
338+
}
339+
340+
@Override
341+
public Rule build() {
342+
return logicalOlapTableSink().whenNot(plan -> plan.child() instanceof LogicalPostProject
343+
|| plan.child() instanceof LogicalFilter).thenApply(ctx -> {
344+
LogicalOlapTableSink logicalOlapTableSink = ctx.root;
345+
LogicalPlan childPlan = (LogicalPlan) logicalOlapTableSink.child();
346+
List<Slot> childOutputs = childPlan.getOutput();
347+
OlapTable destTable = logicalOlapTableSink.getTargetTable();
348+
int size = childOutputs.size();
349+
List<SlotReference> projectList = new ArrayList<>(size);
350+
for (int i = 0; i < size; ++i) {
351+
SlotReference slot = (SlotReference) childOutputs.get(i);
352+
Column col = destTable.getColumn(slot.getName());
353+
if (col != null) {
354+
slot = slot.withColumn(col);
355+
if (col.isAutoInc()) {
356+
projectList.add(slot.withNullable(true));
357+
} else {
358+
projectList.add(slot.withNullable(col.isAllowNull()));
359+
}
360+
} else {
361+
projectList.add(slot);
362+
}
363+
}
364+
return logicalOlapTableSink.withChildren(
365+
Lists.newArrayList(
366+
new LogicalPostProject(projectList, (Plan) logicalOlapTableSink.child(0))));
367+
}).toRule(RuleType.ADD_POST_PROJECT_FOR_LOAD);
368+
}
369+
}
327370
}

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,8 @@ public enum RuleType {
262262
REWRITE_LOAD_PROJECT_FOR_STREAM_LOAD(RuleTypeClass.REWRITE),
263263
// add post filter node for load
264264
ADD_POST_FILTER_FOR_LOAD(RuleTypeClass.REWRITE),
265+
// add post project node for load
266+
ADD_POST_PROJECT_FOR_LOAD(RuleTypeClass.REWRITE),
265267

266268
// Merge Consecutive plan
267269
MERGE_PROJECTS(RuleTypeClass.REWRITE),

0 commit comments

Comments
 (0)