Skip to content

Commit 72e0330

Browse files
authored
branch-3.1: [Chore](code-clear)Unify the use of BrokerDesc as the storage property bridge for Load (#57581)
This involves some code in #53013 removing redundant property declarations. BulkLoadDesc has no persistence or serialization requirements, so related implementations have been safely removed to simplify the code.
1 parent d1760f1 commit 72e0330

File tree

2 files changed

+15
-28
lines changed

2 files changed

+15
-28
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,6 @@
410410
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVReplaceInfo;
411411
import org.apache.doris.nereids.trees.plans.commands.info.AlterViewInfo;
412412
import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
413-
import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
414413
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
415414
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
416415
import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
@@ -1174,22 +1173,9 @@ public List<Pair<LogicalPlan, StatementContext>> visitMultiStatements(MultiState
11741173
*/
11751174
@Override
11761175
public LogicalPlan visitLoad(DorisParser.LoadContext ctx) {
1177-
1178-
BulkStorageDesc bulkDesc = null;
1176+
BrokerDesc brokerDesc = null;
11791177
if (ctx.withRemoteStorageSystem() != null) {
1180-
Map<String, String> bulkProperties =
1181-
new HashMap<>(visitPropertyItemList(ctx.withRemoteStorageSystem().brokerProperties));
1182-
if (ctx.withRemoteStorageSystem().S3() != null) {
1183-
bulkDesc = new BulkStorageDesc("S3", BulkStorageDesc.StorageType.S3, bulkProperties);
1184-
} else if (ctx.withRemoteStorageSystem().HDFS() != null) {
1185-
bulkDesc = new BulkStorageDesc("HDFS", BulkStorageDesc.StorageType.HDFS, bulkProperties);
1186-
} else if (ctx.withRemoteStorageSystem().LOCAL() != null) {
1187-
bulkDesc = new BulkStorageDesc("LOCAL_HDFS", BulkStorageDesc.StorageType.LOCAL, bulkProperties);
1188-
} else if (ctx.withRemoteStorageSystem().BROKER() != null
1189-
&& ctx.withRemoteStorageSystem().identifierOrText().getText() != null) {
1190-
bulkDesc = new BulkStorageDesc(ctx.withRemoteStorageSystem().identifierOrText().getText(),
1191-
bulkProperties);
1192-
}
1178+
brokerDesc = visitWithRemoteStorageSystem(ctx.withRemoteStorageSystem());
11931179
}
11941180
ImmutableList.Builder<BulkLoadDataDesc> dataDescriptions = new ImmutableList.Builder<>();
11951181
List<String> labelParts = visitMultipartIdentifier(ctx.lableName);
@@ -1270,7 +1256,7 @@ public LogicalPlan visitLoad(DorisParser.LoadContext ctx) {
12701256
String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText();
12711257
String comment =
12721258
LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1));
1273-
return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc, properties, comment);
1259+
return new LoadCommand(labelName, dataDescriptions.build(), brokerDesc, properties, comment);
12741260
}
12751261

12761262
/* ********************************************************************************************

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.doris.nereids.trees.plans.commands;
1919

20+
import org.apache.doris.analysis.BrokerDesc;
2021
import org.apache.doris.analysis.StmtType;
22+
import org.apache.doris.analysis.StorageBackend;
2123
import org.apache.doris.catalog.Column;
2224
import org.apache.doris.catalog.Env;
2325
import org.apache.doris.catalog.OlapTable;
@@ -49,7 +51,6 @@
4951
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
5052
import org.apache.doris.nereids.trees.plans.PlanType;
5153
import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
52-
import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
5354
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
5455
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
5556
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
@@ -89,7 +90,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
8990
public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
9091

9192
private final String labelName;
92-
private final BulkStorageDesc bulkStorageDesc;
93+
private final BrokerDesc brokerDesc;
9394
private final Set<String> sinkTableNames = new HashSet<>();
9495
private final List<BulkLoadDataDesc> sourceInfos;
9596
private final Map<String, String> properties;
@@ -100,13 +101,13 @@ public class LoadCommand extends Command implements ForwardWithSync {
100101
/**
101102
* constructor of ExportCommand
102103
*/
103-
public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, BulkStorageDesc bulkStorageDesc,
104+
public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, BrokerDesc brokerDesc,
104105
Map<String, String> properties, String comment) {
105106
super(PlanType.LOAD_COMMAND);
106107
this.labelName = Objects.requireNonNull(labelName.trim(), "labelName should not null");
107108
this.sourceInfos = Objects.requireNonNull(ImmutableList.copyOf(sourceInfos), "sourceInfos should not null");
108109
this.properties = Objects.requireNonNull(ImmutableMap.copyOf(properties), "properties should not null");
109-
this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, "bulkStorageDesc should not null");
110+
this.brokerDesc = Objects.requireNonNull(brokerDesc, "brokerDesc should not null");
110111
this.comment = Objects.requireNonNull(comment, "comment should not null");
111112
}
112113

@@ -151,7 +152,7 @@ private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataD
151152
LOG.debug("nereids load stmt before conversion: {}", dataDesc::toSql);
152153
}
153154
// 1. build source projects plan (select col1,col2... from tvf where prefilter)
154-
Map<String, String> tvfProperties = getTvfProperties(dataDesc, bulkStorageDesc);
155+
Map<String, String> tvfProperties = getTvfProperties(dataDesc, brokerDesc);
155156
LogicalPlan tvfLogicalPlan = new LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties));
156157
tvfLogicalPlan = buildTvfQueryPlan(dataDesc, tvfProperties, tvfLogicalPlan);
157158

@@ -431,15 +432,15 @@ private static void checkAndAddSequenceCol(OlapTable olapTable, BulkLoadDataDesc
431432

432433
private UnboundTVFRelation getUnboundTVFRelation(Map<String, String> properties) {
433434
UnboundTVFRelation relation;
434-
if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) {
435+
if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) {
435436
relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
436437
S3TableValuedFunction.NAME, new Properties(properties));
437-
} else if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.HDFS) {
438+
} else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
438439
relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
439440
HdfsTableValuedFunction.NAME, new Properties(properties));
440441
} else {
441442
throw new UnsupportedOperationException("Unsupported load storage type: "
442-
+ bulkStorageDesc.getStorageType());
443+
+ brokerDesc.getStorageType());
443444
}
444445
return relation;
445446
}
@@ -454,8 +455,8 @@ private static OlapTable getOlapTable(ConnectContext ctx, BulkLoadDataDesc dataD
454455
return targetTable;
455456
}
456457

457-
private static Map<String, String> getTvfProperties(BulkLoadDataDesc dataDesc, BulkStorageDesc bulkStorageDesc) {
458-
Map<String, String> tvfProperties = new HashMap<>(bulkStorageDesc.getProperties());
458+
private static Map<String, String> getTvfProperties(BulkLoadDataDesc dataDesc, BrokerDesc brokerDesc) {
459+
Map<String, String> tvfProperties = new HashMap<>(brokerDesc.getProperties());
459460
String fileFormat = dataDesc.getFormatDesc().getFileFormat().orElse("csv");
460461
if ("csv".equalsIgnoreCase(fileFormat)) {
461462
dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep ->
@@ -469,7 +470,7 @@ private static Map<String, String> getTvfProperties(BulkLoadDataDesc dataDesc, B
469470
List<String> filePaths = dataDesc.getFilePaths();
470471
// TODO: support multi location by union
471472
String listFilePath = filePaths.get(0);
472-
if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) {
473+
if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) {
473474
// TODO: check file path by s3 fs list status
474475
tvfProperties.put("uri", listFilePath);
475476
}

0 commit comments

Comments
 (0)