DefaultJobProcessUpdateListener.java
@@ -17,6 +17,7 @@
package com.oceanbase.odc.service.task.listener;

import java.util.Collections;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -26,6 +27,7 @@
import com.oceanbase.odc.metadb.task.JobEntity;
import com.oceanbase.odc.service.schedule.ScheduleTaskService;
import com.oceanbase.odc.service.task.executor.TaskResult;
import com.oceanbase.odc.service.task.processor.result.ResultProcessor;
import com.oceanbase.odc.service.task.schedule.JobIdentity;
import com.oceanbase.odc.service.task.service.TaskFrameworkService;

@@ -44,12 +46,16 @@ public class DefaultJobProcessUpdateListener extends AbstractEventListener<Defau
private ScheduleTaskService scheduleTaskService;
@Autowired
private TaskFrameworkService stdTaskFrameworkService;
@Autowired
private List<ResultProcessor> resultProcessors;

@Override
public void onEvent(DefaultJobProcessUpdateEvent event) {
TaskResult taskResult = event.getTaskResult();
JobIdentity identity = taskResult.getJobIdentity();
JobEntity jobEntity = stdTaskFrameworkService.find(identity.getId());
// Call the job result processor when the result is updated
handleTaskResult(jobEntity.getJobType(), event.getTaskResult());
scheduleTaskService.findByJobId(jobEntity.getId())
.ifPresent(taskEntity -> {
if (taskEntity.getStatus() == TaskStatus.PREPARING) {
@@ -69,4 +75,12 @@ private void updateScheduleTaskStatus(Long id, TaskStatus status, TaskStatus pre
}
}

private void handleTaskResult(String jobType, TaskResult taskResult) {
for (ResultProcessor processor : resultProcessors) {
if (processor.interested(jobType)) {
processor.process(taskResult);
}
}
}

}
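
The listener above now fans every progress update out to each `ResultProcessor` bean whose `interested(jobType)` check matches. As a rough illustration, such a processor could look like the sketch below; only the `interested(String)` / `process(TaskResult)` calls and the import paths are taken from this diff, while the interface-versus-abstract-class question, the return types, the job-type constant, and the logging body are assumptions.

```java
import org.springframework.stereotype.Component;

import com.oceanbase.odc.service.task.executor.TaskResult;
import com.oceanbase.odc.service.task.processor.result.ResultProcessor;

import lombok.extern.slf4j.Slf4j;

/**
 * Hypothetical processor that reacts to progress updates of a single job type.
 * Only the interested/process contract is taken from the listener above; everything
 * else (job type, body, ResultProcessor being an interface) is illustrative.
 */
@Slf4j
@Component
public class SampleResultProcessor implements ResultProcessor {

    private static final String INTERESTED_JOB_TYPE = "SAMPLE_JOB"; // hypothetical job type

    @Override
    public boolean interested(String jobType) {
        // the listener passes JobEntity#getJobType() here
        return INTERESTED_JOB_TYPE.equals(jobType);
    }

    @Override
    public void process(TaskResult taskResult) {
        // invoked for every DefaultJobProcessUpdateEvent whose job type matches
        log.info("Result updated, progress={}", taskResult.getProgress());
    }
}
```

Because the processors are injected as a plain `List<ResultProcessor>`, registering another one only requires adding such a Spring bean; the listener itself does not change.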
DefaultJobTerminateListener.java
@@ -29,6 +29,9 @@
import com.oceanbase.odc.core.alarm.AlarmUtils;
import com.oceanbase.odc.core.shared.constant.TaskStatus;
import com.oceanbase.odc.metadb.task.JobEntity;
import com.oceanbase.odc.service.notification.Broker;
import com.oceanbase.odc.service.notification.NotificationProperties;
import com.oceanbase.odc.service.notification.helper.EventBuilder;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.ScheduleTaskService;
import com.oceanbase.odc.service.schedule.alarm.ScheduleAlarmUtils;
@@ -58,6 +61,12 @@ public class DefaultJobTerminateListener extends AbstractEventListener<JobTermin
private ScheduleService scheduleService;
@Autowired
private List<TerminateProcessor> terminateProcessors;
@Autowired
private NotificationProperties notificationProperties;
@Autowired
private Broker broker;
@Autowired
private EventBuilder eventBuilder;

@Override
public void onEvent(JobTerminateEvent event) {
@@ -80,6 +89,7 @@ public void onEvent(JobTerminateEvent event) {
if (event.getStatus() == JobStatus.EXEC_TIMEOUT) {
ScheduleAlarmUtils.timeout(scheduleTask.getId());
}
notify(scheduleTask);
// invoke task related processor
doProcessor(jobEntity, scheduleTask);
});
@@ -105,4 +115,16 @@ private void doProcessor(JobEntity jobEntity, ScheduleTask scheduleTask) {
}
}
}

private void notify(ScheduleTask task) {
if (!notificationProperties.isEnabled()) {
return;
}
try {
broker.enqueueEvent(task.getStatus() == TaskStatus.DONE ? eventBuilder.ofSucceededTask(task)
: eventBuilder.ofFailedTask(task));
} catch (Exception e) {
log.warn("Failed to enqueue event.", e);
}
}
}
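
The new `notify` path uses only the collaborator calls visible in the diff (`notificationProperties.isEnabled()`, `broker.enqueueEvent(...)`, `eventBuilder.ofSucceededTask(...)` / `ofFailedTask(...)`) and deliberately swallows failures so a broken notification channel cannot disturb the terminate flow. Below is a self-contained sketch of that gate-and-isolate pattern, written against stand-in types rather than the ODC notification classes.

```java
// Self-contained sketch of the pattern used by notify(): gate the call on a feature flag
// and never let a notification failure propagate into the terminate-listener flow.
// All types below are illustrative stand-ins, not the ODC notification classes.
public class NotificationGateSketch {

    interface Notifier {
        void enqueue(String event) throws Exception;
    }

    static void notifySafely(boolean notificationsEnabled, Notifier notifier, boolean succeeded) {
        if (!notificationsEnabled) {
            return; // feature flag off: skip silently, mirroring notificationProperties.isEnabled()
        }
        try {
            // pick the event by outcome, mirroring ofSucceededTask / ofFailedTask
            notifier.enqueue(succeeded ? "TASK_SUCCEEDED" : "TASK_FAILED");
        } catch (Exception e) {
            // swallow and log, mirroring log.warn("Failed to enqueue event.", e)
            System.err.println("Failed to enqueue event: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        notifySafely(true, event -> System.out.println("enqueued " + event), true);
    }
}
```

The broad `catch (Exception e)` plus warning log is the notable design choice: notification is best-effort, and updating schedule task state must not depend on it.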
JobTerminateNotifyListener.java
@@ -38,6 +38,7 @@
*/
@Slf4j
@Component("jobTerminateNotifyListener")
@Deprecated
public class JobTerminateNotifyListener extends AbstractEventListener<JobTerminateEvent> {

@Autowired
DLMTerminateProcessor.java
@@ -17,6 +17,7 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -52,13 +53,14 @@ public class DLMTerminateProcessor extends DLMProcessorMatcher implements Termin

public TaskStatus correctTaskStatus(ScheduleTask scheduleTask, TaskStatus currentStatus) {
// correct sub task status
List<DlmTableUnit> dlmTableUnits = dlmService.findByScheduleTaskId(scheduleTask.getId());
dlmTableUnits.forEach(dlmTableUnit -> {
if (!dlmTableUnit.getStatus().isTerminated()) {
dlmTableUnit.setStatus(TaskStatus.CANCELED);
}
});
dlmService.createOrUpdateDlmTableUnits(dlmTableUnits);
List<DlmTableUnit> dlmTableUnits = dlmService.findByScheduleTaskId(scheduleTask.getId()).stream()
.filter(o -> !o.getStatus().isTerminated()).peek(o -> o.setStatus(TaskStatus.CANCELED)).collect(
Collectors.toList());
if (!dlmTableUnits.isEmpty()) {
log.info("The DLM job has finished, but {} tables could not be completed and have been marked as CANCELED.",
dlmTableUnits.size());
dlmService.createOrUpdateDlmTableUnits(dlmTableUnits);
}
return dlmService.getFinalTaskStatus(scheduleTask.getId());
}

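The rewritten `correctTaskStatus` collapses the old forEach-then-save sequence into one stream pipeline: keep only the units that are not yet terminated, mark them `CANCELED`, and call `createOrUpdateDlmTableUnits` only when something actually changed. A minimal, self-contained sketch of the same filter/peek/collect shape on stand-in types (the real code uses `DlmTableUnit` and `TaskStatus`):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stand-in types; illustrates the pipeline shape only, not the ODC classes.
public class CancelPendingUnitsSketch {

    enum Status {
        RUNNING, DONE, CANCELED;

        boolean isTerminated() {
            return this == DONE || this == CANCELED;
        }
    }

    static class Unit {
        Status status;

        Unit(Status status) {
            this.status = status;
        }
    }

    public static void main(String[] args) {
        List<Unit> changed = Stream.of(new Unit(Status.RUNNING), new Unit(Status.DONE))
                // keep only units that have not reached a terminal status
                .filter(u -> !u.status.isTerminated())
                // mark them as canceled; peek is safe here because collect() visits every element
                .peek(u -> u.status = Status.CANCELED)
                .collect(Collectors.toList());
        if (!changed.isEmpty()) {
            // persist only when something was actually corrected, as the refactored method does
            System.out.println(changed.size() + " unit(s) marked CANCELED");
        }
    }
}
```

Mutating elements inside `peek` works here because the terminal `collect` visits every element, though a `map` step that sets the status and returns the element would make the side effect more explicit.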
StdTaskFrameworkService.java
@@ -80,7 +80,6 @@
import com.oceanbase.odc.service.task.executor.TaskResult;
import com.oceanbase.odc.service.task.listener.DefaultJobProcessUpdateEvent;
import com.oceanbase.odc.service.task.listener.JobTerminateEvent;
import com.oceanbase.odc.service.task.processor.result.ResultProcessor;
import com.oceanbase.odc.service.task.resource.AbstractK8sResourceOperatorBuilder;
import com.oceanbase.odc.service.task.schedule.JobDefinition;
import com.oceanbase.odc.service.task.schedule.JobIdentity;
@@ -138,8 +137,6 @@ public class StdTaskFrameworkService implements TaskFrameworkService {
private ExecutorEndpointManager executorEndpointManager;
// default impl
private JobStatusFsm jobStatusFsm = new JobStatusFsm();
@Autowired
private List<ResultProcessor> resultProcessors;

@Override
public JobEntity find(Long id) {
@@ -351,6 +348,11 @@ public void handleResult(TaskResult taskResult) {
log.warn("Job identity is not exists by id {}", taskResult.getJobIdentity().getId());
return;
}
TaskResult previous = JsonUtils.fromJson(je.getResultJson(), TaskResult.class);
if (taskResult.isProgressChanged(previous)) {
taskResultPublisherExecutor
.execute(() -> publisher.publishEvent(new DefaultJobProcessUpdateEvent(taskResult)));
}
// this may be a dangerous operation if the task reports too frequently
saveOrUpdateLogMetadata(taskResult, je.getId(), je.getStatus());
if (je.getStatus().isTerminated() || je.getStatus() == JobStatus.CANCELING) {
@@ -377,7 +379,7 @@ private void doRefreshResult(Long id) throws JobException {
}

String executorEndpoint = executorEndpointManager.getExecutorEndpoint(je);
TaskResult result = taskExecutorClient.getResult(executorEndpoint, JobIdentity.of(id));
TaskResult result = getTaskResult(executorEndpoint, je);
if (result.getStatus() == TaskStatus.PREPARING) {
log.info("Job is preparing, ignore refresh, jobId={}, currentStatus={}", id, result.getStatus());
return;
@@ -394,7 +396,6 @@ private void doRefreshResult(Long id) throws JobException {
return;
}
log.info("Progress changed, will update result, jobId={}, currentProgress={}", id, result.getProgress());
handleTaskResult(je.getJobType(), result);
saveOrUpdateLogMetadata(result, je.getId(), je.getStatus());

if (result.getStatus().isTerminated() && MapUtils.isEmpty(result.getLogMetadata())) {
@@ -412,8 +413,6 @@ protected void handleTaskResultInner(JobEntity jobEntity, TaskResult result) {
log.warn("Update task result failed, the job may finished or deleted already, jobId={}", jobEntity.getId());
return;
}
taskResultPublisherExecutor
.execute(() -> publisher.publishEvent(new DefaultJobProcessUpdateEvent(result)));

if (publisher != null && result.getStatus() != null && result.getStatus().isTerminated()) {
taskResultPublisherExecutor.execute(() -> publisher
@@ -460,7 +459,7 @@ public boolean refreshLogMetaForCancelJob(Long id) {
}
try {
String executorEndpoint = executorEndpointManager.getExecutorEndpoint(je);
TaskResult result = taskExecutorClient.getResult(executorEndpoint, JobIdentity.of(id));
TaskResult result = getTaskResult(executorEndpoint, je);

if (je.getRunMode().isK8s() && MapUtils.isEmpty(result.getLogMetadata())) {
log.info("Refresh log failed due to log have not uploaded, jobId={}, currentStatus={}", je.getId(),
@@ -471,7 +470,6 @@ public boolean refreshLogMetaForCancelJob(Long id) {
// TODO(tianke): move this logic to event listener
jobRepository.updateResultJson(JsonUtils.toJson(result), result.getJobIdentity().getId());
saveOrUpdateLogMetadata(result, je.getId(), je.getStatus());
handleTaskResult(je.getJobType(), result);
return true;
} catch (Exception exception) {
log.warn("Refresh log meta failed,errorMsg={}", exception.getMessage());
@@ -520,7 +518,6 @@ private int updateExecutorEndpoint(Long id, String executorEndpoint, JobEntity c

private int updateTaskResult(TaskResult taskResult, JobEntity currentJob, JobStatus expectedStatus) {
JobEntity jse = new JobEntity();
handleTaskResult(currentJob.getJobType(), taskResult);
jse.setResultJson(JsonUtils.toJson(taskResult));
jse.setStatus(expectedStatus);
jse.setProgressPercentage(taskResult.getProgress());
@@ -695,11 +692,15 @@ public Map<String, String> getJobAttributes(Long jobId) {
JobAttributeEntity::getAttributeValue));
}

private void handleTaskResult(String jobType, TaskResult taskResult) {
for (ResultProcessor processor : resultProcessors) {
if (processor.interested(jobType)) {
processor.process(taskResult);
}
// Fetch the task result from the executor (the task pod in K8s mode) and publish a
// process update event when the reported result has changed.
private TaskResult getTaskResult(String executorEndpoint, JobEntity je) throws JobException {
TaskResult result = taskExecutorClient.getResult(executorEndpoint, JobIdentity.of(je.getId()));
TaskResult previous = JsonUtils.fromJson(je.getResultJson(), TaskResult.class);
if (result.isProgressChanged(previous)) {
taskResultPublisherExecutor
.execute(() -> publisher.publishEvent(new DefaultJobProcessUpdateEvent(result)));
}
return result;
}

}
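
With the `handleTaskResult` calls removed from this service, `ResultProcessor`s are now driven solely through `DefaultJobProcessUpdateEvent`, and that event is published only when `TaskResult#isProgressChanged(previous)` reports a difference against the persisted `resultJson`. The body of `isProgressChanged` is not part of this diff; the stand-alone sketch below is only a guess at the kind of comparison that would produce the behaviour described above, with all types reduced to illustrative stand-ins.

```java
import java.util.Objects;

// Stand-alone sketch. TaskResult#isProgressChanged lives elsewhere in the codebase and its
// body is not shown in this diff; a comparison of roughly this shape would explain why the
// update event is only published when the fresh report differs from the persisted result.
public class ProgressChangeSketch {

    enum Status {
        PREPARING, RUNNING, DONE
    }

    static class Result {
        Status status;
        double progress;

        Result(Status status, double progress) {
            this.status = status;
            this.progress = progress;
        }

        boolean isProgressChanged(Result previous) {
            if (previous == null) {
                return true; // nothing persisted yet, treat the first report as a change
            }
            // assumed check: status or progress differs from the last persisted report
            return !Objects.equals(status, previous.status)
                    || Double.compare(progress, previous.progress) != 0;
        }
    }

    public static void main(String[] args) {
        Result previous = new Result(Status.RUNNING, 40.0);
        Result current = new Result(Status.RUNNING, 55.0);
        System.out.println(current.isProgressChanged(previous)); // true -> publish update event
    }
}
```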