diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d8c89648e268a0..6dff5ad7399cb8 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3674,4 +3674,10 @@ public static int metaServiceRpcRetryTimes() {
 
     @ConfField(mutable = true)
     public static String aws_credentials_provider_version = "v2";
+
+    @ConfField(description = {
+        "agent tasks 健康检查的时间间隔,默认五分钟,小于等于0时不做健康检查",
+        "agent tasks health check interval, default is five minutes, no health check when less than or equal to 0"
+    })
+    public static long agent_task_health_check_intervals_ms = 5 * 60 * 1000L; // 5 min
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 28be939a5946bd..54a93657c15ff6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -264,6 +264,7 @@
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.system.SystemInfoService.HostInfo;
 import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskCleanupDaemon;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.CompactionTask;
 import org.apache.doris.task.MasterTaskExecutor;
@@ -580,6 +581,8 @@ public class Env {
 
     private StatisticsMetricCollector statisticsMetricCollector;
 
+    private AgentTaskCleanupDaemon agentTaskCleanupDaemon;
+
     // if a config is relative to a daemon thread. record the relation here. we will proactively change interval of it.
     private final Map> configtoThreads = ImmutableMap
             .of("dynamic_partition_check_interval_seconds", this::getDynamicPartitionScheduler);
@@ -840,6 +843,9 @@ public Env(boolean isCheckpointCatalog) {
         this.dictionaryManager = new DictionaryManager();
         this.keyManagerStore = new KeyManagerStore();
         this.keyManager = KeyManagerFactory.getKeyManager();
+        if (Config.agent_task_health_check_intervals_ms > 0) {
+            this.agentTaskCleanupDaemon = new AgentTaskCleanupDaemon();
+        }
     }
 
     public static Map getSessionReportTimeMap() {
@@ -1962,6 +1968,9 @@ protected void startMasterOnlyDaemonThreads() {
         if (keyManager != null) {
             keyManager.init();
         }
+        if (agentTaskCleanupDaemon != null) {
+            agentTaskCleanupDaemon.start();
+        }
     }
 
     // start threads that should run on all FE
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
new file mode 100644
index 00000000000000..bdf87911ec9785
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+/**
+ * Periodically checks backend liveness and aborts agent tasks queued for backends that
+ * have stayed down for several consecutive checks, so jobs waiting on them can fail fast.
+ */
+public class AgentTaskCleanupDaemon extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(AgentTaskCleanupDaemon.class);
+
+    public static final Integer MAX_FAILURE_TIMES = 3;
+
+    private final Map<Long, Integer> beInactiveCheckFailures = Maps.newHashMap();
+
+    public AgentTaskCleanupDaemon() {
+        super("agent-task-cleanup", Config.agent_task_health_check_intervals_ms);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.info("Begin to clean up inactive agent tasks");
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        infoService.getAllClusterBackends(false)
+                .forEach(backend -> {
+                    long id = backend.getId();
+                    if (backend.isAlive()) {
+                        beInactiveCheckFailures.remove(id);
+                    } else {
+                        Integer failureTimes = beInactiveCheckFailures.compute(id, (beId, failures) -> {
+                            int updated = (failures == null ? 1 : failures + 1);
+                            if (updated >= MAX_FAILURE_TIMES) {
+                                removeInactiveBeAgentTasks(beId);
+                            }
+                            return updated;
+                        });
+                        LOG.info("Check failure on be={}, times={}", id, failureTimes);
+                    }
+                });
+
+        LOG.info("Finished cleaning up inactive agent tasks");
+    }
+
+    private void removeInactiveBeAgentTasks(Long beId) {
+        AgentTaskQueue.removeTask(beId, (agentTask -> {
+            String errMsg = "BE down, this agent task is aborted";
+            if (agentTask instanceof PushTask) {
+                PushTask task = ((PushTask) agentTask);
+                task.countDownLatchWithStatus(beId, agentTask.getTabletId(), new Status(TStatusCode.ABORTED, errMsg));
+            }
+            agentTask.setFinished(true);
+            agentTask.setErrorCode(TStatusCode.ABORTED);
+            agentTask.setErrorMsg(errMsg);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("BE down, remove agent task: {}", agentTask);
+            }
+        }));
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 97e1a3cc676e8f..b61a3aa708d0da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -33,6 +33,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 
 /**
  * Task queue
@@ -96,6 +97,18 @@ public static synchronized void removeTask(long backendId, TTaskType type, long
         --taskNum;
     }
 
+    public static synchronized void removeTask(long backendId, Consumer<AgentTask> onTaskRemoved) {
+        Map<TTaskType, Map<Long, AgentTask>> tasks = AgentTaskQueue.tasks.row(backendId);
+        tasks.forEach((type, taskSet) -> {
+            Iterator<Map.Entry<Long, AgentTask>> it = taskSet.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<Long, AgentTask> entry = it.next();
+                it.remove();
+                onTaskRemoved.accept(entry.getValue());
+            }
+        });
+    }
+
     /*
      * we cannot define a push task with only 'backendId', 'signature' and 'TTaskType'
      * add version and TPushType to help
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index 4f9969db1659fd..cb06f4d27f6df0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -58,7 +58,7 @@ public class PushTask extends AgentTask {
     private TPushType pushType;
     private List<Predicate> conditions;
     // for synchronous delete
-    private MarkedCountDownLatch latch;
+    private MarkedCountDownLatch<Long, Long> latch;
 
     // lzop decompress or not
     private boolean needDecompress;
@@ -212,6 +212,18 @@ public void countDownLatch(long backendId, long tabletId) {
         }
     }
 
+    public void countDownLatchWithStatus(long backendId, long tabletId, Status st) {
+        if (this.latch == null) {
+            return;
+        }
+        if (latch.markedCountDownWithStatus(backendId, tabletId, st)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("pushTask current latch count with status: {}. backend: {}, tablet: {}, st: {}",
+                        latch.getCount(), backendId, tabletId, st);
+            }
+        }
+    }
+
     // call this always means one of tasks is failed. count down to zero to finish entire task
     public void countDownToZero(TStatusCode code, String errMsg) {
         if (this.latch != null) {
diff --git a/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy b/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy
new file mode 100644
index 00000000000000..411ffa893a1bee
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_sc_fail_when_be_down", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.beNum = 3
+    options.feNum = 2
+    options.enableDebugPoints()
+    options.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
+
+    docker(options) {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        def tblName = "test_sc_fail_when_be_down"
+        sql """ DROP TABLE IF EXISTS ${tblName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tblName} (
+                `k` int NOT NULL,
+                `v0` int NOT NULL,
+                `v1` int NOT NULL
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 24
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 3"
+            )
+        """
+        sql """ INSERT INTO ${tblName} SELECT number, number, number from numbers("number" = "1024") """
+
+        GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+        try {
+            sql """ ALTER TABLE ${tblName} MODIFY COLUMN v1 VARCHAR(100) """
+            sleep(1000)
+            cluster.stopBackends(1, 2)
+            sleep(10000)
+            def ret = sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
+            println(ret)
+            waitForSchemaChangeDone {
+                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
+                time 600
+            }
+            assertTrue(false)
+        } catch (Throwable ignore) {
+            // do nothing
+        }
+    }
+}
diff --git a/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy b/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy
new file mode 100644
index 00000000000000..e5eb2bd3028508
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_sc_success_when_be_down", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.beNum = 3
+    options.feNum = 2
+    options.enableDebugPoints()
+    options.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
+
+    docker(options) {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        def tblName = "test_sc_success_when_be_down"
+        sql """ DROP TABLE IF EXISTS ${tblName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tblName} (
+                `k` int NOT NULL,
+                `v0` int NOT NULL,
+                `v1` int NOT NULL
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 24
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 3"
+            )
+        """
+        sql """ INSERT INTO ${tblName} SELECT number, number, number from numbers("number" = "1024") """
+
+        GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+        sql """ ALTER TABLE ${tblName} MODIFY COLUMN v0 VARCHAR(100) """
+        sleep(3000)
+        cluster.stopBackends(1)
+        waitForSchemaChangeDone {
+            sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
+            time 600
+        }
+    }
+}