diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java index 0c684f2876f2..4e2de9c85a38 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java @@ -18,14 +18,18 @@ package org.apache.dolphinscheduler.api.executor.workflow; import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.service.WorkflowLineageService; import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.ExecutionOrder; +import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.repository.CommandDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao; import org.apache.dolphinscheduler.extract.base.client.Clients; import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerRequest; @@ -37,9 +41,12 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; + import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -57,6 +64,12 @@ public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate visitedCodes = new HashSet<>(); + visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode()); + doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, visitedCodes); } return backfillTriggerResponse.getWorkflowInstanceId(); } private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO, - final List backfillTimeList) { - // todo: + final List backfillTimeList, + final Set visitedCodes) { + // 1) Query downstream dependent workflows for the current workflow + final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition(); + final long upstreamWorkflowCode = upstreamWorkflow.getCode(); + + List downstreamDefinitions = + workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode); + + if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) { + log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode); + return; + } + + // 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream + // backfill + final List upstreamBackfillDates = backfillTimeList.stream() + .map(DateUtils::stringToZoneDateTime) + .collect(Collectors.toList()); + + // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO + for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) { + long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode(); + + // Prevent self-dependency and circular dependency chains + if (visitedCodes.contains(downstreamCode)) { + log.warn("Skip circular dependent workflow {}", downstreamCode); + continue; + } + + WorkflowDefinition downstreamWorkflow = + workflowDefinitionDao.queryByCode(downstreamCode).orElse(null); + if (downstreamWorkflow == null) { + log.warn("Skip dependent workflow {}, definition not found", downstreamCode); + continue; + } + + if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) { + log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode); + continue; + } + + // Currently, reuse the same business date list as upstream for downstream backfill; + // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition + // (taskParams). + BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams(); + boolean allLevelDependent = originalParams.isAllLevelDependent(); + ComplementDependentMode downstreamDependentMode = + allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE; + + BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(originalParams.getRunMode()) + .backfillDateList(upstreamBackfillDates) + .expectedParallelismNumber(originalParams.getExpectedParallelismNumber()) + // Control whether downstream will continue triggering its own dependencies based on + // allLevelDependent flag + .backfillDependentMode(downstreamDependentMode) + .allLevelDependent(allLevelDependent) + .executionOrder(originalParams.getExecutionOrder()) + .build(); + + BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder() + .loginUser(backfillWorkflowDTO.getLoginUser()) + .workflowDefinition(downstreamWorkflow) + .startNodes(null) + .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) + .taskDependType(backfillWorkflowDTO.getTaskDependType()) + .execType(backfillWorkflowDTO.getExecType()) + .warningType(backfillWorkflowDTO.getWarningType()) + .warningGroupId(downstreamWorkflow.getWarningGroupId()) + .runMode(dependentParams.getRunMode()) + .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) + .workerGroup(backfillWorkflowDTO.getWorkerGroup()) + .tenantCode(backfillWorkflowDTO.getTenantCode()) + .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) + .startParamList(backfillWorkflowDTO.getStartParamList()) + .dryRun(backfillWorkflowDTO.getDryRun()) + .backfillParams(dependentParams) + .build(); + + log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}", + downstreamCode, upstreamWorkflowCode, backfillTimeList); + + // 4) Mark as visiting before recursive trigger to detect cycles, then trigger downstream backfill + visitedCodes.add(downstreamCode); + execute(dependentBackfillDTO); + } } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java new file mode 100644 index 000000000000..2113cd9799ff --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.api.service.WorkflowLineageService; +import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO; +import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.ExecutionOrder; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao; + +import java.lang.reflect.Method; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class BackfillWorkflowExecutorDelegateTest { + + @Spy + @InjectMocks + private BackfillWorkflowExecutorDelegate backfillWorkflowExecutorDelegate; + + @Mock + private WorkflowLineageService workflowLineageService; + + @Mock + private WorkflowDefinitionDao workflowDefinitionDao; + + @Test + public void testDoBackfillDependentWorkflow_NoDownstreamDefinitions() throws Exception { + long upstreamCode = 1L; + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder() + .code(upstreamCode) + .releaseState(ReleaseState.ONLINE) + .build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(true) + .executionOrder(ExecutionOrder.ASC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Collections.emptyList()); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); + method.setAccessible(true); + + List backfillTimeList = Collections.singletonList("2026-02-01 00:00:00"); + + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList, visitedCodes); + + verify(workflowDefinitionDao, never()).queryByCode(anyLong()); + } + + @Test + public void testDoBackfillDependentWorkflow_WithDownstream_AllLevelDependent() throws Exception { + long upstreamCode = 10L; + long downstreamCode = 20L; + + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder() + .code(upstreamCode) + .releaseState(ReleaseState.ONLINE) + .build(); + + WorkflowDefinition downstreamWorkflow = + WorkflowDefinition.builder() + .code(downstreamCode) + .releaseState(ReleaseState.ONLINE) + .warningGroupId(100) + .build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .expectedParallelismNumber(2) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(true) + .executionOrder(ExecutionOrder.DESC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + DependentWorkflowDefinition selfDependent = new DependentWorkflowDefinition(); + selfDependent.setWorkflowDefinitionCode(upstreamCode); + + DependentWorkflowDefinition validDependent = new DependentWorkflowDefinition(); + validDependent.setWorkflowDefinitionCode(downstreamCode); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Arrays.asList(selfDependent, validDependent)); + when(workflowDefinitionDao.queryByCode(downstreamCode)) + .thenReturn(Optional.of(downstreamWorkflow)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class); + doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate).execute(captor.capture()); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); + method.setAccessible(true); + + List backfillTimeList = Arrays.asList( + "2026-02-01 00:00:00", + "2026-02-02 00:00:00"); + + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList, visitedCodes); + + verify(workflowDefinitionDao).queryByCode(downstreamCode); + + BackfillWorkflowDTO captured = captor.getValue(); + Assertions.assertNotNull(captured); + Assertions.assertEquals(downstreamCode, captured.getWorkflowDefinition().getCode()); + Assertions.assertEquals(downstreamWorkflow.getWarningGroupId(), captured.getWarningGroupId()); + + BackfillWorkflowDTO.BackfillParamsDTO capturedParams = captured.getBackfillParams(); + Assertions.assertNotNull(capturedParams); + Assertions.assertEquals(params.getRunMode(), capturedParams.getRunMode()); + Assertions.assertEquals(params.getExpectedParallelismNumber(), capturedParams.getExpectedParallelismNumber()); + Assertions.assertEquals(params.getExecutionOrder(), capturedParams.getExecutionOrder()); + Assertions.assertEquals(ComplementDependentMode.ALL_DEPENDENT, capturedParams.getBackfillDependentMode()); + Assertions.assertTrue(capturedParams.isAllLevelDependent()); + Assertions.assertEquals(backfillTimeList.size(), capturedParams.getBackfillDateList().size()); + Assertions.assertNull(captured.getStartNodes()); + } + + @Test + public void testDoBackfillDependentWorkflow_WithDownstream_SingleLevelDependent() throws Exception { + long upstreamCode = 100L; + long downstreamCode = 200L; + + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder() + .code(upstreamCode) + .releaseState(ReleaseState.ONLINE) + .build(); + + WorkflowDefinition downstreamWorkflow = + WorkflowDefinition.builder() + .code(downstreamCode) + .releaseState(ReleaseState.ONLINE) + .warningGroupId(200) + .build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .expectedParallelismNumber(3) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(false) + .executionOrder(ExecutionOrder.ASC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + DependentWorkflowDefinition validDependent = new DependentWorkflowDefinition(); + validDependent.setWorkflowDefinitionCode(downstreamCode); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Collections.singletonList(validDependent)); + when(workflowDefinitionDao.queryByCode(downstreamCode)) + .thenReturn(Optional.of(downstreamWorkflow)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class); + doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate).execute(captor.capture()); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); + method.setAccessible(true); + + List backfillTimeList = Collections.singletonList("2026-02-03 00:00:00"); + + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList, visitedCodes); + + verify(workflowDefinitionDao).queryByCode(downstreamCode); + + BackfillWorkflowDTO captured = captor.getValue(); + Assertions.assertNotNull(captured); + + BackfillWorkflowDTO.BackfillParamsDTO capturedParams = captured.getBackfillParams(); + Assertions.assertNotNull(capturedParams); + Assertions.assertEquals(ComplementDependentMode.OFF_MODE, capturedParams.getBackfillDependentMode()); + Assertions.assertFalse(capturedParams.isAllLevelDependent()); + Assertions.assertEquals(backfillTimeList.size(), capturedParams.getBackfillDateList().size()); + Assertions.assertNull(captured.getStartNodes()); + } + + @Test + public void testDoBackfillDependentWorkflow_SkipWorkflowNotFound() throws Exception { + long upstreamCode = 1000L; + long downstreamCode = 2000L; + + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(true) + .executionOrder(ExecutionOrder.ASC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + DependentWorkflowDefinition dep = new DependentWorkflowDefinition(); + dep.setWorkflowDefinitionCode(downstreamCode); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Collections.singletonList(dep)); + when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.empty()); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); + method.setAccessible(true); + + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00"), visitedCodes); + + verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any()); + } + + @Test + public void testDoBackfillDependentWorkflow_SkipOfflineWorkflow() throws Exception { + long upstreamCode = 3000L; + long downstreamCode = 4000L; + + WorkflowDefinition upstreamWorkflow = + WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build(); + + WorkflowDefinition offlineDownstream = + WorkflowDefinition.builder().code(downstreamCode).releaseState(ReleaseState.OFFLINE).build(); + + BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() + .runMode(RunMode.RUN_MODE_SERIAL) + .backfillDateList(Collections.emptyList()) + .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) + .allLevelDependent(true) + .executionOrder(ExecutionOrder.ASC_ORDER) + .build(); + + BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() + .workflowDefinition(upstreamWorkflow) + .backfillParams(params) + .build(); + + DependentWorkflowDefinition dep = new DependentWorkflowDefinition(); + dep.setWorkflowDefinitionCode(downstreamCode); + + when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode)) + .thenReturn(Collections.singletonList(dep)); + when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.of(offlineDownstream)); + + Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod( + "doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class); + method.setAccessible(true); + + Set visitedCodes = new HashSet<>(); + visitedCodes.add(dto.getWorkflowDefinition().getCode()); + method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00"), visitedCodes); + + verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any()); + } +}