Skip to content

Commit 0d269a1

Browse files
committed
4.x: More Streamable operators
1 parent d9d985c commit 0d269a1

7 files changed

Lines changed: 346 additions & 85 deletions

File tree

src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import io.reactivex.rxjava4.annotations.NonNull;
2323
import io.reactivex.rxjava4.disposables.*;
24+
import io.reactivex.rxjava4.internal.util.AwaitCoordinatorStatic;
2425
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
2526

2627
/**
@@ -84,7 +85,7 @@ public CompletionStageDisposable(@NonNull CompletionStage<T> stage, @NonNull Dis
8485
*/
8586
public void await() {
8687
state.lazySet(true);;
87-
Streamer.await(stage);
88+
AwaitCoordinatorStatic.await(stage);
8889
}
8990

9091
/**
@@ -93,7 +94,7 @@ public void await() {
9394
*/
9495
public void await(DisposableContainer canceller) {
9596
state.lazySet(true);;
96-
Streamer.await(stage, canceller);
97+
AwaitCoordinatorStatic.await(stage, canceller);
9798
}
9899

99100
/**

src/main/java/io/reactivex/rxjava4/core/Streamable.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@
1414
package io.reactivex.rxjava4.core;
1515

1616
import java.lang.reflect.InvocationTargetException;
17-
import java.util.Objects;
17+
import java.util.*;
1818
import java.util.concurrent.*;
19+
import java.util.concurrent.atomic.AtomicInteger;
1920

2021
import io.reactivex.rxjava4.annotations.*;
2122
import io.reactivex.rxjava4.disposables.*;
2223
import io.reactivex.rxjava4.exceptions.Exceptions;
2324
import io.reactivex.rxjava4.functions.*;
2425
import io.reactivex.rxjava4.internal.operators.streamable.*;
26+
import io.reactivex.rxjava4.internal.util.AwaitCoordinatorStatic;
2527
import io.reactivex.rxjava4.schedulers.Schedulers;
2628
import io.reactivex.rxjava4.subscribers.TestSubscriber;
2729

@@ -170,6 +172,52 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
170172
.toStreamable();
171173
}
172174

175+
/**
176+
* Generates a sequence in order which the stages complete in any form.
177+
* @param <T> the common element type
178+
* @param stages the iterable of stages to be relayed in the order they complete
179+
* @param executor the executor to run the blocking operator
180+
* @return the new Streamable instance
181+
*/
182+
@SuppressWarnings("unchecked")
183+
@NonNull
184+
static <@NonNull T> Streamable<CompletionStage<T>> fromStages(@NonNull Iterable<? extends CompletionStage<? extends T>> stages, ExecutorService executor) {
185+
return create(emitter -> {
186+
var list = new ArrayList<CompletionStage<? extends T>>();
187+
for(var stage : stages) {
188+
list.add(stage);
189+
}
190+
while (list.size() != 0) {
191+
var winner = AwaitCoordinatorStatic.awaitFirstIndex(list, emitter.canceller());
192+
emitter.emit((CompletionStage<T>)list.remove(winner));
193+
}
194+
}, executor);
195+
}
196+
197+
/**
198+
* Emits the elements of each inner sequence produced by the outher sequence.
199+
* @param <T> the common element type
200+
* @param sources the streamable of inner streamables
201+
* @param exec the executorservice where to run the virtual wait
202+
* @return the new Streamable instance.
203+
*/
204+
static <@NonNull T> Streamable<T> concat(Streamable<? extends Streamable<? extends T>> sources, ExecutorService exec) {
205+
return create(emitter -> {
206+
var counter = new AtomicInteger();
207+
try (var mainSource = sources.forEach(item -> {
208+
System.out.println(counter.incrementAndGet());
209+
try (var innerSource = item.forEach(inner -> {
210+
System.out.println("> " + inner);
211+
emitter.emit(inner);
212+
}, emitter.canceller(), exec)) {
213+
innerSource.await(emitter.canceller());
214+
}
215+
}, emitter.canceller(), exec)) {
216+
mainSource.await(emitter.canceller());
217+
};
218+
}, exec);
219+
}
220+
173221
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
174222
// Operators
175223
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

src/main/java/io/reactivex/rxjava4/core/Streamer.java

Lines changed: 4 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
package io.reactivex.rxjava4.core;
1515

1616
import java.util.*;
17-
import java.util.concurrent.*;
18-
import java.util.function.Function;
17+
import java.util.concurrent.CompletionStage;
1918

20-
import io.reactivex.rxjava4.annotations.*;
19+
import io.reactivex.rxjava4.annotations.NonNull;
2120
import io.reactivex.rxjava4.disposables.*;
21+
import io.reactivex.rxjava4.internal.util.AwaitCoordinator;
2222

2323
/**
2424
* A realized stream which can then be consumed asynchronously in steps.
@@ -31,7 +31,7 @@
3131
* TODO proper docs
3232
* @since 4.0.0
3333
*/
34-
public interface Streamer<@NonNull T> extends AutoCloseable {
34+
public interface Streamer<@NonNull T> extends AutoCloseable, AwaitCoordinator {
3535

3636
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
3737
// API
@@ -195,79 +195,4 @@ default void awaitFinish() {
195195
default void awaitFinish(@NonNull DisposableContainer cancellation) {
196196
await(finish(cancellation), cancellation);
197197
}
198-
199-
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
200-
// ASYNC/AWAIT "Language" keyword implementations
201-
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
202-
203-
/**
204-
* The {@code await} keyword for async/await.
205-
* @param <T> the type of the returned value if any.
206-
* @param stage the stage to await virtual-blockingly
207-
* @return the awaited value
208-
*/
209-
@Nullable
210-
static <T> T await(@NonNull CompletionStage<T> stage) {
211-
return await(stage, null);
212-
}
213-
214-
/**
215-
* The cancellable {@code await} keyword for async/await.
216-
* @param <T> the type of the returned value if any.
217-
* @param stage the stage to await virtual-blockingly
218-
* @param canceller the container that can trigger a cancellation on demand
219-
* @return the awaited value
220-
*/
221-
@Nullable
222-
static <T> T await(@NonNull CompletionStage<T> stage, @Nullable DisposableContainer canceller) {
223-
var f = stage.toCompletableFuture();
224-
if (canceller == null) {
225-
return f.join();
226-
}
227-
var d = Disposable.fromFuture(f, true);
228-
try (var _ = canceller.subscribe(d)) {
229-
return f.join();
230-
}
231-
}
232-
233-
/**
234-
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
235-
* @param <U> the return type of the function
236-
* @param function the function to apply
237-
* @param canceller the canceller to use
238-
* @param executor the executor to use
239-
* @return the new stage
240-
*/
241-
static <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
242-
DisposableContainer canceller, Executor executor) {
243-
var loopback = new SerialDisposable();
244-
canceller.add(loopback);
245-
246-
// new Exception().printStackTrace();
247-
248-
var f = CompletableFuture.supplyAsync(() -> {
249-
try {
250-
return function.apply(canceller);
251-
} finally {
252-
canceller.delete(loopback);
253-
}
254-
}, executor);
255-
256-
var d = Disposable.fromFuture(f, true);
257-
loopback.replace(d);
258-
259-
return f;
260-
}
261-
262-
/**
263-
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
264-
* @param <U> the return type of the function
265-
* @param function the function to apply
266-
* @param canceller the canceller to use
267-
* @return the new stage
268-
*/
269-
static <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
270-
DisposableContainer canceller) {
271-
return runStage(function, canceller, Executors.newVirtualThreadPerTaskExecutor());
272-
}
273198
}

src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.rxjava4.disposables.DisposableContainer;
2323
import io.reactivex.rxjava4.internal.fuseable.HasUpstreamPublisher;
2424
import io.reactivex.rxjava4.internal.subscriptions.SubscriptionHelper;
25-
import io.reactivex.rxjava4.internal.util.ExceptionHelper;
25+
import io.reactivex.rxjava4.internal.util.*;
2626
import io.reactivex.rxjava4.internal.virtual.VirtualResumable;
2727

2828
public record StreamableFromPublisher<T>(@NonNull Publisher<T> source,
@@ -89,7 +89,7 @@ public void onComplete() {
8989
@Override
9090
public @NonNull CompletionStage<Boolean> next(@NonNull DisposableContainer canceller) {
9191
// System.out.println("next()");
92-
return Streamer.runStage(_ -> {
92+
return AwaitCoordinatorStatic.runStage(_ -> {
9393
item.lazySet(null);
9494
// System.out.println("Requesting the next item");
9595
SubscriptionHelper.deferredRequest(upstream, requester, 1);
@@ -143,7 +143,7 @@ public void onComplete() {
143143
@Override
144144
public @NonNull CompletionStage<Void> finish(@NonNull DisposableContainer cancellation) {
145145
// new Exception("StreamableFromPublisher::finish").printStackTrace();
146-
return Streamer.runStage(_ -> {
146+
return AwaitCoordinatorStatic.runStage(_ -> {
147147
SubscriptionHelper.cancel(upstream);
148148
return null;
149149
}, cancellation, executor);
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.util;
15+
16+
import java.util.concurrent.*;
17+
import java.util.function.Function;
18+
19+
import io.reactivex.rxjava4.annotations.*;
20+
import io.reactivex.rxjava4.disposables.DisposableContainer;
21+
22+
/**
23+
* Static methods to coordinate {@link CompletionStage}s for various operators.
24+
*/
25+
public interface AwaitCoordinator {
26+
27+
/**
28+
* The {@code await} keyword for async/await.
29+
* @param <T> the type of the returned value if any.
30+
* @param stage the stage to await virtual-blockingly
31+
* @return the awaited value
32+
*/
33+
@Nullable
34+
default <T> T await(@NonNull CompletionStage<T> stage) {
35+
return AwaitCoordinatorStatic.await(stage, null);
36+
}
37+
38+
/**
39+
* The cancellable {@code await} keyword for async/await.
40+
* @param <T> the type of the returned value if any.
41+
* @param stage the stage to await virtual-blockingly
42+
* @param canceller the container that can trigger a cancellation on demand
43+
* @return the awaited value
44+
*/
45+
@Nullable
46+
default <T> T await(@NonNull CompletionStage<T> stage, @Nullable DisposableContainer canceller) {
47+
return AwaitCoordinatorStatic.await(stage, canceller);
48+
}
49+
50+
/**
51+
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
52+
* @param <U> the return type of the function
53+
* @param function the function to apply
54+
* @param canceller the canceller to use
55+
* @param executor the executor to use
56+
* @return the new stage
57+
*/
58+
default <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
59+
DisposableContainer canceller, Executor executor) {
60+
return AwaitCoordinatorStatic.<U>runStage(function, canceller, executor);
61+
}
62+
63+
/**
64+
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
65+
* @param <U> the return type of the function
66+
* @param function the function to apply
67+
* @param canceller the canceller to use
68+
* @return the new stage
69+
*/
70+
default <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
71+
DisposableContainer canceller) {
72+
return runStage(function, canceller, Executors.newVirtualThreadPerTaskExecutor());
73+
}
74+
}

0 commit comments

Comments
 (0)