diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java index 6fc97c9e8..775d93cd8 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java @@ -2,6 +2,30 @@ import rx.functions.Func1; +import java.nio.ByteBuffer; + public interface RouterFactory { public Router scalarStageToStageRouter(String name, final Func1 toBytes); + + public default Router> keyedRouter(String name, + final Func1 keyEncoder, + final Func1 valueEncoder) { + return new ConsistentHashingRouter(name, new Func1, byte[]>() { + @Override + public byte[] call(KeyValuePair kvp) { + byte[] keyBytes = kvp.getKeyBytes(); + byte[] valueBytes = valueEncoder.call(kvp.getValue()); + return + // length + opcode + notification type + key length + ByteBuffer.allocate(4 + 1 + 1 + 4 + keyBytes.length + valueBytes.length) + .putInt(1 + 1 + 4 + keyBytes.length + valueBytes.length) // length + .put((byte) 1) // opcode + .put((byte) 1) // notification type + .putInt(keyBytes.length) // key length + .put(keyBytes) // key bytes + .put(valueBytes) // value bytes + .array(); + } + }, HashFunctions.xxh3()); + }; } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java index 7e8592fba..c508eb865 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java @@ -31,26 +31,14 @@ public class Routers implements RouterFactory { public Routers() {} + /** + * Deprecated: use RouterFactory.keyedRouter instead + */ + @Deprecated public static Router> consistentHashingLegacyTcpProtocol(String name, final Func1 keyEncoder, final Func1 valueEncoder) { - return new ConsistentHashingRouter(name, new Func1, byte[]>() { - @Override - public byte[] call(KeyValuePair kvp) { - byte[] keyBytes = kvp.getKeyBytes(); - byte[] valueBytes = valueEncoder.call(kvp.getValue()); - return - // length + opcode + notification type + key length - ByteBuffer.allocate(4 + 1 + 1 + 4 + keyBytes.length + valueBytes.length) - .putInt(1 + 1 + 4 + keyBytes.length + valueBytes.length) // length - .put((byte) 1) // opcode - .put((byte) 1) // notification type - .putInt(keyBytes.length) // key length - .put(keyBytes) // key bytes - .put(valueBytes) // value bytes - .array(); - } - }, HashFunctions.xxh3()); + return new Routers().keyedRouter(name, keyEncoder, valueEncoder); } private static byte[] dataPayload(byte[] data) { diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java index 73092c926..4cbc3196f 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java @@ -135,6 +135,8 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS Func1 valueEncoder = t1 -> stage.getOutputCodec().encode(t1); Func1 keyEncoder = t1 -> stage.getOutputKeyCodec().encode(t1); + Router> router = this.routerFactory.keyedRouter(jobName, keyEncoder, valueEncoder); + ServerConfig> config = new ServerConfig.Builder>() .name(name) .port(serverPort) @@ -144,7 +146,7 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS .maxChunkTimeMSec(maxChunkTimeMSec()) .bufferCapacity(bufferCapacity()) .useSpscQueue(useSpsc()) - .router(Routers.consistentHashingLegacyTcpProtocol(jobName, keyEncoder, valueEncoder)) + .router(router) .build(); if (stage instanceof ScalarToGroup || stage instanceof GroupToGroup) {