Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
package org.apache.dolphinscheduler.api.executor.workflow;

import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerRequest;
Expand All @@ -37,9 +41,12 @@
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;


import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -57,6 +64,12 @@ public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate<Backf
@Autowired
private ProcessService processService;

@Autowired
private WorkflowLineageService workflowLineageService;

@Autowired
private WorkflowDefinitionDao workflowDefinitionDao;

@Autowired
private RegistryClient registryClient;

Expand Down Expand Up @@ -166,13 +179,100 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
}
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
final Set<Long> visitedCodes = new HashSet<>();
visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, visitedCodes);
}
return backfillTriggerResponse.getWorkflowInstanceId();
}

private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
final List<String> backfillTimeList) {
// todo:
final List<String> backfillTimeList,
final Set<Long> visitedCodes) {
// 1) Query downstream dependent workflows for the current workflow
final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
final long upstreamWorkflowCode = upstreamWorkflow.getCode();

List<DependentWorkflowDefinition> downstreamDefinitions =
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);

if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
return;
}

// 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream
// backfill
final List<ZonedDateTime> upstreamBackfillDates = backfillTimeList.stream()
.map(DateUtils::stringToZoneDateTime)
.collect(Collectors.toList());

// 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();

// Prevent self-dependency and circular dependency chains
if (visitedCodes.contains(downstreamCode)) {
log.warn("Skip circular dependent workflow {}", downstreamCode);
continue;
}

WorkflowDefinition downstreamWorkflow =
workflowDefinitionDao.queryByCode(downstreamCode).orElse(null);
if (downstreamWorkflow == null) {
log.warn("Skip dependent workflow {}, definition not found", downstreamCode);
continue;
}

if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
continue;
}

// Currently, reuse the same business date list as upstream for downstream backfill;
// later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
// (taskParams).
BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
boolean allLevelDependent = originalParams.isAllLevelDependent();
ComplementDependentMode downstreamDependentMode =
allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;

BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
.runMode(originalParams.getRunMode())
.backfillDateList(upstreamBackfillDates)
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
// Control whether downstream will continue triggering its own dependencies based on
// allLevelDependent flag
.backfillDependentMode(downstreamDependentMode)
.allLevelDependent(allLevelDependent)
.executionOrder(originalParams.getExecutionOrder())
.build();

BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
.loginUser(backfillWorkflowDTO.getLoginUser())
.workflowDefinition(downstreamWorkflow)
.startNodes(null)
.failureStrategy(backfillWorkflowDTO.getFailureStrategy())
.taskDependType(backfillWorkflowDTO.getTaskDependType())
.execType(backfillWorkflowDTO.getExecType())
.warningType(backfillWorkflowDTO.getWarningType())
.warningGroupId(downstreamWorkflow.getWarningGroupId())
.runMode(dependentParams.getRunMode())
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
.workerGroup(backfillWorkflowDTO.getWorkerGroup())
.tenantCode(backfillWorkflowDTO.getTenantCode())
.environmentCode(backfillWorkflowDTO.getEnvironmentCode())
.startParamList(backfillWorkflowDTO.getStartParamList())
.dryRun(backfillWorkflowDTO.getDryRun())
.backfillParams(dependentParams)
.build();

log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}",
downstreamCode, upstreamWorkflowCode, backfillTimeList);

// 4) Mark as visiting before recursive trigger to detect cycles, then trigger downstream backfill
visitedCodes.add(downstreamCode);
execute(dependentBackfillDTO);
}
}
}
Loading