Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.0-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.1-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
2 changes: 1 addition & 1 deletion gradlew

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import io.reactivex.rxjava4.core.config.FlatMapConfig;
import io.reactivex.rxjava4.functions.Action;
import io.reactivex.rxjava4.internal.functions.Functions;
import io.reactivex.rxjava4.schedulers.Schedulers;
Expand Down Expand Up @@ -59,7 +60,7 @@ public void setup() {
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency);

flatMap = Flowable.fromArray(array)
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), false, maxConcurrency);
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), new FlatMapConfig(false, maxConcurrency));
}

// @Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import io.reactivex.rxjava4.core.config.FlatMapConfig;
import io.reactivex.rxjava4.internal.functions.Functions;

@SuppressWarnings("exports")
Expand Down Expand Up @@ -49,7 +50,7 @@ public void setup() {
.flatMapCompletable(Functions.justFunction(Completable.complete()), false, maxConcurrency);

flatMap = Flowable.fromArray(array)
.flatMap(Functions.justFunction(Completable.complete().toFlowable()), false, maxConcurrency);
.flatMap(Functions.justFunction(Completable.complete().toFlowable()), new FlatMapConfig(false, maxConcurrency));
}

@Benchmark
Expand Down
3 changes: 2 additions & 1 deletion src/jmh/java/io/reactivex/rxjava4/parallel/ParallelPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.util.concurrent.Flow.*;

import io.reactivex.rxjava4.core.*;
import io.reactivex.rxjava4.core.config.FlatMapConfig;
import io.reactivex.rxjava4.flowables.GroupedFlowable;
import io.reactivex.rxjava4.functions.Function;
import io.reactivex.rxjava4.schedulers.Schedulers;
Expand Down Expand Up @@ -71,7 +72,7 @@ public Publisher<Integer> apply(Integer v) {
return Flowable.just(v).subscribeOn(Schedulers.computation())
.map(ParallelPerf.this);
}
}, cpu);
}, new FlatMapConfig(cpu));

groupBy = source.groupBy(new Function<Integer, Integer>() {
int i;
Expand Down
172 changes: 27 additions & 145 deletions src/main/java/io/reactivex/rxjava4/core/Flowable.java

Large diffs are not rendered by default.

76 changes: 76 additions & 0 deletions src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.core.config;

import io.reactivex.rxjava4.core.Flowable;
import io.reactivex.rxjava4.internal.functions.ObjectHelper;

/**
* Generic configuration block with option to delay errors, change prefetch
* amounts and buffer sizes.
* TODO once value classes are available, make this a record class.
* @since 4.0.0
*/
public record FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {

/**
* Default config: no error delay, {@link Flowable#bufferSize()} sizes.
*/
public FlatMapConfig() {
this(false, Flowable.bufferSize(), Flowable.bufferSize());
}

/**
* Optionally delay error, {@link Flowable#bufferSize()} sizes
* @param delayErrors should the error be delayed?
*/
public FlatMapConfig(boolean delayErrors) {
this(delayErrors, Flowable.bufferSize(), Flowable.bufferSize());
}

/**
* Optionally set the buffer size, no delay errors.
* @param maxConcurrency the maximum number of concurrent flows
*/
public FlatMapConfig(int maxConcurrency) {
this(false,
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"),
Flowable.bufferSize());
}

/**
* Optionally delays errors and sets the buffer size too.
* @param delayError should the errors be delayed?
* @param maxConcurrency the maximum number of concurrent flows
*/
public FlatMapConfig(boolean delayErrors, int maxConcurrency) {
this(delayErrors,
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"),
Flowable.bufferSize());
}

/**
* Fully customize the configuration.
* @param delayErrors should the errors be delayed
* @param bufferSize what would be the buffer size
* @param prefetch what would be the prefetch amount
*/
public FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
}
72 changes: 72 additions & 0 deletions src/main/java/io/reactivex/rxjava4/core/config/GenericConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.core.config;

import io.reactivex.rxjava4.core.Flowable;
import io.reactivex.rxjava4.internal.functions.ObjectHelper;

/**
* Generic configuration block with option to delay errors, change prefetch
* amounts and buffer sizes.
* TODO once value classes are available, make this a record class.
* @since 4.0.0
*/
public record GenericConfig(boolean delayError, int bufferSize, int prefetch) {

/**
* Default config: no error delay, {@link Flowable#bufferSize()} sizes.
*/
public GenericConfig() {
this(false, Flowable.bufferSize(), Flowable.bufferSize());
}

/**
* Optionally delay error, {@link Flowable#bufferSize()} sizes.
* @param delayError should the error be delayed?
*/
public GenericConfig(boolean delayError) {
this(delayError, Flowable.bufferSize(), Flowable.bufferSize());
}

/**
* Optionally set the buffer size, no delay errors.
* @param bufferSize the prefetch and the buffer size
*/
public GenericConfig(int bufferSize) {
this(false, ObjectHelper.verifyPositive(bufferSize, "bufferSize"), Flowable.bufferSize());
}

/**
* Optionally delays errors and sets the buffer size too.
* @param delayError should the errors be delayed?
* @param bufferSize the prefetch and the buffer size
*/
public GenericConfig(boolean delayError, int bufferSize) {
this(delayError, ObjectHelper.verifyPositive(bufferSize, "bufferSize"), Flowable.bufferSize());
}

/**
* Fully customize the configuration.
* @param delayError should the errors be delayed
* @param bufferSize what would be the buffer size
* @param prefetch what would be the prefetch amount
*/
public GenericConfig(boolean delayError, int bufferSize, int prefetch) {
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
ObjectHelper.verifyPositive(prefetch, "prefetch");
this.delayError = delayError;
this.bufferSize = bufferSize;
this.prefetch = prefetch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
/**
* Documents a set of operators so that the main Flowable source file is not cluttered.
* @param <T> the element type of the flow
* @since 4.0.0
*/
public sealed interface FlowableDocBasic<T> permits Flowable {
/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/module/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
module io.reactivex.rxjava4 {
exports io.reactivex.rxjava4.annotations;
exports io.reactivex.rxjava4.core;
exports io.reactivex.rxjava4.core.docs;
exports io.reactivex.rxjava4.core.config;
exports io.reactivex.rxjava4.disposables;
exports io.reactivex.rxjava4.exceptions;
exports io.reactivex.rxjava4.flowables;
Expand Down
Loading