Skip to content

Commit d5e112a

Browse files
Add interop between Guava's ListenableFuture and Parseq Task (#306)
Co-authored-by: Karthik Ramgopal <kramgopa@linkedin.com>
1 parent 9aaef28 commit d5e112a

File tree

7 files changed

+253
-2
lines changed

7 files changed

+253
-2
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
v5.1.4
2+
------
3+
* Add interop between Guava's ListenableFuture and Parseq Task
4+
15
v5.1.3
26
------
37
* Fix for multiple javadoc warnings coming from Task.java

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
version=5.1.3
1+
version=5.1.4
22
group=com.linkedin.parseq
33
org.gradle.parallel=true

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ def modules = [ /* the name of the modules to use */
77
'parseq-benchmark',
88
'parseq-examples',
99
'parseq-exec',
10+
'parseq-guava-interop',
1011
'parseq-http-client',
1112
'parseq-lambda-names', // shadow jar
1213
'parseq-legacy-examples',
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
ext {
2+
description = """Interop with Guava's ListenableFuture"""
3+
}
4+
5+
dependencies {
6+
compile project(":parseq")
7+
compile "com.google.guava:guava:30.1.1-jre"
8+
9+
testCompile "org.testng:testng:6.9.9"
10+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package com.linkedin.parseq.guava;
2+
3+
import com.google.common.annotations.VisibleForTesting;
4+
import com.google.common.util.concurrent.AbstractFuture;
5+
import com.google.common.util.concurrent.ListenableFuture;
6+
import com.google.common.util.concurrent.MoreExecutors;
7+
import com.linkedin.parseq.BaseTask;
8+
import com.linkedin.parseq.Context;
9+
import com.linkedin.parseq.Task;
10+
import com.linkedin.parseq.promise.Promise;
11+
import com.linkedin.parseq.promise.SettablePromise;
12+
import java.util.concurrent.CancellationException;
13+
import java.util.concurrent.ExecutionException;
14+
15+
16+
/**
17+
* Utility methods to convert between Parseq {@link Task} and Guava's {@link ListenableFuture}.
18+
*/
19+
public class ListenableFutureUtil {
20+
21+
private ListenableFutureUtil() {
22+
// Prevent instantiation.
23+
}
24+
25+
public static <T> Task<T> fromListenableFuture(ListenableFuture<T> future) {
26+
// Setup cancellation propagation from Task -> ListenableFuture.
27+
final SettableTask<T> task =
28+
new SettableTask<T>("fromListenableFuture: " + Task._taskDescriptor.getDescription(future.getClass().getName())) {
29+
@Override
30+
public boolean cancel(Exception rootReason) {
31+
if (future.isCancelled()) {
32+
return super.cancel(rootReason);
33+
}
34+
35+
return super.cancel(rootReason) && future.cancel(true);
36+
}
37+
};
38+
39+
final SettablePromise<T> promise = task.getSettableDelegate();
40+
41+
// Setup forward event propagation ListenableFuture -> Task.
42+
Runnable callbackRunnable = () -> {
43+
try {
44+
final T value = future.get();
45+
promise.done(value);
46+
} catch (CancellationException ex) {
47+
task.cancel(ex);
48+
} catch (ExecutionException ex) {
49+
promise.fail(ex.getCause());
50+
} catch (Exception | Error ex) {
51+
promise.fail(ex);
52+
}
53+
};
54+
future.addListener(callbackRunnable, MoreExecutors.directExecutor());
55+
56+
return task;
57+
}
58+
59+
public static <T> ListenableFuture<T> toListenableFuture(Task<T> task) {
60+
// Setup cancellation propagation from ListenableFuture -> Task.
61+
SettableFuture<T> listenableFuture = new SettableFuture<T>() {
62+
@Override
63+
public boolean cancel(boolean mayInterruptIfRunning) {
64+
return super.cancel(mayInterruptIfRunning) && task.cancel(new CancellationException());
65+
}
66+
67+
@Override
68+
public boolean setException(Throwable ex) {
69+
if (!task.isDone() && ex instanceof CancellationException) {
70+
task.cancel((CancellationException) ex);
71+
}
72+
return super.setException(ex);
73+
}
74+
};
75+
76+
// Setup forward event propagation Task -> ListenableFuture.
77+
task.addListener(promise -> {
78+
if (!promise.isFailed()) {
79+
listenableFuture.set(promise.get());
80+
}
81+
else {
82+
if (promise.getError() instanceof com.linkedin.parseq.CancellationException) {
83+
listenableFuture.cancel(true);
84+
} else {
85+
listenableFuture.setException(promise.getError());
86+
}
87+
}
88+
});
89+
90+
return listenableFuture;
91+
}
92+
93+
/**
94+
* A private helper class to assist toListenableFuture(), by overriding some methods to make them public.
95+
*
96+
* @param <T> The Settable future's type.
97+
*/
98+
@VisibleForTesting
99+
static class SettableFuture<T> extends AbstractFuture<T> {
100+
@Override
101+
public boolean set(T value) {
102+
return super.set(value);
103+
}
104+
105+
@Override
106+
public boolean setException(Throwable throwable) {
107+
return super.setException(throwable);
108+
}
109+
}
110+
111+
/**
112+
* A private helper class to assist fromListenableFuture(), by overriding some methods to make them public.
113+
*
114+
* @param <T> The Settable task's type.
115+
*/
116+
@VisibleForTesting
117+
static class SettableTask<T> extends BaseTask<T> {
118+
119+
public SettableTask(String name) {
120+
super(name);
121+
}
122+
123+
@Override
124+
protected Promise<? extends T> run(Context context) throws Throwable {
125+
return getDelegate();
126+
}
127+
128+
@Override
129+
public SettablePromise<T> getSettableDelegate() {
130+
return super.getSettableDelegate();
131+
}
132+
}
133+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package com.linkedin.parseq.guava;
2+
3+
import com.google.common.util.concurrent.ListenableFuture;
4+
import com.linkedin.parseq.Task;
5+
import java.util.concurrent.CancellationException;
6+
import java.util.concurrent.ExecutionException;
7+
import org.testng.Assert;
8+
import org.testng.annotations.Test;
9+
10+
11+
/**
12+
* Unit tests for {@link ListenableFutureUtil}
13+
*/
14+
public class ListenableFutureUtilTest {
15+
16+
@Test
17+
public void testFromListenableFuture() {
18+
ListenableFutureUtil.SettableFuture<String> listenableFuture = new ListenableFutureUtil.SettableFuture<>();
19+
Task<String> task = ListenableFutureUtil.fromListenableFuture(listenableFuture);
20+
21+
// Test cancel propagation from Task to ListenableFuture
22+
task.cancel(new RuntimeException());
23+
Assert.assertTrue(listenableFuture.isCancelled());
24+
25+
listenableFuture = new ListenableFutureUtil.SettableFuture<>();
26+
task = ListenableFutureUtil.fromListenableFuture(listenableFuture);
27+
28+
// Test successful completion of ListenableFuture.
29+
listenableFuture.set("COMPLETED");
30+
Assert.assertTrue(task.isDone());
31+
Assert.assertFalse(task.isFailed());
32+
Assert.assertEquals(task.get(), "COMPLETED");
33+
34+
listenableFuture = new ListenableFutureUtil.SettableFuture<>();
35+
task = ListenableFutureUtil.fromListenableFuture(listenableFuture);
36+
37+
// Test exceptional completion of ListenableFuture.
38+
listenableFuture.setException(new RuntimeException("Test"));
39+
Assert.assertTrue(task.isDone());
40+
Assert.assertTrue(task.isFailed());
41+
Assert.assertEquals(task.getError().getClass(), RuntimeException.class);
42+
Assert.assertEquals(task.getError().getMessage(), "Test");
43+
44+
listenableFuture = new ListenableFutureUtil.SettableFuture<>();
45+
task = ListenableFutureUtil.fromListenableFuture(listenableFuture);
46+
47+
// Test cancellation of ListenableFuture.
48+
listenableFuture.cancel(true);
49+
Assert.assertTrue(task.isDone());
50+
Assert.assertTrue(task.isFailed());
51+
Assert.assertEquals(task.getError().getCause().getClass(), CancellationException.class);
52+
}
53+
54+
@Test
55+
public void testToListenableFuture() throws Exception {
56+
ListenableFutureUtil.SettableTask<String> task = new ListenableFutureUtil.SettableTask<>("test");
57+
ListenableFuture<String> future = ListenableFutureUtil.toListenableFuture(task);
58+
59+
// Test cancel propagation from ListenableFuture to task
60+
future.cancel(true);
61+
Assert.assertTrue(task.isDone());
62+
Assert.assertTrue(task.isFailed());
63+
Assert.assertEquals(task.getError().getCause().getClass(), CancellationException.class);
64+
65+
task = new ListenableFutureUtil.SettableTask<>("test");
66+
future = ListenableFutureUtil.toListenableFuture(task);
67+
68+
// Test successful completion of task.
69+
task.getSettableDelegate().done("COMPLETED");
70+
Assert.assertTrue(future.isDone());
71+
Assert.assertEquals(future.get(), "COMPLETED");
72+
73+
task = new ListenableFutureUtil.SettableTask<>("test");
74+
future = ListenableFutureUtil.toListenableFuture(task);
75+
76+
// Test exceptional completion of task.
77+
task.getSettableDelegate().fail(new RuntimeException("Test"));
78+
Assert.assertTrue(future.isDone());
79+
try {
80+
future.get();
81+
Assert.fail("ExecutionException not thrown");
82+
} catch (ExecutionException e) {
83+
Assert.assertEquals(e.getCause().getClass(), RuntimeException.class);
84+
Assert.assertEquals(e.getCause().getMessage(), "Test");
85+
}
86+
87+
task = new ListenableFutureUtil.SettableTask<>("test");
88+
future = ListenableFutureUtil.toListenableFuture(task);
89+
90+
// Test cancellation of task.
91+
task.cancel(new RuntimeException("Cancelled"));
92+
Assert.assertTrue(future.isDone());
93+
Assert.assertTrue(future.isCancelled());
94+
try {
95+
future.get();
96+
Assert.fail("Cancellation Exception not thrown");
97+
} catch (CancellationException e) {
98+
// Ignored since we expected a cancellation exception!
99+
} catch (Throwable e) {
100+
Assert.fail("Unexpected Exception thrown", e);
101+
}
102+
}
103+
}

subprojects/parseq/src/main/java/com/linkedin/parseq/CancellationException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.linkedin.parseq;
22

3-
class CancellationException extends Exception {
3+
public class CancellationException extends Exception {
44

55
private static final long serialVersionUID = 1L;
66

0 commit comments

Comments
 (0)