Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class GroupToGroup<K1, T, K2, R> extends KeyValueStageConfig<T, K2, R> {

GroupToGroup(GroupComputation<K1, T, K2, R> computation,
Config<K1, T, K2, R> config, Codec<K1> inputKeyCodec, Codec<T> inputCodec) {
super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters);
super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize);
this.computation = computation;
this.keyExpireTimeSeconds = config.keyExpireTimeSeconds;
}
Expand All @@ -76,6 +76,7 @@ public static class Config<K1, T, K2, R> {
// always assume a stateful calculation is being made
// do not allow config to override
private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private int bufferSize = rx.internal.util.RxRingBuffer.SIZE;
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
Expand Down Expand Up @@ -135,6 +136,15 @@ public Config<K1, T, K2, R> withParameters(List<ParameterDefinition<?>> params)
return this;
}

public Config<K1, T, K2, R> bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public int getBufferSize() {
return bufferSize;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class GroupToScalar<K, T, R> extends StageConfig<T, R> {

GroupToScalar(GroupToScalarComputation<K, T, R> computation,
Config<K, T, R> config, Codec<K> inputKeyCodec, Codec<T> inputCodec) {
super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency);
super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.bufferSize);
this.computation = computation;
this.keyExpireTimeSeconds = config.keyExpireTimeSeconds;
}
Expand All @@ -79,6 +79,7 @@ public static class Config<K, T, R> {
// do not allow config override
private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private int concurrency = DEFAULT_STAGE_CONCURRENCY;
private int bufferSize = rx.internal.util.RxRingBuffer.SIZE;
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
Expand Down Expand Up @@ -149,6 +150,15 @@ public Config<K, T, R> withParameters(List<ParameterDefinition<?>> params) {
return this;
}

public Config<K, T, R> bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public int getBufferSize() {
return bufferSize;
}

}

}
12 changes: 11 additions & 1 deletion mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class KeyToKey<K1, T, K2, R> extends KeyValueStageConfig<T, K2, R> {

KeyToKey(KeyComputation<K1, T, K2, R> computation,
Config<K1, T, K2, R> config, Codec<K1> inputKeyCodec, Codec<T> inputCodec) {
super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters);
super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize);
this.computation = computation;
this.keyExpireTimeSeconds = config.keyExpireTimeSeconds;
}
Expand All @@ -78,6 +78,7 @@ public static class Config<K1, T, K2, R> {
// always assume a stateful calculation is being made
// do not allow config to override
private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private int bufferSize = rx.internal.util.RxRingBuffer.SIZE;
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
Expand Down Expand Up @@ -136,6 +137,15 @@ public Config<K1, T, K2, R> withParameters(List<ParameterDefinition<?>> params)
this.parameters = params;
return this;
}

public Config<K1, T, K2, R> bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public int getBufferSize() {
return bufferSize;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class KeyToScalar<K, T, R> extends StageConfig<T, R> {

KeyToScalar(ToScalarComputation<K, T, R> computation,
Config<K, T, R> config, Codec<K> inputKeyCodec, Codec<T> inputCodec) {
super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters);
super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize);
this.computation = computation;
this.keyExpireTimeSeconds = config.keyExpireTimeSeconds;
}
Expand All @@ -67,6 +67,7 @@ public static class Config<K, T, R> {
// 'stateful group calculation' use case
// do not allow config override
private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private int bufferSize = rx.internal.util.RxRingBuffer.SIZE;
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
Expand Down Expand Up @@ -117,6 +118,15 @@ public Config<K, T, R> withParameters(List<ParameterDefinition<?>> params) {
return this;
}

public Config<K, T, R> bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public int getBufferSize() {
return bufferSize;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ public KeyValueStageConfig(String description, Codec<?> inputKeyCodec, Codec<T>
}

public KeyValueStageConfig(String description, Codec<?> inputKeyCodec, Codec<T> inputCodec, Codec<K> outputKeyCodec, Codec<R> outputCodec, INPUT_STRATEGY inputStrategy, List<ParameterDefinition<?>> params, int concurrency) {
super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency);
this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, concurrency, rx.internal.util.RxRingBuffer.SIZE);
}

public KeyValueStageConfig(String description, Codec<?> inputKeyCodec, Codec<T> inputCodec, Codec<K> outputKeyCodec, Codec<R> outputCodec, INPUT_STRATEGY inputStrategy, List<ParameterDefinition<?>> params, int concurrency, int bufferSize) {
super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency, bufferSize);
this.keyCodec = outputKeyCodec;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ScalarToGroup<T, K, R> extends KeyValueStageConfig<T, K, R> {

public ScalarToGroup(ToGroupComputation<T, K, R> computation,
Config<T, K, R> config, Codec<T> inputCodec) {
super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency);
super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.bufferSize);
this.computation = computation;
this.keyExpireTimeSeconds = config.keyExpireTimeSeconds;

Expand All @@ -77,6 +77,7 @@ public static class Config<T, K, R> {
// default input type is concurrent for 'grouping' use case
private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.CONCURRENT;
private int concurrency = DEFAULT_STAGE_CONCURRENCY;
private int bufferSize = rx.internal.util.RxRingBuffer.SIZE;
private long keyExpireTimeSeconds = Long.MAX_VALUE; // never expire by default
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

Expand Down Expand Up @@ -155,5 +156,14 @@ public Config<T, K, R> withParameters(List<ParameterDefinition<?>> params) {
this.parameters = params;
return this;
}

public Config<T, K, R> bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public int getBufferSize() {
return bufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class ScalarToKey<T, K, R> extends KeyValueStageConfig<T, K, R> {

ScalarToKey(ToKeyComputation<T, K, R> computation,
Config<T, K, R> config, Codec<T> inputCodec) {
super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters);
super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize);
this.computation = computation;
this.keyExpireTimeSeconds = config.keyExpireTimeSeconds;
}
Expand All @@ -72,6 +72,7 @@ public static class Config<T, K, R> {
// default input type is concurrent for 'grouping' use case
private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.CONCURRENT;
private long keyExpireTimeSeconds = Long.MAX_VALUE; // never expire by default
private int bufferSize = rx.internal.util.RxRingBuffer.SIZE;
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
Expand Down Expand Up @@ -140,5 +141,14 @@ public Config<T, K, R> withParameters(List<ParameterDefinition<?>> params) {
this.parameters = params;
return this;
}

public Config<T, K, R> bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public int getBufferSize() {
return bufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ScalarToScalar<T, R> extends StageConfig<T, R> {

public ScalarToScalar(ScalarComputation<T, R> computation,
Config<T, R> config, Codec<T> inputCodec) {
super(config.description, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency);
super(config.description, null, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.bufferSize);
this.computation = computation;
this.inputStrategy = config.inputStrategy;
this.parameters = config.parameters;
Expand All @@ -68,6 +68,7 @@ public static class Config<T, R> {
// default input type is serial for 'collecting' use case
private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private volatile int concurrency = StageConfig.DEFAULT_STAGE_CONCURRENCY;
private int bufferSize = rx.internal.util.RxRingBuffer.SIZE;

private List<ParameterDefinition<?>> parameters = Collections.emptyList();

Expand Down Expand Up @@ -129,5 +130,14 @@ public INPUT_STRATEGY getInputStrategy() {
public int getConcurrency() {
return concurrency;
}

public Config<T, R> bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public int getBufferSize() {
return bufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.mantisrx.runtime.parameter.ParameterDefinition;
import java.util.Collections;
import java.util.List;
import rx.internal.util.RxRingBuffer;


public abstract class StageConfig<T, R> {
Expand All @@ -40,6 +41,9 @@ public abstract class StageConfig<T, R> {
// number of inner observables processed
private int concurrency = DEFAULT_STAGE_CONCURRENCY;

// buffer size for observeOn scheduler, defaults to RxRingBuffer.SIZE
private final int bufferSize;

public StageConfig(String description, Codec<T> inputCodec,
Codec<R> outputCodec, INPUT_STRATEGY inputStrategy) {
this(description, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), DEFAULT_STAGE_CONCURRENCY);
Expand Down Expand Up @@ -69,13 +73,20 @@ public StageConfig(String description, Codec<T> inputCodec,
public <K> StageConfig(String description, Codec<K> inputKeyCodec, Codec<T> inputCodec,
Codec<R> outputCodec, INPUT_STRATEGY inputStrategy, List<ParameterDefinition<?>> params,
int concurrency) {
this(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency, RxRingBuffer.SIZE);
}

public <K> StageConfig(String description, Codec<K> inputKeyCodec, Codec<T> inputCodec,
Codec<R> outputCodec, INPUT_STRATEGY inputStrategy, List<ParameterDefinition<?>> params,
int concurrency, int bufferSize) {
this.description = description;
this.inputKeyCodec = inputKeyCodec;
this.inputCodec = inputCodec;
this.outputCodec = outputCodec;
this.inputStrategy = inputStrategy;
this.parameters = params;
this.concurrency = concurrency;
this.bufferSize = bufferSize;
}

public String getDescription() {
Expand Down Expand Up @@ -109,5 +120,9 @@ public int getConcurrency() {
return concurrency;
}

public int getBufferSize() {
return bufferSize;
}

public enum INPUT_STRATEGY {NONE_SPECIFIED, SERIAL, CONCURRENT}
}
Loading
Loading