From 334021f77eebe7454e8ceb20a46654dc45fba51c Mon Sep 17 00:00:00 2001 From: minghong Date: Tue, 21 Oct 2025 17:55:34 +0800 Subject: [PATCH] [fix](nereids)set RuntimeFilterInfo only on BE which is merge node (#57108) set RuntimeFilterInfo only on BE which is merge node. note: RuntimeFilterInfo only contains runtime-filter merge information, not runtime filter descriptor In the previous pull request #56978, we moved the runtime info (which records how runtime filters are merged) from the instance level to the Backend (BE) level. In a multi-BE environment, when a runtime filter needs to be merged, the BE does not perform the merge using the mergeInstance specified by the Runtime Filter descriptor. Instead, it selects a BE that has the RuntimeInfo set to perform the merge. This causes BEs that should not be responsible for merging to wait for other nodes to send local runtime filters, resulting in a time --- .../plans/distribute/worker/BackendWorker.java | 2 +- .../qe/runtime/RuntimeFiltersThriftBuilder.java | 15 ++++++--------- .../doris/qe/runtime/ThriftPlansBuilder.java | 11 ++++++++--- gensrc/thrift/PaloInternalService.thrift | 1 + 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java index 63c73b50edcd07..e76934cf847597 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java @@ -45,7 +45,7 @@ public String address() { @Override public String brpcAddress() { - return backend.getHost() + brpcPort(); + return backend.getHost() + ":" + brpcPort(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java index f9ab8e83f07e99..fca5461fe1cb72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java @@ -65,7 +65,7 @@ public boolean isMergeRuntimeFilterInstance(AssignedJob instance) { return mergeInstance == instance; } - public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParams) { + public void populateRuntimeFilterParams(TRuntimeFilterParams runtimeFilterParams) { for (RuntimeFilter rf : runtimeFilters) { List targets = ridToTargets.get(rf.getFilterId()); if (targets == null) { @@ -89,8 +89,7 @@ public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParam } runtimeFilterParams.putToRidToTargetParamv2( - rf.getFilterId().asInt(), new ArrayList<>(targetToParams.values()) - ); + rf.getFilterId().asInt(), new ArrayList<>(targetToParams.values())); } } for (Map.Entry entry : ridToBuilderNum.entrySet()) { @@ -122,15 +121,14 @@ public static RuntimeFiltersThriftBuilder compute( PlanFragment fragment = plan.getFragmentJob().getFragment(); // Transform to for (RuntimeFilterId rid : fragment.getTargetRuntimeFilterIds()) { - List targetFragments = - ridToTargetParam.computeIfAbsent(rid, k -> new ArrayList<>()); + List targetFragments = ridToTargetParam.computeIfAbsent(rid, + k -> new ArrayList<>()); for (AssignedJob instanceJob : plan.getInstanceJobs()) { BackendWorker backendWorker = (BackendWorker) instanceJob.getAssignedWorker(); Backend backend = backendWorker.getBackend(); targetFragments.add(new RuntimeFilterTarget( fragment.getFragmentId().asInt(), - new TNetworkAddress(backend.getHost(), backend.getBrpcPort()) - )); + new TNetworkAddress(backend.getHost(), backend.getBrpcPort()))); } } @@ -146,8 +144,7 @@ public static RuntimeFiltersThriftBuilder compute( } return new RuntimeFiltersThriftBuilder( mergeAddress, runtimeFilters, mergeInstance, - broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum - ); + broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum); } public static class RuntimeFilterTarget { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index 5fcd14fcb79689..394e0cd5b1c6c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -125,7 +125,7 @@ public static Map plansToThr // so we can merge and send multiple fragment to a backend use one rpc for (Entry kv : workerToCurrentFragment.entrySet()) { TPipelineFragmentParamsList fragments = fragmentsGroupByWorker.computeIfAbsent( - kv.getKey(), w -> beToThrift(runtimeFiltersThriftBuilder, + kv.getKey(), w -> beToThrift(kv.getKey(), runtimeFiltersThriftBuilder, topNFilterThriftSupplier)); fragments.addToParamsList(kv.getValue()); } @@ -298,18 +298,23 @@ private static TPlanFragmentDestination instanceToDestination(AssignedJob instan } private static TPipelineFragmentParamsList beToThrift( + DistributedPlanWorker worker, RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder, Supplier> topNFilterThriftSupplier) { TPipelineFragmentParamsList beParam = new TPipelineFragmentParamsList(); TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo(); runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get()); - // set for runtime filter TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams(); runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress); + if (worker.host().equals(runtimeFiltersThriftBuilder.mergeAddress.getHostname()) + && worker.brpcPort() == runtimeFiltersThriftBuilder.mergeAddress.getPort()) { + // only set following information for merge BE node + runtimeFiltersThriftBuilder.populateRuntimeFilterParams(runtimeFilterParams); + } runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams); - runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams); beParam.setRuntimeFilterInfo(runtimeFilterInfo); + return beParam; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index c5982ed4b0fa22..8f587f38bc3dff 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -442,6 +442,7 @@ struct TRuntimeFilterParams { // Runtime filter merge instance address. Used if this filter has a remote target 1: optional Types.TNetworkAddress runtime_filter_merge_addr + // keep 2/3/4/5 unset if BE is not used for merge // deprecated 2: optional map> rid_to_target_param