From ad9d1f150d38ebc032e8ce390bbafc3f3312fbbb Mon Sep 17 00:00:00 2001 From: Jason Chenyang Ji Date: Mon, 24 Mar 2025 14:49:06 -0700 Subject: [PATCH] POC for in flight search requests in tasks API --- gradle/run.gradle | 2 +- .../tasks/get/TransportGetTaskAction.java | 1 + .../tasks/list/TransportListTasksAction.java | 4 +- .../action/search/SearchRequest.java | 11 ++- .../opensearch/action/search/SearchTask.java | 27 ++++++- .../main/java/org/opensearch/tasks/Task.java | 22 ++++- .../java/org/opensearch/tasks/TaskInfo.java | 80 ++++++++++++++++--- .../org/opensearch/tasks/TaskManager.java | 9 +++ 8 files changed, 136 insertions(+), 20 deletions(-) diff --git a/gradle/run.gradle b/gradle/run.gradle index 34651f1d94964..0b1385ab04696 100644 --- a/gradle/run.gradle +++ b/gradle/run.gradle @@ -31,7 +31,7 @@ import org.opensearch.gradle.testclusters.RunTask apply plugin: 'opensearch.testclusters' -def numNodes = findProperty('numNodes') as Integer ?: 1 +def numNodes = findProperty('numNodes') as Integer ?: 2 def numZones = findProperty('numZones') as Integer ?: 1 testClusters { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 6543310db5870..2ff64f3f02dc1 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -179,6 +179,7 @@ public void onFailure(Exception e) { }); } else { taskResourceTrackingService.refreshResourceStats(runningTask); + // POC: Always use detailed=true to include the search source TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true); listener.onResponse(new GetTaskResponse(new TaskResult(false, info))); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java index 1c543e60c46e0..5b74011bd71ee 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java @@ -100,7 +100,9 @@ protected ListTasksResponse newResponse( @Override protected void taskOperation(ListTasksRequest request, Task task, ActionListener listener) { - listener.onResponse(task.taskInfo(clusterService.localNode().getId(), request.getDetailed())); + // Always include detailed information if this is a search task to get the query source + boolean detailed = request.getDetailed() || task instanceof org.opensearch.action.search.SearchTask; + listener.onResponse(task.taskInfo(clusterService.localNode().getId(), detailed)); } @Override diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 4a4a309b45a2e..0475c18197544 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -715,7 +715,16 @@ public String pipeline() { @Override public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval); + return new SearchTask( + id, + type, + action, + this::buildDescription, + parentTaskId, + headers, + cancelAfterTimeInterval, + source + ); } public final String buildDescription() { diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java index 2a1a961e7607b..e0c82b1cf5bff 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java @@ -35,6 +35,7 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.tasks.TaskId; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.SearchBackpressureTask; import org.opensearch.wlm.QueryGroupTask; @@ -53,6 +54,7 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask // generating description in a lazy way since source can be quite big private final Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; + private final SearchSourceBuilder sourceBuilder; public SearchTask( long id, @@ -62,7 +64,7 @@ public SearchTask( TaskId parentTaskId, Map headers ) { - this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT); + this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, null); } public SearchTask( @@ -76,6 +78,22 @@ public SearchTask( ) { super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval); this.descriptionSupplier = descriptionSupplier; + this.sourceBuilder = null; + } + + public SearchTask( + long id, + String type, + String action, + Supplier descriptionSupplier, + TaskId parentTaskId, + Map headers, + TimeValue cancelAfterTimeInterval, + SearchSourceBuilder sourceBuilder + ) { + super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval); + this.descriptionSupplier = descriptionSupplier; + this.sourceBuilder = sourceBuilder; } @Override @@ -106,4 +124,11 @@ public final SearchProgressListener getProgressListener() { public boolean shouldCancelChildrenOnCancellation() { return true; } + + /** + * Returns the search source builder associated with this task, if any. + */ + public SearchSourceBuilder getSourceBuilder() { + return sourceBuilder; + } } diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index 0fa65bc16516f..01ad574eded94 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -33,6 +33,7 @@ package org.opensearch.tasks; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.action.NotifyOnceListener; @@ -178,9 +179,16 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS String description = null; Task.Status status = null; TaskResourceStats resourceStats = null; + Object searchSource = null; if (detailed) { description = getDescription(); status = getStatus(); + if (this instanceof SearchTask) { + SearchTask searchTask = (SearchTask) this; + if (searchTask.getSourceBuilder() != null) { + searchSource = searchTask.getSourceBuilder(); + } + } } if (excludeStats == false) { resourceStats = new TaskResourceStats(new HashMap<>() { @@ -192,20 +200,27 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS } }, getThreadUsage()); } - return taskInfo(localNodeId, description, status, resourceStats); + return taskInfo(localNodeId, description, status, resourceStats, searchSource); } /** * Build a {@link TaskInfo} for this task without resource stats. */ protected final TaskInfo taskInfo(String localNodeId, String description, Status status) { - return taskInfo(localNodeId, description, status, null); + return taskInfo(localNodeId, description, status, null, null); } /** * Build a proper {@link TaskInfo} for this task. */ protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) { + return taskInfo(localNodeId, description, status, resourceStats, null); + } + + /** + * Build a proper {@link TaskInfo} for this task. + */ + protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats, Object searchSource) { boolean cancelled = this instanceof CancellableTask && ((CancellableTask) this).isCancelled(); Long cancellationStartTime = null; if (cancelled) { @@ -224,7 +239,8 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status parentTask, headers, resourceStats, - cancellationStartTime + cancellationStartTime, + searchSource ); } diff --git a/server/src/main/java/org/opensearch/tasks/TaskInfo.java b/server/src/main/java/org/opensearch/tasks/TaskInfo.java index 3a04e8e4072b2..9bcd45e8ff40b 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/opensearch/tasks/TaskInfo.java @@ -97,6 +97,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment { private final TaskResourceStats resourceStats; + private final Object searchSource; + public TaskInfo( TaskId taskId, String type, @@ -124,6 +126,7 @@ public TaskInfo( parentTaskId, headers, resourceStats, + null, null ); } @@ -142,6 +145,40 @@ public TaskInfo( Map headers, TaskResourceStats resourceStats, Long cancellationStartTime + ) { + this( + taskId, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers, + resourceStats, + cancellationStartTime, + null + ); + } + + public TaskInfo( + TaskId taskId, + String type, + String action, + String description, + Task.Status status, + long startTime, + long runningTimeNanos, + boolean cancellable, + boolean cancelled, + TaskId parentTaskId, + Map headers, + TaskResourceStats resourceStats, + Long cancellationStartTime, + Object searchSource ) { if (cancellable == false && cancelled == true) { throw new IllegalArgumentException("task cannot be cancelled"); @@ -159,6 +196,7 @@ public TaskInfo( this.headers = headers; this.resourceStats = resourceStats; this.cancellationStartTime = cancellationStartTime; + this.searchSource = searchSource; } /** @@ -194,6 +232,8 @@ public TaskInfo(StreamInput in) throws IOException { } else { cancellationStartTime = null; } + // For now, searchSource is not serialized over the wire + searchSource = null; } @Override @@ -300,6 +340,13 @@ public TaskResourceStats getResourceStats() { return resourceStats; } + /** + * Returns the search source for this task if this is a search task + */ + public Object getSearchSource() { + return searchSource; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("node", taskId.getNodeId()); @@ -335,6 +382,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (cancellationStartTime != null) { builder.humanReadableField("cancellation_time_millis", "cancellation_time", new TimeValue(cancellationStartTime)); } + // if (searchSource != null && params.paramAsBoolean("detailed", false)) { + if (searchSource != null) { + builder.field("search_source", searchSource); + } return builder; } @@ -367,18 +418,19 @@ public static TaskInfo fromXContent(XContentParser parser) { TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString); return new TaskInfo( id, - type, - action, - description, - status, - startTime, - runningTimeNanos, - cancellable, - cancelled, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + cancelled, parentTaskId, - headers, - resourceStats, - cancellationStartTime + headers, + resourceStats, + cancellationStartTime, + null // searchSource not parsed from XContent yet ); }); static { @@ -424,7 +476,8 @@ public boolean equals(Object obj) { && Objects.equals(status, other.status) && Objects.equals(headers, other.headers) && Objects.equals(resourceStats, other.resourceStats) - && Objects.equals(cancellationStartTime, other.cancellationStartTime); + && Objects.equals(cancellationStartTime, other.cancellationStartTime) + && Objects.equals(searchSource, other.searchSource); } @Override @@ -442,7 +495,8 @@ public int hashCode() { status, headers, resourceStats, - cancellationStartTime + cancellationStartTime, + searchSource ); } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskManager.java b/server/src/main/java/org/opensearch/tasks/TaskManager.java index 6ad06da9d2fa2..dbce47cc332ad 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskManager.java +++ b/server/src/main/java/org/opensearch/tasks/TaskManager.java @@ -38,6 +38,8 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.node.DiscoveryNode; @@ -320,6 +322,13 @@ public Task unregister(Task task) { } } + try { + if (task instanceof SearchTask || task instanceof SearchShardTask) { + Thread.sleep(100000); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } if (task instanceof CancellableTask) { CancellableTaskHolder holder = cancellableTasks.remove(task.getId()); if (holder != null) {