diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobProcessUpdateListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobProcessUpdateListener.java index 59e9780f73..d041d22991 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobProcessUpdateListener.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobProcessUpdateListener.java @@ -17,6 +17,7 @@ package com.oceanbase.odc.service.task.listener; import java.util.Collections; +import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -26,6 +27,7 @@ import com.oceanbase.odc.metadb.task.JobEntity; import com.oceanbase.odc.service.schedule.ScheduleTaskService; import com.oceanbase.odc.service.task.executor.TaskResult; +import com.oceanbase.odc.service.task.processor.result.ResultProcessor; import com.oceanbase.odc.service.task.schedule.JobIdentity; import com.oceanbase.odc.service.task.service.TaskFrameworkService; @@ -44,12 +46,16 @@ public class DefaultJobProcessUpdateListener extends AbstractEventListener resultProcessors; @Override public void onEvent(DefaultJobProcessUpdateEvent event) { TaskResult taskResult = event.getTaskResult(); JobIdentity identity = taskResult.getJobIdentity(); JobEntity jobEntity = stdTaskFrameworkService.find(identity.getId()); + // Call the job result processor when the result is updated + handleTaskResult(jobEntity.getJobType(), event.getTaskResult()); scheduleTaskService.findByJobId(jobEntity.getId()) .ifPresent(taskEntity -> { if (taskEntity.getStatus() == TaskStatus.PREPARING) { @@ -69,4 +75,12 @@ private void updateScheduleTaskStatus(Long id, TaskStatus status, TaskStatus pre } } + private void handleTaskResult(String jobType, TaskResult taskResult) { + for (ResultProcessor processor : resultProcessors) { + if (processor.interested(jobType)) { + processor.process(taskResult); + } + } + } + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java index d4921b8c06..fcf56a5ece 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java @@ -29,6 +29,9 @@ import com.oceanbase.odc.core.alarm.AlarmUtils; import com.oceanbase.odc.core.shared.constant.TaskStatus; import com.oceanbase.odc.metadb.task.JobEntity; +import com.oceanbase.odc.service.notification.Broker; +import com.oceanbase.odc.service.notification.NotificationProperties; +import com.oceanbase.odc.service.notification.helper.EventBuilder; import com.oceanbase.odc.service.schedule.ScheduleService; import com.oceanbase.odc.service.schedule.ScheduleTaskService; import com.oceanbase.odc.service.schedule.alarm.ScheduleAlarmUtils; @@ -58,6 +61,12 @@ public class DefaultJobTerminateListener extends AbstractEventListener terminateProcessors; + @Autowired + private NotificationProperties notificationProperties; + @Autowired + private Broker broker; + @Autowired + private EventBuilder eventBuilder; @Override public void onEvent(JobTerminateEvent event) { @@ -80,6 +89,7 @@ public void onEvent(JobTerminateEvent event) { if (event.getStatus() == JobStatus.EXEC_TIMEOUT) { ScheduleAlarmUtils.timeout(scheduleTask.getId()); } + notify(scheduleTask); // invoke task related processor doProcessor(jobEntity, scheduleTask); }); @@ -105,4 +115,16 @@ private void doProcessor(JobEntity jobEntity, ScheduleTask scheduleTask) { } } } + + private void notify(ScheduleTask task) { + if (!notificationProperties.isEnabled()) { + return; + } + try { + broker.enqueueEvent(task.getStatus() == TaskStatus.DONE ? eventBuilder.ofSucceededTask(task) + : eventBuilder.ofFailedTask(task)); + } catch (Exception e) { + log.warn("Failed to enqueue event.", e); + } + } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateNotifyListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateNotifyListener.java index be4126cca9..2058e182e0 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateNotifyListener.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateNotifyListener.java @@ -38,6 +38,7 @@ */ @Slf4j @Component("jobTerminateNotifyListener") +@Deprecated public class JobTerminateNotifyListener extends AbstractEventListener { @Autowired diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/DLMTerminateProcessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/DLMTerminateProcessor.java index dc19b0ff0c..2b9be93e09 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/DLMTerminateProcessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/DLMTerminateProcessor.java @@ -17,6 +17,7 @@ import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -52,13 +53,14 @@ public class DLMTerminateProcessor extends DLMProcessorMatcher implements Termin public TaskStatus correctTaskStatus(ScheduleTask scheduleTask, TaskStatus currentStatus) { // correct sub task status - List dlmTableUnits = dlmService.findByScheduleTaskId(scheduleTask.getId()); - dlmTableUnits.forEach(dlmTableUnit -> { - if (!dlmTableUnit.getStatus().isTerminated()) { - dlmTableUnit.setStatus(TaskStatus.CANCELED); - } - }); - dlmService.createOrUpdateDlmTableUnits(dlmTableUnits); + List dlmTableUnits = dlmService.findByScheduleTaskId(scheduleTask.getId()).stream() + .filter(o -> !o.getStatus().isTerminated()).peek(o -> o.setStatus(TaskStatus.CANCELED)).collect( + Collectors.toList()); + if (!dlmTableUnits.isEmpty()) { + log.info("The DLM job has finished, but {} tables could not be completed and have been marked as CANCELED.", + dlmTableUnits.size()); + dlmService.createOrUpdateDlmTableUnits(dlmTableUnits); + } return dlmService.getFinalTaskStatus(scheduleTask.getId()); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/service/StdTaskFrameworkService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/service/StdTaskFrameworkService.java index 917c6d6ef5..3003fdab08 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/service/StdTaskFrameworkService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/service/StdTaskFrameworkService.java @@ -80,7 +80,6 @@ import com.oceanbase.odc.service.task.executor.TaskResult; import com.oceanbase.odc.service.task.listener.DefaultJobProcessUpdateEvent; import com.oceanbase.odc.service.task.listener.JobTerminateEvent; -import com.oceanbase.odc.service.task.processor.result.ResultProcessor; import com.oceanbase.odc.service.task.resource.AbstractK8sResourceOperatorBuilder; import com.oceanbase.odc.service.task.schedule.JobDefinition; import com.oceanbase.odc.service.task.schedule.JobIdentity; @@ -138,8 +137,6 @@ public class StdTaskFrameworkService implements TaskFrameworkService { private ExecutorEndpointManager executorEndpointManager; // default impl private JobStatusFsm jobStatusFsm = new JobStatusFsm(); - @Autowired - private List resultProcessors; @Override public JobEntity find(Long id) { @@ -351,6 +348,11 @@ public void handleResult(TaskResult taskResult) { log.warn("Job identity is not exists by id {}", taskResult.getJobIdentity().getId()); return; } + TaskResult previous = JsonUtils.fromJson(je.getResultJson(), TaskResult.class); + if (taskResult.isProgressChanged(previous)) { + taskResultPublisherExecutor + .execute(() -> publisher.publishEvent(new DefaultJobProcessUpdateEvent(taskResult))); + } // that's may be a dangerous operation if task report too frequent saveOrUpdateLogMetadata(taskResult, je.getId(), je.getStatus()); if (je.getStatus().isTerminated() || je.getStatus() == JobStatus.CANCELING) { @@ -377,7 +379,7 @@ private void doRefreshResult(Long id) throws JobException { } String executorEndpoint = executorEndpointManager.getExecutorEndpoint(je); - TaskResult result = taskExecutorClient.getResult(executorEndpoint, JobIdentity.of(id)); + TaskResult result = getTaskResult(executorEndpoint, je); if (result.getStatus() == TaskStatus.PREPARING) { log.info("Job is preparing, ignore refresh, jobId={}, currentStatus={}", id, result.getStatus()); return; @@ -394,7 +396,6 @@ private void doRefreshResult(Long id) throws JobException { return; } log.info("Progress changed, will update result, jobId={}, currentProgress={}", id, result.getProgress()); - handleTaskResult(je.getJobType(), result); saveOrUpdateLogMetadata(result, je.getId(), je.getStatus()); if (result.getStatus().isTerminated() && MapUtils.isEmpty(result.getLogMetadata())) { @@ -412,8 +413,6 @@ protected void handleTaskResultInner(JobEntity jobEntity, TaskResult result) { log.warn("Update task result failed, the job may finished or deleted already, jobId={}", jobEntity.getId()); return; } - taskResultPublisherExecutor - .execute(() -> publisher.publishEvent(new DefaultJobProcessUpdateEvent(result))); if (publisher != null && result.getStatus() != null && result.getStatus().isTerminated()) { taskResultPublisherExecutor.execute(() -> publisher @@ -460,7 +459,7 @@ public boolean refreshLogMetaForCancelJob(Long id) { } try { String executorEndpoint = executorEndpointManager.getExecutorEndpoint(je); - TaskResult result = taskExecutorClient.getResult(executorEndpoint, JobIdentity.of(id)); + TaskResult result = getTaskResult(executorEndpoint, je); if (je.getRunMode().isK8s() && MapUtils.isEmpty(result.getLogMetadata())) { log.info("Refresh log failed due to log have not uploaded, jobId={}, currentStatus={}", je.getId(), @@ -471,7 +470,6 @@ public boolean refreshLogMetaForCancelJob(Long id) { // TODO(tianke): move this logic to event listener jobRepository.updateResultJson(JsonUtils.toJson(result), result.getJobIdentity().getId()); saveOrUpdateLogMetadata(result, je.getId(), je.getStatus()); - handleTaskResult(je.getJobType(), result); return true; } catch (Exception exception) { log.warn("Refresh log meta failed,errorMsg={}", exception.getMessage()); @@ -520,7 +518,6 @@ private int updateExecutorEndpoint(Long id, String executorEndpoint, JobEntity c private int updateTaskResult(TaskResult taskResult, JobEntity currentJob, JobStatus expectedStatus) { JobEntity jse = new JobEntity(); - handleTaskResult(currentJob.getJobType(), taskResult); jse.setResultJson(JsonUtils.toJson(taskResult)); jse.setStatus(expectedStatus); jse.setProgressPercentage(taskResult.getProgress()); @@ -695,11 +692,15 @@ public Map getJobAttributes(Long jobId) { JobAttributeEntity::getAttributeValue)); } - private void handleTaskResult(String jobType, TaskResult taskResult) { - for (ResultProcessor processor : resultProcessors) { - if (processor.interested(jobType)) { - processor.process(taskResult); - } + // get task result from task pod when k8s mode and call task result processor if result is updated + private TaskResult getTaskResult(String executorEndpoint, JobEntity je) throws JobException { + TaskResult result = taskExecutorClient.getResult(executorEndpoint, JobIdentity.of(je.getId())); + TaskResult previous = JsonUtils.fromJson(je.getResultJson(), TaskResult.class); + if (result.isProgressChanged(previous)) { + taskResultPublisherExecutor + .execute(() -> publisher.publishEvent(new DefaultJobProcessUpdateEvent(result))); } + return result; } + }