From 807371969ba9eeee1be678194fcfa730663a6389 Mon Sep 17 00:00:00 2001 From: luxl Date: Fri, 27 Feb 2026 15:40:46 +0800 Subject: [PATCH 1/3] close #17748 --- .../BackfillWorkflowExecutorDelegate.java | 95 ++++++- .../BackfillWorkflowExecutorDelegateTest.java | 236 ++++++++++++++++++ 2 files changed, 330 insertions(+), 1 deletion(-) create mode 100644 dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java index 0c684f2876f2..a5e1ca2345ef 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java @@ -18,14 +18,18 @@ package org.apache.dolphinscheduler.api.executor.workflow; import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.service.WorkflowLineageService; import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.ExecutionOrder; +import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.repository.CommandDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao; import org.apache.dolphinscheduler.extract.base.client.Clients; import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerRequest; @@ -57,6 +61,12 @@ public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate backfillTimeList) { - // todo: + // 1) Query downstream dependent workflows for the current workflow + final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition(); + final long upstreamWorkflowCode = upstreamWorkflow.getCode(); + + List downstreamDefinitions = + workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode); + + if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) { + log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode); + return; + } + + // 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream + // backfill + final List upstreamBackfillDates = backfillTimeList.stream() + .map(DateUtils::stringToZoneDateTime) + .collect(Collectors.toList()); + + // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO + for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) { + long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode(); + + // Prevent self dependency + if (downstreamCode == upstreamWorkflowCode) { + log.warn("Skip self dependent workflow {}", downstreamCode); + continue; + } + + WorkflowDefinition downstreamWorkflow = + workflowDefinitionDao.queryByCode(downstreamCode).orElse(null); + if (downstreamWorkflow == null) { + log.warn("Skip dependent workflow {}, definition not found", downstreamCode); + continue; + } + + if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) { + log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode); + continue; + } + + // Currently, reuse the same business date list as upstream for downstream backfill; + // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition + // (taskParams). + BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams(); + boolean allLevelDependent = originalParams.isAllLevelDependent(); + ComplementDependentMode downstreamDependentMode = + allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE; + + BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(originalParams.getRunMode()) + .backfillDateList(upstreamBackfillDates) + .expectedParallelismNumber(originalParams.getExpectedParallelismNumber()) + // Control whether downstream will continue triggering its own dependencies based on + // allLevelDependent flag + .backfillDependentMode(downstreamDependentMode) + .allLevelDependent(allLevelDependent) + .executionOrder(originalParams.getExecutionOrder()) + .build(); + + BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder() + .loginUser(backfillWorkflowDTO.getLoginUser()) + .workflowDefinition(downstreamWorkflow) + .startNodes(backfillWorkflowDTO.getStartNodes()) + .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) + .taskDependType(backfillWorkflowDTO.getTaskDependType()) + .execType(backfillWorkflowDTO.getExecType()) + .warningType(backfillWorkflowDTO.getWarningType()) + .warningGroupId(downstreamWorkflow.getWarningGroupId()) + .runMode(dependentParams.getRunMode()) + .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) + .workerGroup(backfillWorkflowDTO.getWorkerGroup()) + .tenantCode(backfillWorkflowDTO.getTenantCode()) + .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) + .startParamList(backfillWorkflowDTO.getStartParamList()) + .dryRun(backfillWorkflowDTO.getDryRun()) + .backfillParams(dependentParams) + .build(); + + log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}", + downstreamCode, upstreamWorkflowCode, backfillTimeList); + + // 4) Recursively call execute to trigger downstream backfill + execute(dependentBackfillDTO); + } } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java new file mode 100644 index 000000000000..9e88feb7e090 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.api.service.WorkflowLineageService; +import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO; +import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.ExecutionOrder; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao; + +import java.lang.reflect.Method; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class BackfillWorkflowExecutorDelegateTest { + + @Spy + @InjectMocks + private BackfillWorkflowExecutorDelegate backfillWorkflowExecutorDelegate; + + @Mock + private WorkflowLineageService workflowLineageService; + + @Mock + private WorkflowDefinitionDao workflowDefinitionDao; + + @Test + public void testDoBackfillDependentWorkflow_NoDownstreamDefinitions() throws Exception { + long upstreamCode = 1L; + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder() + .code(upstreamCode) + .releaseState(ReleaseState.ONLINE) + .build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(true) + .executionOrder(ExecutionOrder.ASC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Collections.emptyList()); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + method.setAccessible(true); + + List backfillTimeList = Collections.singletonList("2026-02-01 00:00:00"); + + method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList); + + verify(workflowDefinitionDao, never()).queryByCode(anyLong()); + } + + @Test + public void testDoBackfillDependentWorkflow_WithDownstream_AllLevelDependent() throws Exception { + long upstreamCode = 10L; + long downstreamCode = 20L; + + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder() + .code(upstreamCode) + .releaseState(ReleaseState.ONLINE) + .build(); + + WorkflowDefinition downstreamWorkflow = + WorkflowDefinition.builder() + .code(downstreamCode) + .releaseState(ReleaseState.ONLINE) + .warningGroupId(100) + .build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .expectedParallelismNumber(2) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(true) + .executionOrder(ExecutionOrder.DESC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + DependentWorkflowDefinition selfDependent = new DependentWorkflowDefinition(); + selfDependent.setWorkflowDefinitionCode(upstreamCode); + + DependentWorkflowDefinition validDependent = new DependentWorkflowDefinition(); + validDependent.setWorkflowDefinitionCode(downstreamCode); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Arrays.asList(selfDependent, validDependent)); + when(workflowDefinitionDao.queryByCode(downstreamCode)) + .thenReturn(Optional.of(downstreamWorkflow)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class); + doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate).execute(captor.capture()); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + method.setAccessible(true); + + List backfillTimeList = Arrays.asList( + "2026-02-01 00:00:00", + "2026-02-02 00:00:00"); + + method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList); + + verify(workflowDefinitionDao).queryByCode(downstreamCode); + + BackfillWorkflowDTO captured = captor.getValue(); + Assertions.assertNotNull(captured); + Assertions.assertEquals(downstreamCode, captured.getWorkflowDefinition().getCode()); + Assertions.assertEquals(downstreamWorkflow.getWarningGroupId(), captured.getWarningGroupId()); + + BackfillWorkflowDTO.BackfillParamsDTO capturedParams = captured.getBackfillParams(); + Assertions.assertNotNull(capturedParams); + Assertions.assertEquals(params.getRunMode(), capturedParams.getRunMode()); + Assertions.assertEquals(params.getExpectedParallelismNumber(), capturedParams.getExpectedParallelismNumber()); + Assertions.assertEquals(params.getExecutionOrder(), capturedParams.getExecutionOrder()); + Assertions.assertEquals(ComplementDependentMode.ALL_DEPENDENT, capturedParams.getBackfillDependentMode()); + Assertions.assertTrue(capturedParams.isAllLevelDependent()); + Assertions.assertEquals(backfillTimeList.size(), capturedParams.getBackfillDateList().size()); + } + + @Test + public void testDoBackfillDependentWorkflow_WithDownstream_SingleLevelDependent() throws Exception { + long upstreamCode = 100L; + long downstreamCode = 200L; + + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder() + .code(upstreamCode) + .releaseState(ReleaseState.ONLINE) + .build(); + + WorkflowDefinition downstreamWorkflow = + WorkflowDefinition.builder() + .code(downstreamCode) + .releaseState(ReleaseState.ONLINE) + .warningGroupId(200) + .build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .expectedParallelismNumber(3) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(false) + .executionOrder(ExecutionOrder.ASC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + DependentWorkflowDefinition validDependent = new DependentWorkflowDefinition(); + validDependent.setWorkflowDefinitionCode(downstreamCode); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Collections.singletonList(validDependent)); + when(workflowDefinitionDao.queryByCode(downstreamCode)) + .thenReturn(Optional.of(downstreamWorkflow)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class); + doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate).execute(captor.capture()); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + method.setAccessible(true); + + List backfillTimeList = Collections.singletonList("2026-02-03 00:00:00"); + + method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList); + + verify(workflowDefinitionDao).queryByCode(downstreamCode); + + BackfillWorkflowDTO captured = captor.getValue(); + Assertions.assertNotNull(captured); + + BackfillWorkflowDTO.BackfillParamsDTO capturedParams = captured.getBackfillParams(); + Assertions.assertNotNull(capturedParams); + Assertions.assertEquals(ComplementDependentMode.OFF_MODE, capturedParams.getBackfillDependentMode()); + Assertions.assertFalse(capturedParams.isAllLevelDependent()); + Assertions.assertEquals(backfillTimeList.size(), capturedParams.getBackfillDateList().size()); + } +} From b42a645711ca3b815c9cd0db20bdc0cdf4a1a706 Mon Sep 17 00:00:00 2001 From: luxl Date: Fri, 27 Feb 2026 16:10:42 +0800 Subject: [PATCH 2/3] fix: prevent circular dependency and wrong startNodes in backfill dependent workflow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Problem 1: Replace single-level self-dep check with ThreadLocal visited set to detect and skip indirect circular dependencies (A→B→A), preventing StackOverflowError when allLevelDependent=true - Problem 2: Set startNodes=null for downstream workflows; upstream task node codes are not valid in a different workflow definition - Add tests for OFFLINE skip, not-found skip, and startNodes null assertion Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../BackfillWorkflowExecutorDelegate.java | 169 ++++++++++-------- .../BackfillWorkflowExecutorDelegateTest.java | 79 ++++++++ 2 files changed, 173 insertions(+), 75 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java index a5e1ca2345ef..0252f30deb52 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java @@ -41,7 +41,9 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -55,6 +57,8 @@ @Component public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate> { + private static final ThreadLocal> BACKFILL_VISITING_WORKFLOWS = new ThreadLocal<>(); + @Autowired private CommandDao commandDao; @@ -183,89 +187,104 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO, final List backfillTimeList) { - // 1) Query downstream dependent workflows for the current workflow - final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition(); - final long upstreamWorkflowCode = upstreamWorkflow.getCode(); - - List downstreamDefinitions = - workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode); - - if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) { - log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode); - return; + final boolean isRootCall = BACKFILL_VISITING_WORKFLOWS.get() == null; + if (isRootCall) { + final Set visitedCodes = new HashSet<>(); + visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode()); + BACKFILL_VISITING_WORKFLOWS.set(visitedCodes); } + try { + final Set visitedCodes = BACKFILL_VISITING_WORKFLOWS.get(); - // 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream - // backfill - final List upstreamBackfillDates = backfillTimeList.stream() - .map(DateUtils::stringToZoneDateTime) - .collect(Collectors.toList()); + // 1) Query downstream dependent workflows for the current workflow + final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition(); + final long upstreamWorkflowCode = upstreamWorkflow.getCode(); - // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO - for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) { - long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode(); + List downstreamDefinitions = + workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode); - // Prevent self dependency - if (downstreamCode == upstreamWorkflowCode) { - log.warn("Skip self dependent workflow {}", downstreamCode); - continue; + if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) { + log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode); + return; } - WorkflowDefinition downstreamWorkflow = - workflowDefinitionDao.queryByCode(downstreamCode).orElse(null); - if (downstreamWorkflow == null) { - log.warn("Skip dependent workflow {}, definition not found", downstreamCode); - continue; + // 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream + // backfill + final List upstreamBackfillDates = backfillTimeList.stream() + .map(DateUtils::stringToZoneDateTime) + .collect(Collectors.toList()); + + // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO + for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) { + long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode(); + + // Prevent self-dependency and circular dependency chains + if (visitedCodes.contains(downstreamCode)) { + log.warn("Skip circular dependent workflow {}", downstreamCode); + continue; + } + + WorkflowDefinition downstreamWorkflow = + workflowDefinitionDao.queryByCode(downstreamCode).orElse(null); + if (downstreamWorkflow == null) { + log.warn("Skip dependent workflow {}, definition not found", downstreamCode); + continue; + } + + if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) { + log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode); + continue; + } + + // Currently, reuse the same business date list as upstream for downstream backfill; + // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition + // (taskParams). + BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams(); + boolean allLevelDependent = originalParams.isAllLevelDependent(); + ComplementDependentMode downstreamDependentMode = + allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE; + + BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(originalParams.getRunMode()) + .backfillDateList(upstreamBackfillDates) + .expectedParallelismNumber(originalParams.getExpectedParallelismNumber()) + // Control whether downstream will continue triggering its own dependencies based on + // allLevelDependent flag + .backfillDependentMode(downstreamDependentMode) + .allLevelDependent(allLevelDependent) + .executionOrder(originalParams.getExecutionOrder()) + .build(); + + BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder() + .loginUser(backfillWorkflowDTO.getLoginUser()) + .workflowDefinition(downstreamWorkflow) + .startNodes(null) + .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) + .taskDependType(backfillWorkflowDTO.getTaskDependType()) + .execType(backfillWorkflowDTO.getExecType()) + .warningType(backfillWorkflowDTO.getWarningType()) + .warningGroupId(downstreamWorkflow.getWarningGroupId()) + .runMode(dependentParams.getRunMode()) + .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) + .workerGroup(backfillWorkflowDTO.getWorkerGroup()) + .tenantCode(backfillWorkflowDTO.getTenantCode()) + .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) + .startParamList(backfillWorkflowDTO.getStartParamList()) + .dryRun(backfillWorkflowDTO.getDryRun()) + .backfillParams(dependentParams) + .build(); + + log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}", + downstreamCode, upstreamWorkflowCode, backfillTimeList); + + // 4) Mark as visiting before recursive trigger to detect cycles + visitedCodes.add(downstreamCode); + execute(dependentBackfillDTO); } - - if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) { - log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode); - continue; + } finally { + if (isRootCall) { + BACKFILL_VISITING_WORKFLOWS.remove(); } - - // Currently, reuse the same business date list as upstream for downstream backfill; - // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition - // (taskParams). - BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams(); - boolean allLevelDependent = originalParams.isAllLevelDependent(); - ComplementDependentMode downstreamDependentMode = - allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE; - - BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder() - .runMode(originalParams.getRunMode()) - .backfillDateList(upstreamBackfillDates) - .expectedParallelismNumber(originalParams.getExpectedParallelismNumber()) - // Control whether downstream will continue triggering its own dependencies based on - // allLevelDependent flag - .backfillDependentMode(downstreamDependentMode) - .allLevelDependent(allLevelDependent) - .executionOrder(originalParams.getExecutionOrder()) - .build(); - - BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder() - .loginUser(backfillWorkflowDTO.getLoginUser()) - .workflowDefinition(downstreamWorkflow) - .startNodes(backfillWorkflowDTO.getStartNodes()) - .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) - .taskDependType(backfillWorkflowDTO.getTaskDependType()) - .execType(backfillWorkflowDTO.getExecType()) - .warningType(backfillWorkflowDTO.getWarningType()) - .warningGroupId(downstreamWorkflow.getWarningGroupId()) - .runMode(dependentParams.getRunMode()) - .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) - .workerGroup(backfillWorkflowDTO.getWorkerGroup()) - .tenantCode(backfillWorkflowDTO.getTenantCode()) - .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) - .startParamList(backfillWorkflowDTO.getStartParamList()) - .dryRun(backfillWorkflowDTO.getDryRun()) - .backfillParams(dependentParams) - .build(); - - log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}", - downstreamCode, upstreamWorkflowCode, backfillTimeList); - - // 4) Recursively call execute to trigger downstream backfill - execute(dependentBackfillDTO); } } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java index 9e88feb7e090..ad4c62b9bbfd 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java @@ -169,6 +169,7 @@ public void testDoBackfillDependentWorkflow_WithDownstream_AllLevelDependent() t Assertions.assertEquals(ComplementDependentMode.ALL_DEPENDENT, capturedParams.getBackfillDependentMode()); Assertions.assertTrue(capturedParams.isAllLevelDependent()); Assertions.assertEquals(backfillTimeList.size(), capturedParams.getBackfillDateList().size()); + Assertions.assertNull(captured.getStartNodes()); } @Test @@ -232,5 +233,83 @@ public void testDoBackfillDependentWorkflow_WithDownstream_SingleLevelDependent( Assertions.assertEquals(ComplementDependentMode.OFF_MODE, capturedParams.getBackfillDependentMode()); Assertions.assertFalse(capturedParams.isAllLevelDependent()); Assertions.assertEquals(backfillTimeList.size(), capturedParams.getBackfillDateList().size()); + Assertions.assertNull(captured.getStartNodes()); + } + + @Test + public void testDoBackfillDependentWorkflow_SkipWorkflowNotFound() throws Exception { + long upstreamCode = 1000L; + long downstreamCode = 2000L; + + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(true) + .executionOrder(ExecutionOrder.ASC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + DependentWorkflowDefinition dep = new DependentWorkflowDefinition(); + dep.setWorkflowDefinitionCode(downstreamCode); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Collections.singletonList(dep)); + when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.empty()); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + method.setAccessible(true); + + method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00")); + + verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any()); + } + + @Test + public void testDoBackfillDependentWorkflow_SkipOfflineWorkflow() throws Exception { + long upstreamCode = 3000L; + long downstreamCode = 4000L; + + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build(); + + WorkflowDefinition offlineDownstream = + WorkflowDefinition.builder().code(downstreamCode).releaseState(ReleaseState.OFFLINE).build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(true) + .executionOrder(ExecutionOrder.ASC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + DependentWorkflowDefinition dep = new DependentWorkflowDefinition(); + dep.setWorkflowDefinitionCode(downstreamCode); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Collections.singletonList(dep)); + when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.of(offlineDownstream)); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + method.setAccessible(true); + + method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00")); + + verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any()); } } From a3fe99a27fc1864f1acbaa80caa9d455c72d957b Mon Sep 17 00:00:00 2001 From: luxl Date: Fri, 27 Feb 2026 16:13:31 +0800 Subject: [PATCH 3/3] refactor: replace ThreadLocal with explicit parameter for cycle detection ThreadLocal is unnecessary here since the call chain is synchronous and private. Passing visitedCodes as a parameter is simpler, clearer, and avoids ThreadLocal lifecycle management. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../BackfillWorkflowExecutorDelegate.java | 176 ++++++++---------- .../BackfillWorkflowExecutorDelegateTest.java | 32 +++- 2 files changed, 104 insertions(+), 104 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java index 0252f30deb52..4e2de9c85a38 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java @@ -46,6 +46,7 @@ import java.util.Set; import java.util.stream.Collectors; + import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -57,8 +58,6 @@ @Component public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate> { - private static final ThreadLocal> BACKFILL_VISITING_WORKFLOWS = new ThreadLocal<>(); - @Autowired private CommandDao commandDao; @@ -180,111 +179,100 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO } final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams(); if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) { - doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList); + final Set visitedCodes = new HashSet<>(); + visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode()); + doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, visitedCodes); } return backfillTriggerResponse.getWorkflowInstanceId(); } private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO, - final List backfillTimeList) { - final boolean isRootCall = BACKFILL_VISITING_WORKFLOWS.get() == null; - if (isRootCall) { - final Set visitedCodes = new HashSet<>(); - visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode()); - BACKFILL_VISITING_WORKFLOWS.set(visitedCodes); + final List backfillTimeList, + final Set visitedCodes) { + // 1) Query downstream dependent workflows for the current workflow + final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition(); + final long upstreamWorkflowCode = upstreamWorkflow.getCode(); + + List downstreamDefinitions = + workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode); + + if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) { + log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode); + return; } - try { - final Set visitedCodes = BACKFILL_VISITING_WORKFLOWS.get(); - // 1) Query downstream dependent workflows for the current workflow - final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition(); - final long upstreamWorkflowCode = upstreamWorkflow.getCode(); + // 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream + // backfill + final List upstreamBackfillDates = backfillTimeList.stream() + .map(DateUtils::stringToZoneDateTime) + .collect(Collectors.toList()); - List downstreamDefinitions = - workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode); + // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO + for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) { + long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode(); - if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) { - log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode); - return; + // Prevent self-dependency and circular dependency chains + if (visitedCodes.contains(downstreamCode)) { + log.warn("Skip circular dependent workflow {}", downstreamCode); + continue; } - // 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream - // backfill - final List upstreamBackfillDates = backfillTimeList.stream() - .map(DateUtils::stringToZoneDateTime) - .collect(Collectors.toList()); - - // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO - for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) { - long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode(); - - // Prevent self-dependency and circular dependency chains - if (visitedCodes.contains(downstreamCode)) { - log.warn("Skip circular dependent workflow {}", downstreamCode); - continue; - } - - WorkflowDefinition downstreamWorkflow = - workflowDefinitionDao.queryByCode(downstreamCode).orElse(null); - if (downstreamWorkflow == null) { - log.warn("Skip dependent workflow {}, definition not found", downstreamCode); - continue; - } - - if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) { - log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode); - continue; - } - - // Currently, reuse the same business date list as upstream for downstream backfill; - // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition - // (taskParams). - BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams(); - boolean allLevelDependent = originalParams.isAllLevelDependent(); - ComplementDependentMode downstreamDependentMode = - allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE; - - BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder() - .runMode(originalParams.getRunMode()) - .backfillDateList(upstreamBackfillDates) - .expectedParallelismNumber(originalParams.getExpectedParallelismNumber()) - // Control whether downstream will continue triggering its own dependencies based on - // allLevelDependent flag - .backfillDependentMode(downstreamDependentMode) - .allLevelDependent(allLevelDependent) - .executionOrder(originalParams.getExecutionOrder()) - .build(); - - BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder() - .loginUser(backfillWorkflowDTO.getLoginUser()) - .workflowDefinition(downstreamWorkflow) - .startNodes(null) - .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) - .taskDependType(backfillWorkflowDTO.getTaskDependType()) - .execType(backfillWorkflowDTO.getExecType()) - .warningType(backfillWorkflowDTO.getWarningType()) - .warningGroupId(downstreamWorkflow.getWarningGroupId()) - .runMode(dependentParams.getRunMode()) - .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) - .workerGroup(backfillWorkflowDTO.getWorkerGroup()) - .tenantCode(backfillWorkflowDTO.getTenantCode()) - .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) - .startParamList(backfillWorkflowDTO.getStartParamList()) - .dryRun(backfillWorkflowDTO.getDryRun()) - .backfillParams(dependentParams) - .build(); - - log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}", - downstreamCode, upstreamWorkflowCode, backfillTimeList); - - // 4) Mark as visiting before recursive trigger to detect cycles - visitedCodes.add(downstreamCode); - execute(dependentBackfillDTO); + WorkflowDefinition downstreamWorkflow = + workflowDefinitionDao.queryByCode(downstreamCode).orElse(null); + if (downstreamWorkflow == null) { + log.warn("Skip dependent workflow {}, definition not found", downstreamCode); + continue; } - } finally { - if (isRootCall) { - BACKFILL_VISITING_WORKFLOWS.remove(); + + if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) { + log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode); + continue; } + + // Currently, reuse the same business date list as upstream for downstream backfill; + // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition + // (taskParams). + BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams(); + boolean allLevelDependent = originalParams.isAllLevelDependent(); + ComplementDependentMode downstreamDependentMode = + allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE; + + BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(originalParams.getRunMode()) + .backfillDateList(upstreamBackfillDates) + .expectedParallelismNumber(originalParams.getExpectedParallelismNumber()) + // Control whether downstream will continue triggering its own dependencies based on + // allLevelDependent flag + .backfillDependentMode(downstreamDependentMode) + .allLevelDependent(allLevelDependent) + .executionOrder(originalParams.getExecutionOrder()) + .build(); + + BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder() + .loginUser(backfillWorkflowDTO.getLoginUser()) + .workflowDefinition(downstreamWorkflow) + .startNodes(null) + .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) + .taskDependType(backfillWorkflowDTO.getTaskDependType()) + .execType(backfillWorkflowDTO.getExecType()) + .warningType(backfillWorkflowDTO.getWarningType()) + .warningGroupId(downstreamWorkflow.getWarningGroupId()) + .runMode(dependentParams.getRunMode()) + .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) + .workerGroup(backfillWorkflowDTO.getWorkerGroup()) + .tenantCode(backfillWorkflowDTO.getTenantCode()) + .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) + .startParamList(backfillWorkflowDTO.getStartParamList()) + .dryRun(backfillWorkflowDTO.getDryRun()) + .backfillParams(dependentParams) + .build(); + + log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}", + downstreamCode, upstreamWorkflowCode, backfillTimeList); + + // 4) Mark as visiting before recursive trigger to detect cycles, then trigger downstream backfill + visitedCodes.add(downstreamCode); + execute(dependentBackfillDTO); } } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java index ad4c62b9bbfd..2113cd9799ff 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java @@ -37,8 +37,10 @@ import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -88,12 +90,14 @@ public void testDoBackfillDependentWorkflow_NoDownstreamDefinitions() throws Exc .thenReturn(Collections.emptyList()); Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( - "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); method.setAccessible(true); List backfillTimeList = Collections.singletonList("2026-02-01 00:00:00"); - method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList); + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList, visitedCodes); verify(workflowDefinitionDao, never()).queryByCode(anyLong()); } @@ -145,14 +149,16 @@ public void testDoBackfillDependentWorkflow_WithDownstream_AllLevelDependent() t doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate).execute(captor.capture()); Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( - "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); method.setAccessible(true); List backfillTimeList = Arrays.asList( "2026-02-01 00:00:00", "2026-02-02 00:00:00"); - method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList); + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList, visitedCodes); verify(workflowDefinitionDao).queryByCode(downstreamCode); @@ -216,12 +222,14 @@ public void testDoBackfillDependentWorkflow_WithDownstream_SingleLevelDependent( doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate).execute(captor.capture()); Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( - "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); method.setAccessible(true); List backfillTimeList = Collections.singletonList("2026-02-03 00:00:00"); - method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList); + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList, visitedCodes); verify(workflowDefinitionDao).queryByCode(downstreamCode); @@ -265,10 +273,12 @@ public void testDoBackfillDependentWorkflow_SkipWorkflowNotFound() throws Except when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.empty()); Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( - "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); method.setAccessible(true); - method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00")); + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00"), visitedCodes); verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any()); } @@ -305,10 +315,12 @@ public void testDoBackfillDependentWorkflow_SkipOfflineWorkflow() throws Excepti when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.of(offlineDownstream)); Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( - "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class); + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); method.setAccessible(true); - method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00")); + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00"), visitedCodes); verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any()); }