Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.opensearch.gradle.testclusters.RunTask

apply plugin: 'opensearch.testclusters'

def numNodes = findProperty('numNodes') as Integer ?: 1
def numNodes = findProperty('numNodes') as Integer ?: 2
def numZones = findProperty('numZones') as Integer ?: 1

testClusters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public void onFailure(Exception e) {
});
} else {
taskResourceTrackingService.refreshResourceStats(runningTask);
// POC: Always use detailed=true to include the search source
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
listener.onResponse(new GetTaskResponse(new TaskResult(false, info)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ protected ListTasksResponse newResponse(

@Override
protected void taskOperation(ListTasksRequest request, Task task, ActionListener<TaskInfo> listener) {
listener.onResponse(task.taskInfo(clusterService.localNode().getId(), request.getDetailed()));
// Always include detailed information if this is a search task to get the query source
boolean detailed = request.getDetailed() || task instanceof org.opensearch.action.search.SearchTask;
listener.onResponse(task.taskInfo(clusterService.localNode().getId(), detailed));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,16 @@ public String pipeline() {

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
return new SearchTask(
id,
type,
action,
this::buildDescription,
parentTaskId,
headers,
cancelAfterTimeInterval,
source
);
}

public final String buildDescription() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.SearchBackpressureTask;
import org.opensearch.wlm.QueryGroupTask;

Expand All @@ -53,6 +54,7 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
private final SearchSourceBuilder sourceBuilder;

public SearchTask(
long id,
Expand All @@ -62,7 +64,7 @@ public SearchTask(
TaskId parentTaskId,
Map<String, String> headers
) {
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT);
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, null);
}

public SearchTask(
Expand All @@ -76,6 +78,22 @@ public SearchTask(
) {
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
this.descriptionSupplier = descriptionSupplier;
this.sourceBuilder = null;
}

public SearchTask(
long id,
String type,
String action,
Supplier<String> descriptionSupplier,
TaskId parentTaskId,
Map<String, String> headers,
TimeValue cancelAfterTimeInterval,
SearchSourceBuilder sourceBuilder
) {
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
this.descriptionSupplier = descriptionSupplier;
this.sourceBuilder = sourceBuilder;
}

@Override
Expand Down Expand Up @@ -106,4 +124,11 @@ public final SearchProgressListener getProgressListener() {
public boolean shouldCancelChildrenOnCancellation() {
return true;
}

/**
* Returns the search source builder associated with this task, if any.
*/
public SearchSourceBuilder getSourceBuilder() {
return sourceBuilder;
}
}
22 changes: 19 additions & 3 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.tasks;

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.search.SearchTask;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.action.NotifyOnceListener;
Expand Down Expand Up @@ -178,9 +179,16 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS
String description = null;
Task.Status status = null;
TaskResourceStats resourceStats = null;
Object searchSource = null;
if (detailed) {
description = getDescription();
status = getStatus();
if (this instanceof SearchTask) {
SearchTask searchTask = (SearchTask) this;
if (searchTask.getSourceBuilder() != null) {
searchSource = searchTask.getSourceBuilder();
}
}
}
if (excludeStats == false) {
resourceStats = new TaskResourceStats(new HashMap<>() {
Expand All @@ -192,20 +200,27 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS
}
}, getThreadUsage());
}
return taskInfo(localNodeId, description, status, resourceStats);
return taskInfo(localNodeId, description, status, resourceStats, searchSource);
}

/**
* Build a {@link TaskInfo} for this task without resource stats.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status) {
return taskInfo(localNodeId, description, status, null);
return taskInfo(localNodeId, description, status, null, null);
}

/**
* Build a proper {@link TaskInfo} for this task.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) {
return taskInfo(localNodeId, description, status, resourceStats, null);
}

/**
* Build a proper {@link TaskInfo} for this task.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats, Object searchSource) {
boolean cancelled = this instanceof CancellableTask && ((CancellableTask) this).isCancelled();
Long cancellationStartTime = null;
if (cancelled) {
Expand All @@ -224,7 +239,8 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
parentTask,
headers,
resourceStats,
cancellationStartTime
cancellationStartTime,
searchSource
);
}

Expand Down
80 changes: 67 additions & 13 deletions server/src/main/java/org/opensearch/tasks/TaskInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {

private final TaskResourceStats resourceStats;

private final Object searchSource;

public TaskInfo(
TaskId taskId,
String type,
Expand Down Expand Up @@ -124,6 +126,7 @@ public TaskInfo(
parentTaskId,
headers,
resourceStats,
null,
null
);
}
Expand All @@ -142,6 +145,40 @@ public TaskInfo(
Map<String, String> headers,
TaskResourceStats resourceStats,
Long cancellationStartTime
) {
this(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers,
resourceStats,
cancellationStartTime,
null
);
}

public TaskInfo(
TaskId taskId,
String type,
String action,
String description,
Task.Status status,
long startTime,
long runningTimeNanos,
boolean cancellable,
boolean cancelled,
TaskId parentTaskId,
Map<String, String> headers,
TaskResourceStats resourceStats,
Long cancellationStartTime,
Object searchSource
) {
if (cancellable == false && cancelled == true) {
throw new IllegalArgumentException("task cannot be cancelled");
Expand All @@ -159,6 +196,7 @@ public TaskInfo(
this.headers = headers;
this.resourceStats = resourceStats;
this.cancellationStartTime = cancellationStartTime;
this.searchSource = searchSource;
}

/**
Expand Down Expand Up @@ -194,6 +232,8 @@ public TaskInfo(StreamInput in) throws IOException {
} else {
cancellationStartTime = null;
}
// For now, searchSource is not serialized over the wire
searchSource = null;
}

@Override
Expand Down Expand Up @@ -300,6 +340,13 @@ public TaskResourceStats getResourceStats() {
return resourceStats;
}

/**
* Returns the search source for this task if this is a search task
*/
public Object getSearchSource() {
return searchSource;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node", taskId.getNodeId());
Expand Down Expand Up @@ -335,6 +382,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (cancellationStartTime != null) {
builder.humanReadableField("cancellation_time_millis", "cancellation_time", new TimeValue(cancellationStartTime));
}
// if (searchSource != null && params.paramAsBoolean("detailed", false)) {
if (searchSource != null) {
builder.field("search_source", searchSource);
}
return builder;
}

Expand Down Expand Up @@ -367,18 +418,19 @@ public static TaskInfo fromXContent(XContentParser parser) {
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
return new TaskInfo(
id,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers,
resourceStats,
cancellationStartTime
headers,
resourceStats,
cancellationStartTime,
null // searchSource not parsed from XContent yet
);
});
static {
Expand Down Expand Up @@ -424,7 +476,8 @@ public boolean equals(Object obj) {
&& Objects.equals(status, other.status)
&& Objects.equals(headers, other.headers)
&& Objects.equals(resourceStats, other.resourceStats)
&& Objects.equals(cancellationStartTime, other.cancellationStartTime);
&& Objects.equals(cancellationStartTime, other.cancellationStartTime)
&& Objects.equals(searchSource, other.searchSource);
}

@Override
Expand All @@ -442,7 +495,8 @@ public int hashCode() {
status,
headers,
resourceStats,
cancellationStartTime
cancellationStartTime,
searchSource
);
}
}
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchTask;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -320,6 +322,13 @@ public Task unregister(Task task) {
}
}

try {
if (task instanceof SearchTask || task instanceof SearchShardTask) {
Thread.sleep(100000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (task instanceof CancellableTask) {
CancellableTaskHolder holder = cancellableTasks.remove(task.getId());
if (holder != null) {
Expand Down