From 531b389da8424aa2c594e3d2e186221fbb914d08 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 19 Mar 2026 12:50:35 +0100 Subject: [PATCH 1/3] 4.x: Introduce FlatMapConfig & remove some overload bloat Also found a Checkstyle plugin bug: https://github.com/checkstyle/checkstyle/issues/19342 --- config/checkstyle/suppressions.xml | 4 + gradle/wrapper/gradle-wrapper.properties | 2 +- gradlew | 2 +- .../FlowableFlatMapCompletableAsyncPerf.java | 3 +- .../FlowableFlatMapCompletableSyncPerf.java | 3 +- .../rxjava4/parallel/ParallelPerf.java | 3 +- .../io/reactivex/rxjava4/core/Flowable.java | 172 +++--------------- .../rxjava4/core/config/FlatMapConfig.java | 74 ++++++++ .../rxjava4/core/config/GenericConfig.java | 74 ++++++++ .../rxjava4/core/docs/FlowableDocBasic.java | 1 + src/main/module/module-info.java | 2 + .../flowable/FlowableFlatMapTest.java | 40 ++-- .../flowable/FlowableGroupByTest.java | 25 +-- .../operators/flowable/FlowableMergeTest.java | 5 +- .../operators/flowable/FlowableRetryTest.java | 3 +- .../FlowableWindowWithFlowableTest.java | 3 +- .../flowable/FlowableWindowWithTimeTest.java | 3 +- .../ParamValidationCheckerTest.java | 6 +- 18 files changed, 237 insertions(+), 188 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java create mode 100644 src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml index f4675fd64c..0d28559996 100644 --- a/config/checkstyle/suppressions.xml +++ b/config/checkstyle/suppressions.xml @@ -12,4 +12,8 @@ + + + + diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index dbc3ce4a04..c61a118f7d 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.0-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.1-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index 0262dcbd52..739907dfd1 100755 --- a/gradlew +++ b/gradlew @@ -57,7 +57,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/b631911858264c0b6e4d6603d677ff5218766cee/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/2d6327017519d23b96af35865dc997fcb544fb40/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. diff --git a/src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableAsyncPerf.java b/src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableAsyncPerf.java index 548927d771..07efe9ca9c 100644 --- a/src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableAsyncPerf.java +++ b/src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableAsyncPerf.java @@ -19,6 +19,7 @@ import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.functions.Action; import io.reactivex.rxjava4.internal.functions.Functions; import io.reactivex.rxjava4.schedulers.Schedulers; @@ -59,7 +60,7 @@ public void setup() { .flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency); flatMap = Flowable.fromArray(array) - .flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), false, maxConcurrency); + .flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), new FlatMapConfig(false, maxConcurrency)); } // @Benchmark diff --git a/src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableSyncPerf.java b/src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableSyncPerf.java index 4f2460989a..580efd9ad4 100644 --- a/src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableSyncPerf.java +++ b/src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableSyncPerf.java @@ -19,6 +19,7 @@ import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.internal.functions.Functions; @SuppressWarnings("exports") @@ -49,7 +50,7 @@ public void setup() { .flatMapCompletable(Functions.justFunction(Completable.complete()), false, maxConcurrency); flatMap = Flowable.fromArray(array) - .flatMap(Functions.justFunction(Completable.complete().toFlowable()), false, maxConcurrency); + .flatMap(Functions.justFunction(Completable.complete().toFlowable()), new FlatMapConfig(false, maxConcurrency)); } @Benchmark diff --git a/src/jmh/java/io/reactivex/rxjava4/parallel/ParallelPerf.java b/src/jmh/java/io/reactivex/rxjava4/parallel/ParallelPerf.java index 3334aa5247..b4f9f747c7 100644 --- a/src/jmh/java/io/reactivex/rxjava4/parallel/ParallelPerf.java +++ b/src/jmh/java/io/reactivex/rxjava4/parallel/ParallelPerf.java @@ -21,6 +21,7 @@ import static java.util.concurrent.Flow.*; import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.flowables.GroupedFlowable; import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.schedulers.Schedulers; @@ -71,7 +72,7 @@ public Publisher apply(Integer v) { return Flowable.just(v).subscribeOn(Schedulers.computation()) .map(ParallelPerf.this); } - }, cpu); + }, new FlatMapConfig(cpu)); groupBy = source.groupBy(new Function() { int i; diff --git a/src/main/java/io/reactivex/rxjava4/core/Flowable.java b/src/main/java/io/reactivex/rxjava4/core/Flowable.java index 726dddd5e5..b71cbf7545 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava4/core/Flowable.java @@ -19,6 +19,7 @@ import java.util.stream.*; import io.reactivex.rxjava4.annotations.*; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.core.docs.FlowableDocBasic; import io.reactivex.rxjava4.disposables.*; import io.reactivex.rxjava4.exceptions.*; @@ -3498,7 +3499,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static <@NonNull T> Flowable merge(@NonNull Iterable<@NonNull ? extends Publisher> sources, int maxConcurrency, int bufferSize) { - return fromIterable(sources).flatMap((Function)Functions.identity(), false, maxConcurrency, bufferSize); + return fromIterable(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(false, maxConcurrency, bufferSize)); } /** @@ -3551,7 +3552,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SafeVarargs @NonNull public static <@NonNull T> Flowable mergeArray(int maxConcurrency, int bufferSize, @NonNull Publisher... sources) { - return fromArray(sources).flatMap((Function)Functions.identity(), false, maxConcurrency, bufferSize); + return fromArray(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(false, maxConcurrency, bufferSize)); } /** @@ -3646,7 +3647,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static <@NonNull T> Flowable merge(@NonNull Iterable<@NonNull ? extends Publisher> sources, int maxConcurrency) { - return fromIterable(sources).flatMap((Function)Functions.identity(), maxConcurrency); + return fromIterable(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(maxConcurrency)); } /** @@ -3744,7 +3745,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static <@NonNull T> Flowable merge(@NonNull Publisher<@NonNull ? extends Publisher> sources, int maxConcurrency) { - return fromPublisher(sources).flatMap((Function)Functions.identity(), maxConcurrency); + return fromPublisher(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(maxConcurrency)); } /** @@ -3790,7 +3791,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SafeVarargs @NonNull public static <@NonNull T> Flowable mergeArray(@NonNull Publisher... sources) { - return fromArray(sources).flatMap((Function)Functions.identity(), sources.length); + return fromArray(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(sources.length)); } /** @@ -3839,7 +3840,7 @@ public static Flowable intervalRange(long start, long count, long initialD public static <@NonNull T> Flowable merge(@NonNull Publisher source1, @NonNull Publisher source2) { Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); - return fromArray(source1, source2).flatMap((Function)Functions.identity(), false, 2); + return fromArray(source1, source2).flatMap((Function)Functions.identity(), new FlatMapConfig(false, 2)); } /** @@ -3891,7 +3892,7 @@ public static Flowable intervalRange(long start, long count, long initialD Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); - return fromArray(source1, source2, source3).flatMap((Function)Functions.identity(), false, 3); + return fromArray(source1, source2, source3).flatMap((Function)Functions.identity(), new FlatMapConfig(false, 3)); } /** @@ -3948,7 +3949,7 @@ public static Flowable intervalRange(long start, long count, long initialD Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); Objects.requireNonNull(source4, "source4 is null"); - return fromArray(source1, source2, source3, source4).flatMap((Function)Functions.identity(), false, 4); + return fromArray(source1, source2, source3, source4).flatMap((Function)Functions.identity(), new FlatMapConfig(false, 4)); } /** @@ -3985,7 +3986,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static <@NonNull T> Flowable mergeDelayError(@NonNull Iterable<@NonNull ? extends Publisher> sources) { - return fromIterable(sources).flatMap((Function)Functions.identity(), true); + return fromIterable(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(true)); } /** @@ -4027,7 +4028,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static <@NonNull T> Flowable mergeDelayError(@NonNull Iterable<@NonNull ? extends Publisher> sources, int maxConcurrency, int bufferSize) { - return fromIterable(sources).flatMap((Function)Functions.identity(), true, maxConcurrency, bufferSize); + return fromIterable(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(true, maxConcurrency, bufferSize)); } /** @@ -4070,7 +4071,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SafeVarargs @NonNull public static <@NonNull T> Flowable mergeArrayDelayError(int maxConcurrency, int bufferSize, @NonNull Publisher... sources) { - return fromArray(sources).flatMap((Function)Functions.identity(), true, maxConcurrency, bufferSize); + return fromArray(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(true, maxConcurrency, bufferSize)); } /** @@ -4110,7 +4111,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static <@NonNull T> Flowable mergeDelayError(@NonNull Iterable<@NonNull ? extends Publisher> sources, int maxConcurrency) { - return fromIterable(sources).flatMap((Function)Functions.identity(), true, maxConcurrency); + return fromIterable(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(true, maxConcurrency)); } /** @@ -4189,7 +4190,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static <@NonNull T> Flowable mergeDelayError(@NonNull Publisher<@NonNull ? extends Publisher> sources, int maxConcurrency) { - return fromPublisher(sources).flatMap((Function)Functions.identity(), true, maxConcurrency); + return fromPublisher(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(true, maxConcurrency)); } /** @@ -4227,7 +4228,7 @@ public static Flowable intervalRange(long start, long count, long initialD @SafeVarargs @NonNull public static <@NonNull T> Flowable mergeArrayDelayError(@NonNull Publisher... sources) { - return fromArray(sources).flatMap((Function)Functions.identity(), true, sources.length); + return fromArray(sources).flatMap((Function)Functions.identity(), new FlatMapConfig(true, sources.length)); } /** @@ -4268,7 +4269,7 @@ public static Flowable intervalRange(long start, long count, long initialD public static <@NonNull T> Flowable mergeDelayError(@NonNull Publisher source1, @NonNull Publisher source2) { Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); - return fromArray(source1, source2).flatMap((Function)Functions.identity(), true, 2); + return fromArray(source1, source2).flatMap((Function)Functions.identity(), new FlatMapConfig(true, 2)); } /** @@ -4313,7 +4314,7 @@ public static Flowable intervalRange(long start, long count, long initialD Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); - return fromArray(source1, source2, source3).flatMap((Function)Functions.identity(), true, 3); + return fromArray(source1, source2, source3).flatMap((Function)Functions.identity(), new FlatMapConfig(true, 3)); } /** @@ -4363,7 +4364,7 @@ public static Flowable intervalRange(long start, long count, long initialD Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); Objects.requireNonNull(source4, "source4 is null"); - return fromArray(source1, source2, source3, source4).flatMap((Function)Functions.identity(), true, 4); + return fromArray(source1, source2, source3, source4).flatMap((Function)Functions.identity(), new FlatMapConfig(true, 4)); } /** @@ -10261,42 +10262,7 @@ public final Single firstOrError() { @SchedulerSupport(SchedulerSupport.NONE) @NonNull public final <@NonNull R> Flowable flatMap(@NonNull Function> mapper) { - return flatMap(mapper, false, bufferSize(), bufferSize()); - } - - /** - * Returns a {@code Flowable} that emits items based on applying a function that you supply to each item emitted - * by the current {@code Flowable}, where that function returns a {@link Publisher}, and then merging those resulting - * {@code Publisher}s and emitting the results of this merger. - *

- * - *

- *
Backpressure:
- *
The operator honors backpressure from downstream. The upstream {@code Flowable} is consumed - * in a bounded manner (up to {@link #bufferSize()} outstanding request amount for items). - * The inner {@code Publisher}s are expected to honor backpressure; if violated, - * the operator may signal {@link MissingBackpressureException}.
- *
Scheduler:
- *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the value type of the inner {@code Publisher}s and the output type - * @param mapper - * a function that, when applied to an item emitted by the current {@code Flowable}, returns a - * {@code Publisher} - * @param delayErrors - * if {@code true}, exceptions from the current {@code Flowable} and all inner {@code Publisher}s are delayed until all of them terminate - * if {@code false}, the first one signaling an exception will terminate the whole sequence immediately - * @return the new {@code Flowable} instance - * @throws NullPointerException if {@code mapper} is {@code null} - * @see ReactiveX operators documentation: FlatMap - */ - @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.NONE) - @NonNull - public final <@NonNull R> Flowable flatMap(@NonNull Function> mapper, boolean delayErrors) { - return flatMap(mapper, delayErrors, bufferSize(), bufferSize()); + return flatMap(mapper, new FlatMapConfig()); } /** @@ -10320,93 +10286,9 @@ public final Single firstOrError() { * @param mapper * a function that, when applied to an item emitted by the current {@code Flowable}, returns a * {@code Publisher} - * @param maxConcurrency - * the maximum number of {@code Publisher}s that may be subscribed to concurrently + * @param config the {@link FlatMapConfig} record to customize behavior * @return the new {@code Flowable} instance - * @throws NullPointerException if {@code mapper} is {@code null} - * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive - * @see ReactiveX operators documentation: FlatMap - * @since 2.0 - */ - @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.NONE) - @NonNull - public final <@NonNull R> Flowable flatMap(@NonNull Function> mapper, int maxConcurrency) { - return flatMap(mapper, false, maxConcurrency, bufferSize()); - } - - /** - * Returns a {@code Flowable} that emits items based on applying a function that you supply to each item emitted - * by the current {@code Flowable}, where that function returns a {@link Publisher}, and then merging those resulting - * {@code Publisher}s and emitting the results of this merger, while limiting the maximum number of concurrent - * subscriptions to these {@code Publisher}s. - * - * - *
- *
Backpressure:
- *
The operator honors backpressure from downstream. The upstream {@code Flowable} is consumed - * in a bounded manner (up to {@code maxConcurrency} outstanding request amount for items). - * The inner {@code Publisher}s are expected to honor backpressure; if violated, - * the operator may signal {@link MissingBackpressureException}.
- *
Scheduler:
- *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the value type of the inner {@code Publisher}s and the output type - * @param mapper - * a function that, when applied to an item emitted by the current {@code Flowable}, returns a - * {@code Publisher} - * @param maxConcurrency - * the maximum number of {@code Publisher}s that may be subscribed to concurrently - * @param delayErrors - * if {@code true}, exceptions from the current {@code Flowable} and all inner {@code Publisher}s are delayed until all of them terminate - * if {@code false}, the first one signaling an exception will terminate the whole sequence immediately - * @return the new {@code Flowable} instance - * @throws NullPointerException if {@code mapper} is {@code null} - * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive - * @see ReactiveX operators documentation: FlatMap - * @since 2.0 - */ - @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.NONE) - @NonNull - public final <@NonNull R> Flowable flatMap(@NonNull Function> mapper, boolean delayErrors, int maxConcurrency) { - return flatMap(mapper, delayErrors, maxConcurrency, bufferSize()); - } - - /** - * Returns a {@code Flowable} that emits items based on applying a function that you supply to each item emitted - * by the current {@code Flowable}, where that function returns a {@link Publisher}, and then merging those resulting - * {@code Publisher}s and emitting the results of this merger, while limiting the maximum number of concurrent - * subscriptions to these {@code Publisher}s. - * - * - *
- *
Backpressure:
- *
The operator honors backpressure from downstream. The upstream {@code Flowable} is consumed - * in a bounded manner (up to {@code maxConcurrency} outstanding request amount for items). - * The inner {@code Publisher}s are expected to honor backpressure; if violated, - * the operator may signal {@link MissingBackpressureException}.
- *
Scheduler:
- *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the value type of the inner {@code Publisher}s and the output type - * @param mapper - * a function that, when applied to an item emitted by the current {@code Flowable}, returns a - * {@code Publisher} - * @param maxConcurrency - * the maximum number of {@code Publisher}s that may be subscribed to concurrently - * @param delayErrors - * if {@code true}, exceptions from the current {@code Flowable} and all inner {@code Publisher}s are delayed until all of them terminate - * if {@code false}, the first one signaling an exception will terminate the whole sequence immediately - * @param bufferSize - * the number of elements to prefetch from each inner {@code Publisher} - * @return the new {@code Flowable} instance - * @throws NullPointerException if {@code mapper} is {@code null} - * @throws IllegalArgumentException if {@code maxConcurrency} or {@code bufferSize} is non-positive + * @throws NullPointerException if {@code mapper} or {@code config} is {@code null} * @see ReactiveX operators documentation: FlatMap * @since 2.0 */ @@ -10414,11 +10296,11 @@ public final Single firstOrError() { @NonNull @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) - public final <@NonNull R> Flowable flatMap(@NonNull Function> mapper, - boolean delayErrors, int maxConcurrency, int bufferSize) { + public final <@NonNull R> Flowable flatMap( + @NonNull Function> mapper, + @NonNull FlatMapConfig config) { Objects.requireNonNull(mapper, "mapper is null"); - ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + Objects.requireNonNull(config, "config is null"); if (this instanceof ScalarSupplier) { @SuppressWarnings("unchecked") T v = ((ScalarSupplier)this).get(); @@ -10427,7 +10309,7 @@ public final Single firstOrError() { } return FlowableScalarXMap.scalarXMap(v, mapper); } - return RxJavaPlugins.onAssembly(new FlowableFlatMap<>(this, mapper, delayErrors, maxConcurrency, bufferSize)); + return RxJavaPlugins.onAssembly(new FlowableFlatMap<>(this, mapper, config.delayErrors(), config.maxConcurrency(), config.bufferSize())); } /** @@ -10693,7 +10575,7 @@ public final Single firstOrError() { Objects.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return flatMap(FlowableInternalHelper.flatMapWithCombiner(mapper, combiner), delayErrors, maxConcurrency, bufferSize); + return flatMap(FlowableInternalHelper.flatMapWithCombiner(mapper, combiner), new FlatMapConfig(delayErrors, maxConcurrency, bufferSize)); } /** diff --git a/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java b/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java new file mode 100644 index 0000000000..3c2e4dfb0a --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.core.config; + +import io.reactivex.rxjava4.core.Flowable; +import io.reactivex.rxjava4.internal.functions.ObjectHelper; + +/** + * Generic configuration block with option to delay errors, change prefetch + * amounts and buffer sizes. + * TODO once value classes are available, make this a record class. + * @since 4.0.0 + */ +public record FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) { + + /** + * Default config: no error delay, {@link Flowable#bufferSize()} sizes. + */ + public FlatMapConfig() { + this(false, Flowable.bufferSize(), Flowable.bufferSize()); + } + + /** + * Optionally delay error, {@link Flowable#bufferSize()} sizes + * @param delayErrors should the error be delayed? + */ + public FlatMapConfig(boolean delayErrors) { + this(delayErrors, Flowable.bufferSize(), Flowable.bufferSize()); + } + + /** + * Optionally set the buffer size, no delay errors. + * @param maxConcurrency the maximum number of concurrent flows + */ + public FlatMapConfig(int maxConcurrency) { + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + this(false, maxConcurrency, Flowable.bufferSize()); + } + + /** + * Optionally delays errors and sets the buffer size too. + * @param delayError + * @param maxConcurrency the maximum number of concurrent flows + */ + public FlatMapConfig(boolean delayErrors, int maxConcurrency) { + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + this(delayErrors, maxConcurrency, Flowable.bufferSize()); + } + + /** + * Fully customize the configuration. + * @param delayErrors should the errors be delayed + * @param bufferSize what would be the buffer size + * @param prefetch what would be the prefetch amount + */ + public FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) { + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + this.delayErrors = delayErrors; + this.maxConcurrency = maxConcurrency; + this.bufferSize = bufferSize; + } +} diff --git a/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java b/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java new file mode 100644 index 0000000000..fff26ff638 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.core.config; + +import io.reactivex.rxjava4.core.Flowable; +import io.reactivex.rxjava4.internal.functions.ObjectHelper; + +/** + * Generic configuration block with option to delay errors, change prefetch + * amounts and buffer sizes. + * TODO once value classes are available, make this a record class. + * @since 4.0.0 + */ +public record GenericConfig(boolean delayError, int bufferSize, int prefetch) { + + /** + * Default config: no error delay, {@link Flowable#bufferSize()} sizes. + */ + public GenericConfig() { + this(false, Flowable.bufferSize(), Flowable.bufferSize()); + } + + /** + * Optionally delay error, {@link Flowable#bufferSize()} sizes + * @param delayError should the error be delayed? + */ + public GenericConfig(boolean delayError) { + this(delayError, Flowable.bufferSize(), Flowable.bufferSize()); + } + + /** + * Optionally set the buffer size, no delay errors. + * @param bufferSize the prefetch and the buffer size + */ + public GenericConfig(int bufferSize) { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + this(false, bufferSize, Flowable.bufferSize()); + } + + /** + * Optionally delays errors and sets the buffer size too. + * @param delayError + * @param bufferSize the prefetch and the buffer size + */ + public GenericConfig(boolean delayError, int bufferSize) { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + this(delayError, bufferSize, Flowable.bufferSize()); + } + + /** + * Fully customize the configuration. + * @param delayError should the errors be delayed + * @param bufferSize what would be the buffer size + * @param prefetch what would be the prefetch amount + */ + public GenericConfig(boolean delayError, int bufferSize, int prefetch) { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + this.delayError = delayError; + this.bufferSize = bufferSize; + this.prefetch = prefetch; + } +} diff --git a/src/main/java/io/reactivex/rxjava4/core/docs/FlowableDocBasic.java b/src/main/java/io/reactivex/rxjava4/core/docs/FlowableDocBasic.java index 357307cddd..33b9e007bc 100644 --- a/src/main/java/io/reactivex/rxjava4/core/docs/FlowableDocBasic.java +++ b/src/main/java/io/reactivex/rxjava4/core/docs/FlowableDocBasic.java @@ -19,6 +19,7 @@ /** * Documents a set of operators so that the main Flowable source file is not cluttered. * @param the element type of the flow + * @since 4.0.0 */ public sealed interface FlowableDocBasic permits Flowable { /** diff --git a/src/main/module/module-info.java b/src/main/module/module-info.java index f59cbe5db8..fb925e8dc4 100644 --- a/src/main/module/module-info.java +++ b/src/main/module/module-info.java @@ -14,6 +14,8 @@ module io.reactivex.rxjava4 { exports io.reactivex.rxjava4.annotations; exports io.reactivex.rxjava4.core; + exports io.reactivex.rxjava4.core.docs; + exports io.reactivex.rxjava4.core.config; exports io.reactivex.rxjava4.disposables; exports io.reactivex.rxjava4.exceptions; exports io.reactivex.rxjava4.flowables; diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableFlatMapTest.java index 9d22fefa75..cb4890d0bb 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableFlatMapTest.java @@ -27,6 +27,7 @@ import io.reactivex.rxjava4.annotations.NonNull; import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.exceptions.*; import io.reactivex.rxjava4.functions.*; import io.reactivex.rxjava4.internal.functions.Functions; @@ -344,7 +345,7 @@ public Flowable apply(Integer t1) { return composer(Flowable.range(t1 * 10, 2), subscriptionCount, m) .subscribeOn(Schedulers.computation()); } - }, m); + }, new FlatMapConfig(m)); TestSubscriber ts = new TestSubscriber<>(); @@ -585,7 +586,7 @@ public void justEmptyMixtureMaxConcurrent() { public Flowable apply(Integer v) { return (v & 1) == 0 ? Flowable.empty() : Flowable.just(v); } - }, 16) + }, new FlatMapConfig(16)) .subscribe(ts); ts.assertValueCount(2 * Flowable.bufferSize()); @@ -610,7 +611,7 @@ public void rangeEmptyMixtureMaxConcurrent() { public Flowable apply(Integer v) { return (v & 1) == 0 ? Flowable.empty() : Flowable.range(v, 2); } - }, 16) + }, new FlatMapConfig(16)) .subscribe(ts); ts.assertValueCount(4 * Flowable.bufferSize()); @@ -854,7 +855,7 @@ public Integer apply(Integer w) throws Exception { } }); } - }, true) + }, new FlatMapConfig(true)) .to(TestHelper.testConsumer()) .assertFailure(CompositeException.class); @@ -1063,7 +1064,7 @@ public void maxConcurrencySustained() { public Flowable apply(PublishProcessor v) throws Exception { return v; } - }, 2) + }, new FlatMapConfig(2)) .doOnNext(new Consumer() { @Override public void accept(Integer v) throws Exception { @@ -1115,7 +1116,7 @@ public Flowable apply(Flowable upstream) { public Publisher apply(Integer v) throws Throwable { return Flowable.just(v).hide(); } - }, true); + }, new FlatMapConfig(true)); } }); } @@ -1250,7 +1251,7 @@ public void fusedInnerCrash() { }) .compose(TestHelper.flowableStripBoundary()) ) - .flatMap(v -> v, true) + .flatMap(v -> v, new FlatMapConfig(true)) .doOnNext(v -> { if (v == 1) { pp.onNext(2); @@ -1280,7 +1281,7 @@ public void fusedInnerCrash2() { .compose(TestHelper.flowableStripBoundary()) , pp ) - .flatMap(v -> v, true) + .flatMap(v -> v, new FlatMapConfig(true)) .doOnNext(v -> { if (v == 1) { pp.onNext(2); @@ -1304,7 +1305,7 @@ public void doubleOnSubscribe() { public void allConcurrency() { Flowable.just(1) .hide() - .flatMap(_ -> Flowable.just(2).hide(), Integer.MAX_VALUE) + .flatMap(_ -> Flowable.just(2).hide(), new FlatMapConfig(Integer.MAX_VALUE)) .test() .assertResult(2); } @@ -1313,7 +1314,7 @@ public void allConcurrency() { public void allConcurrencyScalarInner() { Flowable.just(1) .hide() - .flatMap(_ -> Flowable.just(2), Integer.MAX_VALUE) + .flatMap(_ -> Flowable.just(2), new FlatMapConfig(Integer.MAX_VALUE)) .test() .assertResult(2); } @@ -1322,7 +1323,7 @@ public void allConcurrencyScalarInner() { public void allConcurrencyScalarInnerEmpty() { Flowable.just(1) .hide() - .flatMap(_ -> Flowable.empty(), Integer.MAX_VALUE) + .flatMap(_ -> Flowable.empty(), new FlatMapConfig(Integer.MAX_VALUE)) .test() .assertResult(); } @@ -1361,7 +1362,7 @@ public void someConcurrencyScalarInnerCancel() { public void allConcurrencyBackpressured() { Flowable.just(1) .hide() - .flatMap(_ -> Flowable.just(2), Integer.MAX_VALUE) + .flatMap(_ -> Flowable.just(2), new FlatMapConfig(Integer.MAX_VALUE)) .test(0L) .assertEmpty() .requestMore(1) @@ -1372,7 +1373,7 @@ public void allConcurrencyBackpressured() { public void someConcurrencyInnerScalarCancel() { Flowable.just(1) .hide() - .flatMap(_ -> Flowable.just(2), 2) + .flatMap(_ -> Flowable.just(2), new FlatMapConfig(2)) .takeUntil(_ -> true) .test() .assertResult(2); @@ -1389,7 +1390,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int subscriber.onNext(3); } } - .flatMap(v -> Flowable.just(v), 1) + .flatMap(v -> Flowable.just(v), new FlatMapConfig(1)) .test(0L) .assertFailure(QueueOverflowException.class); } @@ -1405,7 +1406,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int subscriber.onNext(1); } } - .flatMap(v -> Flowable.just(v), 1) + .flatMap(v -> Flowable.just(v), new FlatMapConfig(1)) .doOnNext(v -> { if (v == 1) { ref.get().onNext(2); @@ -1428,7 +1429,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int subscriber.onNext(2); subscriber.onNext(3); } - }, false, 1, 1) + }, new FlatMapConfig(false, 1, 1)) .test(0L) .assertFailure(QueueOverflowException.class); } @@ -1437,7 +1438,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int public void takeFromScalarQueue() { Flowable.just(1) .hide() - .flatMap(_ -> Flowable.just(2), 2) + .flatMap(_ -> Flowable.just(2), new FlatMapConfig(2)) .takeUntil(_ -> true) .test(0L) .requestMore(2) @@ -1449,7 +1450,7 @@ public void scalarInnerQueueEmpty() { Flowable.just(1) .concatWith(Flowable.never()) .hide() - .flatMap(_ -> Flowable.just(2), 2) + .flatMap(_ -> Flowable.just(2), new FlatMapConfig(2)) .test(0L) .requestMore(2) .assertValuesOnly(2); @@ -1495,8 +1496,7 @@ public void mixedScalarAsync() { .just(-integer) .observeOn(Schedulers.computation()); }, - false, - 1 + new FlatMapConfig(false, 1) ) .ignoreElements() .blockingAwait(); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableGroupByTest.java index 6640c77483..9cb751ee8d 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableGroupByTest.java @@ -31,6 +31,7 @@ import com.google.common.cache.*; import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.exceptions.*; import io.reactivex.rxjava4.flowables.GroupedFlowable; import io.reactivex.rxjava4.functions.*; @@ -1356,7 +1357,7 @@ public String apply(Integer l) { } }); } - }, 4000) // a lot of groups are created due to take(2) + }, new FlatMapConfig(4000)) // a lot of groups are created due to take(2) .subscribe(ts); ts.awaitDone(5, TimeUnit.SECONDS); @@ -2591,7 +2592,7 @@ static void issue6974Run(int groups, int iterations, int sizeCap, int flatMapCon .range(1, groups) .repeat(iterations / groups) .groupBy(i -> i, i -> i, false, 128, sizeCap(sizeCap, notifyOnExplicitRevoke)) - .flatMap(gf -> gf.compose(operation), flatMapConcurrency) + .flatMap(gf -> gf.compose(operation), new FlatMapConfig(flatMapConcurrency)) .test(); test.awaitDone(5, TimeUnit.SECONDS); test.assertValueCount(iterations); @@ -2624,7 +2625,7 @@ static void issue6974RunPart2(int groupByBufferSize, int flatMapMaxConcurrency, sizeCap(groups * 100, notifyOnExplicitEviction)) .flatMap(gf -> gf .take(10, TimeUnit.MILLISECONDS) - , flatMapMaxConcurrency) + , new FlatMapConfig(flatMapMaxConcurrency)) .test(); ts @@ -2674,7 +2675,7 @@ static void issue6974RunPart2NoEvict(int groupByBufferSize, int flatMapMaxConcur .groupBy(i -> i) .flatMap(gf -> gf .take(10, TimeUnit.MILLISECONDS) - , flatMapMaxConcurrency) + , new FlatMapConfig(flatMapMaxConcurrency)) .subscribeWith(new TestSubscriberEx<>()) .awaitDone(5, TimeUnit.SECONDS) .assertTerminated(); // MBE is possible if the async group closing is slow @@ -2722,7 +2723,7 @@ public void issue6974Part2Case1ObserveOn() { .observeOn(Schedulers.computation()) // .take(10) .take(10, TimeUnit.MILLISECONDS) - , flatMapMaxConcurrency) + , new FlatMapConfig(flatMapMaxConcurrency)) .subscribeWith(new TestSubscriberEx<>()) .awaitDone(5, TimeUnit.SECONDS) .assertTerminated(); // MBE is possible if the async group closing is slow @@ -2748,7 +2749,7 @@ public void issue6974Part2Case1ObserveOnHide() { .observeOn(Schedulers.computation()) // .take(10) .take(10, TimeUnit.MILLISECONDS) - , flatMapMaxConcurrency) + , new FlatMapConfig(flatMapMaxConcurrency)) .subscribeWith(new TestSubscriberEx<>()) .awaitDone(5, TimeUnit.SECONDS) .assertTerminated(); // MBE is possible if the async group closing is slow @@ -2772,7 +2773,7 @@ public void issue6974Part2Case1ObserveOnNoCap() { .observeOn(Schedulers.computation()) // .take(10) .take(10, TimeUnit.MILLISECONDS) - , flatMapMaxConcurrency) + , new FlatMapConfig(flatMapMaxConcurrency)) .test() .awaitDone(5, TimeUnit.SECONDS) .assertNoErrors() @@ -2798,7 +2799,7 @@ public void issue6974Part2Case1ObserveOnNoCapHide() { .observeOn(Schedulers.computation()) // .take(10) .take(10, TimeUnit.MILLISECONDS) - , flatMapMaxConcurrency) + , new FlatMapConfig(flatMapMaxConcurrency)) .test() .awaitDone(5, TimeUnit.SECONDS) .assertNoErrors() @@ -2835,7 +2836,7 @@ public void issue6974Part2Case1ObserveOnConditional() { .filter(_ -> true) // .take(10) .take(10, TimeUnit.MILLISECONDS) - , flatMapMaxConcurrency) + , new FlatMapConfig(flatMapMaxConcurrency)) .subscribeWith(new TestSubscriberEx<>()) .awaitDone(5, TimeUnit.SECONDS) .assertTerminated(); // MBE is possible if the async group closing is slow @@ -2862,7 +2863,7 @@ public void issue6974Part2Case1ObserveOnConditionalHide() { .filter(_ -> true) // .take(10) .take(10, TimeUnit.MILLISECONDS) - , flatMapMaxConcurrency) + , new FlatMapConfig(flatMapMaxConcurrency)) .subscribeWith(new TestSubscriberEx<>()) .awaitDone(5, TimeUnit.SECONDS) .assertTerminated(); // MBE is possible if the async group closing is slow @@ -2908,7 +2909,7 @@ public void issue6982Case1() { .range(1, 500_000) .map(i -> i % groups) .groupBy(i -> i, i -> i, false, groupByBufferSize, ttlCapGuava(Duration.ofMillis(10))) - .flatMap(gf -> gf.observeOn(Schedulers.computation()), flatMapMaxConcurrency) + .flatMap(gf -> gf.observeOn(Schedulers.computation()), new FlatMapConfig(flatMapMaxConcurrency)) .test() .awaitDone(5, TimeUnit.SECONDS) .assertNoErrors() @@ -2938,7 +2939,7 @@ public void issue6982Case2() { .range(1, 500_000) .map(i -> i % groups) .groupBy(i -> i, i -> i, false, groupByBufferSize, ttlCapGuava(Duration.ofMillis(10))) - .flatMap(gf -> gf.observeOn(Schedulers.computation()), flatMapMaxConcurrency) + .flatMap(gf -> gf.observeOn(Schedulers.computation()), new FlatMapConfig(flatMapMaxConcurrency)) .test() .awaitDone(5, TimeUnit.SECONDS) .assertNoErrors() diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeTest.java index 8c25a755af..f977662af6 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeTest.java @@ -27,6 +27,7 @@ import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.core.Scheduler.Worker; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.exceptions.TestException; import io.reactivex.rxjava4.functions.*; import io.reactivex.rxjava4.internal.functions.Functions; @@ -1441,7 +1442,7 @@ public void flatMapJustRange() { public void flatMapMaxConcurrentJustJust() { TestSubscriber ts = TestSubscriber.create(); - Flowable.just(Flowable.just(1)).flatMap((Function)Functions.identity(), 5).subscribe(ts); + Flowable.just(Flowable.just(1)).flatMap((Function)Functions.identity(), new FlatMapConfig(5)).subscribe(ts); ts.assertValue(1); ts.assertNoErrors(); @@ -1453,7 +1454,7 @@ public void flatMapMaxConcurrentJustJust() { public void flatMapMaxConcurrentJustRange() { TestSubscriber ts = TestSubscriber.create(); - Flowable.just(Flowable.range(1, 5)).flatMap((Function)Functions.identity(), 5).subscribe(ts); + Flowable.just(Flowable.range(1, 5)).flatMap((Function)Functions.identity(), new FlatMapConfig(5)).subscribe(ts); ts.assertValues(1, 2, 3, 4, 5); ts.assertNoErrors(); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableRetryTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableRetryTest.java index 215a066804..b1bbb6e1c3 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableRetryTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableRetryTest.java @@ -26,6 +26,7 @@ import static java.util.concurrent.Flow.*; import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.disposables.Disposable; import io.reactivex.rxjava4.exceptions.TestException; import io.reactivex.rxjava4.flowables.GroupedFlowable; @@ -864,7 +865,7 @@ public String apply(String t1) { public Flowable apply(GroupedFlowable t1) { return t1.take(1); } - }, NUM_MSG) // Must request as many groups as groupBy produces to avoid MBE + }, new FlatMapConfig(NUM_MSG)) // Must request as many groups as groupBy produces to avoid MBE .subscribe(new TestSubscriber<>(subscriber)); InOrder inOrder = inOrder(subscriber); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableWindowWithFlowableTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableWindowWithFlowableTest.java index d240887306..595eedf98e 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableWindowWithFlowableTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableWindowWithFlowableTest.java @@ -25,6 +25,7 @@ import static java.util.concurrent.Flow.*; import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.exceptions.*; import io.reactivex.rxjava4.functions.*; import io.reactivex.rxjava4.internal.functions.Functions; @@ -258,7 +259,7 @@ public void boundaryDispose() { public void boundaryOnError() { TestSubscriberEx ts = Flowable.error(new TestException()) .window(Flowable.never()) - .flatMap(Functions.>identity(), true) + .flatMap(Functions.>identity(), new FlatMapConfig(true)) .to(TestHelper.testConsumer()) .assertFailure(CompositeException.class); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableWindowWithTimeTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableWindowWithTimeTest.java index 9c92b85243..a08254de13 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableWindowWithTimeTest.java @@ -23,6 +23,7 @@ import static java.util.concurrent.Flow.*; import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.core.config.FlatMapConfig; import io.reactivex.rxjava4.exceptions.*; import io.reactivex.rxjava4.functions.*; import io.reactivex.rxjava4.internal.functions.Functions; @@ -249,7 +250,7 @@ public void timespanDefaultSchedulerSize() { public void timespanDefaultSchedulerSizeRestart() { Flowable.range(1, 10) .window(1, TimeUnit.MINUTES, 20, true) - .flatMap(Functions.>identity(), true) + .flatMap(Functions.>identity(), new FlatMapConfig(true)) .test() .awaitDone(5, TimeUnit.SECONDS) .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); diff --git a/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java index bc4f3fea23..8c570f7552 100644 --- a/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java @@ -17,14 +17,15 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.Flow.*; import java.util.stream.*; import org.junit.Test; -import static java.util.concurrent.Flow.*; import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.core.Observable; import io.reactivex.rxjava4.core.Observer; +import io.reactivex.rxjava4.core.config.*; import io.reactivex.rxjava4.disposables.*; import io.reactivex.rxjava4.exceptions.TestException; import io.reactivex.rxjava4.functions.*; @@ -612,6 +613,9 @@ public void checkParallelFlowable() { VirtualGenerator vg = _ -> { }; defaultValues.put(VirtualGenerator.class, vg); + defaultValues.put(FlatMapConfig.class, new FlatMapConfig()); + defaultValues.put(GenericConfig.class, new GenericConfig()); + @SuppressWarnings("rawtypes") class MixedConverters implements FlowableConverter, ObservableConverter, SingleConverter, MaybeConverter, CompletableConverter, ParallelFlowableConverter { From 6df9a6ef6d550a8224b2904bf591f2c85167d8b2 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 19 Mar 2026 12:50:54 +0100 Subject: [PATCH 2/3] Missed these --- config/checkstyle/suppressions.xml | 4 ---- .../reactivex/rxjava4/core/config/FlatMapConfig.java | 10 ++++++---- .../reactivex/rxjava4/core/config/GenericConfig.java | 6 ++---- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml index 0d28559996..f4675fd64c 100644 --- a/config/checkstyle/suppressions.xml +++ b/config/checkstyle/suppressions.xml @@ -12,8 +12,4 @@ - - - - diff --git a/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java b/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java index 3c2e4dfb0a..3b6c3d7718 100644 --- a/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java +++ b/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java @@ -44,8 +44,9 @@ public FlatMapConfig(boolean delayErrors) { * @param maxConcurrency the maximum number of concurrent flows */ public FlatMapConfig(int maxConcurrency) { - ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); - this(false, maxConcurrency, Flowable.bufferSize()); + this(false, + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"), + Flowable.bufferSize()); } /** @@ -54,8 +55,9 @@ public FlatMapConfig(int maxConcurrency) { * @param maxConcurrency the maximum number of concurrent flows */ public FlatMapConfig(boolean delayErrors, int maxConcurrency) { - ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); - this(delayErrors, maxConcurrency, Flowable.bufferSize()); + this(delayErrors, + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"), + Flowable.bufferSize()); } /** diff --git a/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java b/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java index fff26ff638..078148abdd 100644 --- a/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java +++ b/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java @@ -44,8 +44,7 @@ public GenericConfig(boolean delayError) { * @param bufferSize the prefetch and the buffer size */ public GenericConfig(int bufferSize) { - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - this(false, bufferSize, Flowable.bufferSize()); + this(false, ObjectHelper.verifyPositive(bufferSize, "bufferSize"), Flowable.bufferSize()); } /** @@ -54,8 +53,7 @@ public GenericConfig(int bufferSize) { * @param bufferSize the prefetch and the buffer size */ public GenericConfig(boolean delayError, int bufferSize) { - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - this(delayError, bufferSize, Flowable.bufferSize()); + this(delayError, ObjectHelper.verifyPositive(bufferSize, "bufferSize"), Flowable.bufferSize()); } /** From 0b15d3c164fb967a67eb5defd2b91bb0f1531c64 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 19 Mar 2026 12:57:53 +0100 Subject: [PATCH 3/3] Apply checkstyle fixes --- .../rxjava4/core/config/FlatMapConfig.java | 14 +++++++------- .../rxjava4/core/config/GenericConfig.java | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java b/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java index 3b6c3d7718..2506cdf3a3 100644 --- a/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java +++ b/src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java @@ -32,7 +32,7 @@ public FlatMapConfig() { } /** - * Optionally delay error, {@link Flowable#bufferSize()} sizes + * Optionally delay error, {@link Flowable#bufferSize()} sizes * @param delayErrors should the error be delayed? */ public FlatMapConfig(boolean delayErrors) { @@ -44,24 +44,24 @@ public FlatMapConfig(boolean delayErrors) { * @param maxConcurrency the maximum number of concurrent flows */ public FlatMapConfig(int maxConcurrency) { - this(false, - ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"), + this(false, + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"), Flowable.bufferSize()); } /** * Optionally delays errors and sets the buffer size too. - * @param delayError + * @param delayError should the errors be delayed? * @param maxConcurrency the maximum number of concurrent flows */ public FlatMapConfig(boolean delayErrors, int maxConcurrency) { - this(delayErrors, - ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"), + this(delayErrors, + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"), Flowable.bufferSize()); } /** - * Fully customize the configuration. + * Fully customize the configuration. * @param delayErrors should the errors be delayed * @param bufferSize what would be the buffer size * @param prefetch what would be the prefetch amount diff --git a/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java b/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java index 078148abdd..84e6929034 100644 --- a/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java +++ b/src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java @@ -32,7 +32,7 @@ public GenericConfig() { } /** - * Optionally delay error, {@link Flowable#bufferSize()} sizes + * Optionally delay error, {@link Flowable#bufferSize()} sizes. * @param delayError should the error be delayed? */ public GenericConfig(boolean delayError) { @@ -49,7 +49,7 @@ public GenericConfig(int bufferSize) { /** * Optionally delays errors and sets the buffer size too. - * @param delayError + * @param delayError should the errors be delayed? * @param bufferSize the prefetch and the buffer size */ public GenericConfig(boolean delayError, int bufferSize) { @@ -57,7 +57,7 @@ public GenericConfig(boolean delayError, int bufferSize) { } /** - * Fully customize the configuration. + * Fully customize the configuration. * @param delayError should the errors be delayed * @param bufferSize what would be the buffer size * @param prefetch what would be the prefetch amount