From 2b764bacba85ffae2071d4b821a9a1fe6d1f6628 Mon Sep 17 00:00:00 2001 From: "ashay.patil" Date: Thu, 14 Nov 2024 08:40:21 +0530 Subject: [PATCH 1/5] hierarchical actor support - init --- memq-actor/pom.xml | 6 + .../java/io/appform/memq/ActorSystem.java | 6 +- .../java/io/appform/memq/HighLevelActor.java | 3 +- .../io/appform/memq/HighLevelActorConfig.java | 22 +- .../java/io/appform/memq/actor/Actor.java | 9 +- .../java/io/appform/memq/actor/IActor.java | 15 ++ .../HierarchialHighLevelActorConfig.java | 29 +++ .../memq/hierarchical/HierarchicalActor.java | 188 ++++++++++++++++++ .../HierarchicalHighLevelActor.java | 113 +++++++++++ .../HierarchicalOperationWorker.java | 56 ++++++ .../HierarchicalOperationWorkerConfig.java | 52 +++++ .../hierarchical/HierarchicalRouterUtils.java | 68 +++++++ .../memq/hierarchical/IHierarchicalActor.java | 17 ++ .../HierarchicalDataStoreSupplierTree.java | 63 ++++++ .../tree/HierarchicalDataStoreTree.java | 65 ++++++ .../tree/HierarchicalDataStoreTreeNode.java | 107 ++++++++++ .../tree/HierarchicalTreeConfig.java | 18 ++ .../tree/TriConsumerSupplier.java | 6 + .../tree/key/HierarchicalRoutingKey.java | 7 + .../hierarchical/tree/key/RoutingKey.java | 20 ++ .../appform/memq/observer/ActorObserver.java | 4 +- .../memq/observer/TerminalActorObserver.java | 4 +- .../memq/stats/ActorMetricObserver.java | 4 +- .../java/io/appform/memq/helper/TestUtil.java | 17 +- .../FlowHierarchicalMemqActorConfig.java | 16 ++ .../HierarchicalHighLevelActorTest.java | 119 +++++++++++ ...C2CDataActionMessageHierarchicalActor.java | 24 +++ ...C2MDataActionMessageHierarchicalActor.java | 25 +++ .../FlowTypeHierarchicalActorBuilder.java | 30 +++ .../memq/hierarchical/data/ActionMessage.java | 33 +++ .../data/C2CDataActionMessage.java | 28 +++ .../data/C2MDataActionMessage.java | 28 +++ .../memq/hierarchical/data/FlowType.java | 25 +++ .../java/io/appform/memq/util/YamlReader.java | 48 +++++ .../test/resources/rmqHierarchicalMemq.yaml | 23 +++ .../main/java/io/appform/MemqActorSystem.java | 17 +- 36 files changed, 1284 insertions(+), 31 deletions(-) create mode 100644 memq-actor/src/main/java/io/appform/memq/actor/IActor.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchialHighLevelActorConfig.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorker.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorkerConfig.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterUtils.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreSupplierTree.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTree.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTreeNode.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalTreeConfig.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/HierarchicalRoutingKey.java create mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/RoutingKey.java create mode 100644 memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java create mode 100644 memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java create mode 100644 memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2CDataActionMessageHierarchicalActor.java create mode 100644 memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2MDataActionMessageHierarchicalActor.java create mode 100644 memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java create mode 100644 memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java create mode 100644 memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2CDataActionMessage.java create mode 100644 memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2MDataActionMessage.java create mode 100644 memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java create mode 100644 memq-actor/src/test/java/io/appform/memq/util/YamlReader.java create mode 100644 memq-actor/src/test/resources/rmqHierarchicalMemq.yaml diff --git a/memq-actor/pom.xml b/memq-actor/pom.xml index 5c6f49a..1e590a8 100644 --- a/memq-actor/pom.xml +++ b/memq-actor/pom.xml @@ -47,6 +47,12 @@ jackson-databind 2.16.1 + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + 2.13.5 + test + io.dropwizard.metrics metrics-core diff --git a/memq-actor/src/main/java/io/appform/memq/ActorSystem.java b/memq-actor/src/main/java/io/appform/memq/ActorSystem.java index 05ad293..2eb8f31 100644 --- a/memq-actor/src/main/java/io/appform/memq/ActorSystem.java +++ b/memq-actor/src/main/java/io/appform/memq/ActorSystem.java @@ -1,12 +1,13 @@ package io.appform.memq; import com.codahale.metrics.MetricRegistry; -import io.appform.memq.actor.Actor; +import io.appform.memq.actor.IActor; import io.appform.memq.actor.Message; import io.appform.memq.exceptionhandler.config.DropConfig; import io.appform.memq.exceptionhandler.config.ExceptionHandlerConfigVisitor; import io.appform.memq.exceptionhandler.config.SidelineConfig; import io.appform.memq.actor.MessageMeta; +import io.appform.memq.hierarchical.IHierarchicalActor; import io.appform.memq.observer.ActorObserver; import io.appform.memq.retry.RetryStrategy; import io.appform.memq.stats.ActorMetricObserver; @@ -20,7 +21,8 @@ public interface ActorSystem extends AutoCloseable { - void register(Actor actor); + void register(IActor actor); + void register(IHierarchicalActor actor); ExecutorService createOrGetExecutorService(HighLevelActorConfig config); diff --git a/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java b/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java index c80ef5e..46b60bf 100644 --- a/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java +++ b/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java @@ -2,6 +2,7 @@ import io.appform.memq.actor.Actor; +import io.appform.memq.actor.IActor; import io.appform.memq.actor.Message; import io.appform.memq.actor.MessageMeta; import io.appform.memq.observer.ActorObserver; @@ -16,7 +17,7 @@ public abstract class HighLevelActor, M ex @Getter private final MessageType type; - private final Actor actor; + protected final IActor actor; @SuppressWarnings("unused") protected HighLevelActor( diff --git a/memq-actor/src/main/java/io/appform/memq/HighLevelActorConfig.java b/memq-actor/src/main/java/io/appform/memq/HighLevelActorConfig.java index 496f227..b3dfc4c 100644 --- a/memq-actor/src/main/java/io/appform/memq/HighLevelActorConfig.java +++ b/memq-actor/src/main/java/io/appform/memq/HighLevelActorConfig.java @@ -5,48 +5,48 @@ import io.appform.memq.retry.config.NoRetryConfig; import io.appform.memq.retry.config.RetryConfig; import lombok.*; -import lombok.extern.jackson.Jacksonized; import javax.validation.Valid; import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; -@Value -@Builder -@Jacksonized +@Data +@EqualsAndHashCode +@ToString @AllArgsConstructor @NoArgsConstructor +@Builder public class HighLevelActorConfig { @Min(1) @Max(100) @Builder.Default - int partitions = 1; + private int partitions = 1; @Min(1) @Builder.Default - long maxSizePerPartition = Long.MAX_VALUE; + private long maxSizePerPartition = Long.MAX_VALUE; @Min(1) @Builder.Default - int maxConcurrencyPerPartition = Integer.MAX_VALUE; + private int maxConcurrencyPerPartition = Integer.MAX_VALUE; @Valid @NotNull @Builder.Default - RetryConfig retryConfig = new NoRetryConfig(); + private RetryConfig retryConfig = new NoRetryConfig(); @Valid @NotNull @Builder.Default - ExceptionHandlerConfig exceptionHandlerConfig = new DropConfig(); + private ExceptionHandlerConfig exceptionHandlerConfig = new DropConfig(); @NotNull @Builder.Default - String executorName = "default"; + private String executorName = "default"; @Builder.Default - boolean metricDisabled = false; + private boolean metricDisabled = false; } diff --git a/memq-actor/src/main/java/io/appform/memq/actor/Actor.java b/memq-actor/src/main/java/io/appform/memq/actor/Actor.java index 94faa7b..396ad7b 100644 --- a/memq-actor/src/main/java/io/appform/memq/actor/Actor.java +++ b/memq-actor/src/main/java/io/appform/memq/actor/Actor.java @@ -33,7 +33,7 @@ import java.util.stream.IntStream; @Slf4j -public class Actor implements AutoCloseable { +public class Actor implements IActor { private final String name; private final ExecutorService executorService; @@ -84,12 +84,14 @@ public Actor( this.rootObserver = setupObserver(observers); } + @Override public final boolean isEmpty() { return mailboxes.values() .stream() .allMatch(Mailbox::isEmpty); } + @Override public final long size() { return mailboxes.values() .stream() @@ -97,6 +99,7 @@ public final long size() { .sum(); } + @Override public final long inFlight() { return mailboxes.values() .stream() @@ -104,16 +107,19 @@ public final long inFlight() { .sum(); } + @Override public final boolean isRunning() { return mailboxes.values() .stream() .allMatch(Mailbox::isRunning); } + @Override public final void purge() { mailboxes.values().forEach(Mailbox::purge); } + @Override public final boolean publish(final M message) { return rootObserver.execute(ActorObserverContext.builder() .operation(ActorOperation.PUBLISH) @@ -124,6 +130,7 @@ public final boolean publish(final M message) { .publish(message)); } + @Override public final void start() { mailboxes.values().forEach(Mailbox::start); } diff --git a/memq-actor/src/main/java/io/appform/memq/actor/IActor.java b/memq-actor/src/main/java/io/appform/memq/actor/IActor.java new file mode 100644 index 0000000..ea3de31 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/actor/IActor.java @@ -0,0 +1,15 @@ +package io.appform.memq.actor; + +public interface IActor extends AutoCloseable { + + void start(); + void close(); + + boolean isEmpty(); + long size(); + long inFlight(); + boolean isRunning(); + void purge(); + + boolean publish(final M message); +} \ No newline at end of file diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchialHighLevelActorConfig.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchialHighLevelActorConfig.java new file mode 100644 index 0000000..551563c --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchialHighLevelActorConfig.java @@ -0,0 +1,29 @@ +package io.appform.memq.hierarchical; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonUnwrapped; +import io.appform.memq.HighLevelActorConfig; +import io.appform.memq.hierarchical.tree.HierarchicalDataStoreTreeNode; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.ToString; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +@EqualsAndHashCode +@ToString +@NoArgsConstructor +public class HierarchialHighLevelActorConfig extends HighLevelActorConfig { + + /** + *

This param will reused all Parent Level ActorConfig while creating all child actors, + * if marked as false, every children will need tp provide Actor config specific to child

+ * + */ + private boolean useParentConfigInWorker = true; + + @JsonUnwrapped + private HierarchicalDataStoreTreeNode childrenData; + +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java new file mode 100644 index 0000000..af8d128 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java @@ -0,0 +1,188 @@ +package io.appform.memq.hierarchical; + +import io.appform.memq.ActorSystem; +import io.appform.memq.actor.Message; +import io.appform.memq.actor.MessageMeta; +import io.appform.memq.hierarchical.tree.HierarchicalDataStoreSupplierTree; +import io.appform.memq.hierarchical.tree.HierarchicalTreeConfig; +import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey; +import io.appform.memq.hierarchical.tree.key.RoutingKey; +import io.appform.memq.observer.ActorObserver; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.ToIntFunction; + +@Slf4j +public class HierarchicalActor, M extends Message> implements IHierarchicalActor { + + public static final RoutingKey EMPTY_ROUTING_KEY = RoutingKey.builder().build(); + + private final HierarchicalTreeConfig hierarchicalTreeConfig; + private final MessageType messageType; + private final ActorSystem actorSystem; + private final ToIntFunction partitioner; + private final List observers; + private final BiFunction messageHandler; + private final BiConsumer sidelineHandler; + + @Getter + private HierarchicalDataStoreSupplierTree< + HierarchicalOperationWorkerConfig, + HierarchialHighLevelActorConfig, + MessageType, + HierarchicalOperationWorker> worker; + + + public HierarchicalActor(MessageType messageType, + HierarchialHighLevelActorConfig hierarchicalActorConfig, + ActorSystem actorSystem, + BiFunction messageHandler, + BiConsumer sidelineHandler, + ToIntFunction partitioner, + List observers) { + this.messageType = messageType; + this.hierarchicalTreeConfig = new HierarchicalTreeConfig<>(hierarchicalActorConfig, hierarchicalActorConfig.getChildrenData()); + this.actorSystem = actorSystem; + this.messageHandler = messageHandler; + this.sidelineHandler = sidelineHandler; + this.partitioner = partitioner; + this.observers = observers; + } + + @Override + public void start() { + log.info("Starting all workers"); + this.initializeRouter(); + } + + @Override + public void close() { + log.info("Closing all workers"); + worker.traverse(hierarchicalOperationWorker -> { + log.info("Closing worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + hierarchicalOperationWorker.close(); + }); + } + + @Override + public void purge() { + log.info("Purging all workers"); + worker.traverse(hierarchicalOperationWorker -> { + log.info("Purging worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + hierarchicalOperationWorker.purge(); + }); + } + + @Override + public boolean publish(final M message) { + return publishActor(EMPTY_ROUTING_KEY).publish(message); + } + + @Override + public long size() { + log.info("Size of all workers"); + val atomicLong = new AtomicLong(); + worker.traverse(hierarchicalOperationWorker -> { + log.info("Size of worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + atomicLong.getAndAdd(hierarchicalOperationWorker.size()); + }); + return atomicLong.get(); + } + + @Override + public long inFlight() { + log.info("inFlight Size of all workers"); + val atomicLong = new AtomicLong(); + worker.traverse(hierarchicalOperationWorker -> { + log.info("inFlight Size of worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + atomicLong.getAndAdd(hierarchicalOperationWorker.inFlight()); + }); + return atomicLong.get(); + } + + @Override + public boolean isEmpty() { + log.info("isEmpty all workers"); + val atomicBoolean = new AtomicBoolean(); + worker.traverse(hierarchicalOperationWorker -> { + log.info("isEmpty worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + atomicBoolean.set(atomicBoolean.get() && hierarchicalOperationWorker.isEmpty()); + }); + return atomicBoolean.get(); + } + + @Override + public boolean isRunning() { + log.info("isRunning all workers"); + val atomicBoolean = new AtomicBoolean(); + worker.traverse(hierarchicalOperationWorker -> { + log.info("isRunning worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + atomicBoolean.set(atomicBoolean.get() && hierarchicalOperationWorker.isRunning()); + }); + return atomicBoolean.get(); + } + + @Override + public void purge(final HierarchicalRoutingKey routingKey) { + publishActor(routingKey).purge(); + } + + @Override + public boolean publish(final HierarchicalRoutingKey routingKey, + final M message) { + return publishActor(routingKey).publish(message); + } + + @Override + public long size(final HierarchicalRoutingKey routingKey) { + return publishActor(routingKey).size(); + } + + @Override + public long inFlight(final HierarchicalRoutingKey routingKey) { + return publishActor(routingKey).inFlight(); + } + + @Override + public boolean isEmpty(final HierarchicalRoutingKey routingKey) { + return publishActor(routingKey).isEmpty(); + } + + @Override + public boolean isRunning(final HierarchicalRoutingKey routingKey) { + return publishActor(routingKey).isRunning(); + } + + private HierarchicalOperationWorker publishActor(final HierarchicalRoutingKey routingKey) { + return (HierarchicalOperationWorker) this.worker.get(messageType, routingKey); + } + + private void initializeRouter() { + this.worker = new HierarchicalDataStoreSupplierTree<>( + messageType, + hierarchicalTreeConfig, + HierarchicalRouterUtils.actorConfigToWorkerConfigFunc, + (routingKey, messageTypeKey, workerConfig) -> { + log.info("{} -> {}", routingKey.getRoutingKey(), messageTypeKey); + return new HierarchicalOperationWorker<>( + messageType, + workerConfig, + hierarchicalTreeConfig.getDefaultData(), + routingKey, + actorSystem, + messageHandler, + sidelineHandler, + partitioner, + observers); + } + ); + } + +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java new file mode 100644 index 0000000..9bf9258 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java @@ -0,0 +1,113 @@ +package io.appform.memq.hierarchical; + + +import io.appform.memq.ActorSystem; +import io.appform.memq.actor.Message; +import io.appform.memq.actor.MessageMeta; +import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey; +import io.appform.memq.observer.ActorObserver; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.function.ToIntFunction; + +@Slf4j +public abstract class HierarchicalHighLevelActor, M extends Message> { + + + @Getter + private final MessageType type; + @Getter + private final HierarchicalActor actor; + + @SuppressWarnings("unused") + protected HierarchicalHighLevelActor( + MessageType type, + HierarchialHighLevelActorConfig highLevelActorConfig, + ActorSystem actorSystem) { + this(type, highLevelActorConfig, actorSystem, null, List.of()); + } + + protected HierarchicalHighLevelActor( + MessageType type, + HierarchialHighLevelActorConfig highLevelActorConfig, + ActorSystem actorSystem, + ToIntFunction partitioner) { + this(type, highLevelActorConfig, actorSystem, partitioner, List.of()); + } + + protected HierarchicalHighLevelActor( + MessageType type, + HierarchialHighLevelActorConfig highLevelActorConfig, + ActorSystem actorSystem, + List observers) { + this(type, highLevelActorConfig, actorSystem, null, observers); + } + + protected HierarchicalHighLevelActor( + MessageType type, + HierarchialHighLevelActorConfig highLevelActorConfig, + ActorSystem actorSystem, + ToIntFunction partitioner, + List observers) { + this.type = type; + this.actor = new HierarchicalActor<>(type, highLevelActorConfig, actorSystem, this::handle, this::sideline, partitioner, observers); + actorSystem.register(actor); + } + + protected abstract boolean handle(final M message, MessageMeta messageMeta); + + protected void sideline(final M message, MessageMeta messageMeta) { + log.warn("skipping sideline for actor:{} message:{}", type.name(), message); + } + + public final void purge() { + actor.purge(); + } + + public final boolean publish(final M message) { + return actor.publish(message); + } + + public final long size() { + return actor.size(); + } + + public final long inFlight() { + return actor.inFlight(); + } + + public final boolean isEmpty() { + return actor.isEmpty(); + } + + public final boolean isRunning() { + return actor.isRunning(); + } + + public final void purge(final HierarchicalRoutingKey routingKey) { + actor.purge(routingKey); + } + + public final boolean publish(final HierarchicalRoutingKey routingKey, final M message) { + return actor.publish(routingKey, message); + } + + public final long size(final HierarchicalRoutingKey routingKey) { + return actor.size(routingKey); + } + + public final long inFlight(final HierarchicalRoutingKey routingKey) { + return actor.inFlight(routingKey); + } + + public final boolean isEmpty(final HierarchicalRoutingKey routingKey) { + return actor.isEmpty(routingKey); + } + + public final boolean isRunning(final HierarchicalRoutingKey routingKey) { + return actor.isRunning(routingKey); + } + +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorker.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorker.java new file mode 100644 index 0000000..ec9c2a3 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorker.java @@ -0,0 +1,56 @@ +package io.appform.memq.hierarchical; + +import io.appform.memq.ActorSystem; +import io.appform.memq.HighLevelActor; +import io.appform.memq.actor.Message; +import io.appform.memq.actor.MessageMeta; +import io.appform.memq.hierarchical.tree.key.RoutingKey; +import io.appform.memq.observer.ActorObserver; +import lombok.EqualsAndHashCode; +import lombok.Getter; + +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.ToIntFunction; + +@Getter +@EqualsAndHashCode +public class HierarchicalOperationWorker, M extends Message> + extends HighLevelActor { + + private final RoutingKey routingKey; + private final BiFunction messageHandler; + private final BiConsumer sidelineHandler; + + public HierarchicalOperationWorker(final MessageType messageType, + final HierarchicalOperationWorkerConfig workerConfig, + final HierarchialHighLevelActorConfig hierarchicalActorConfig, + final RoutingKey routingKey, + final ActorSystem actorSystem, + final BiFunction messageHandler, + final BiConsumer sidelineHandler, + final ToIntFunction partitioner, + final List observers) { + super(messageType, + HierarchicalRouterUtils.hierarchicalActorConfig(messageType, routingKey, workerConfig, hierarchicalActorConfig), + actorSystem, partitioner, observers); + this.routingKey = routingKey; + this.messageHandler = messageHandler; + this.sidelineHandler = sidelineHandler; + } + + @Override + protected boolean handle(M message, MessageMeta messageMeta) { + return messageHandler.apply(message, messageMeta); + } + + @Override + protected void sideline(M message, MessageMeta messageMeta) { + sidelineHandler.accept(message, messageMeta); + } + + public final void close() { + actor.close(); + } +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorkerConfig.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorkerConfig.java new file mode 100644 index 0000000..d9830e5 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorkerConfig.java @@ -0,0 +1,52 @@ +package io.appform.memq.hierarchical; + +import com.fasterxml.jackson.annotation.JsonInclude; +import io.appform.memq.exceptionhandler.config.DropConfig; +import io.appform.memq.exceptionhandler.config.ExceptionHandlerConfig; +import io.appform.memq.retry.config.NoRetryConfig; +import io.appform.memq.retry.config.RetryConfig; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.ToString; + +import javax.validation.Valid; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +@EqualsAndHashCode +@ToString +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class HierarchicalOperationWorkerConfig { + + @Min(1) + @Max(100) + @Builder.Default + private int partitions = 1; + + @Min(1) + @Builder.Default + private long maxSizePerPartition = Long.MAX_VALUE; + + @Min(1) + @Builder.Default + private int maxConcurrencyPerPartition = Integer.MAX_VALUE; + + @Valid + @NotNull + @Builder.Default + private RetryConfig retryConfig = new NoRetryConfig(); + + @Valid + @NotNull + @Builder.Default + private ExceptionHandlerConfig exceptionHandlerConfig = new DropConfig(); + +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterUtils.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterUtils.java new file mode 100644 index 0000000..cf6b381 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterUtils.java @@ -0,0 +1,68 @@ +package io.appform.memq.hierarchical; + +import io.appform.memq.HighLevelActorConfig; +import io.appform.memq.hierarchical.tree.key.RoutingKey; +import lombok.experimental.UtilityClass; +import lombok.val; +import org.apache.commons.lang3.StringUtils; + +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@UtilityClass +public class HierarchicalRouterUtils { + + private static final String EXECUTORS = "executors"; + private static final BiFunction, String, String> beautifierFunction = (stream, delimiter) -> stream + .filter(e -> !StringUtils.isEmpty(e)) + .collect(Collectors.joining(delimiter)); + + + static final Function actorConfigToWorkerConfigFunc = + actorConfig -> HierarchicalOperationWorkerConfig.builder() + .partitions(actorConfig.getPartitions()) + .maxSizePerPartition(actorConfig.getMaxSizePerPartition()) + .maxConcurrencyPerPartition(actorConfig.getMaxConcurrencyPerPartition()) + .retryConfig(actorConfig.getRetryConfig()) + .exceptionHandlerConfig(actorConfig.getExceptionHandlerConfig()) + .build(); + + + static > HighLevelActorConfig hierarchicalActorConfig( + MessageType messageType, + RoutingKey routingKeyData, + HierarchicalOperationWorkerConfig workerConfig, + HierarchialHighLevelActorConfig mainActorConfig) { + val useParentConfigInWorker = mainActorConfig.isUseParentConfigInWorker(); + return HighLevelActorConfig.builder() + // Custom fields + .executorName(executorName(mainActorConfig.getExecutorName(), messageType, routingKeyData)) + + .partitions(useParentConfigInWorker ? mainActorConfig.getPartitions() : workerConfig.getPartitions()) + .maxSizePerPartition(useParentConfigInWorker ? mainActorConfig.getMaxSizePerPartition() : workerConfig.getMaxSizePerPartition()) + .maxConcurrencyPerPartition(useParentConfigInWorker ? mainActorConfig.getMaxConcurrencyPerPartition() : workerConfig.getMaxConcurrencyPerPartition()) + .retryConfig(useParentConfigInWorker ? mainActorConfig.getRetryConfig() : workerConfig.getRetryConfig()) + .exceptionHandlerConfig(useParentConfigInWorker ? mainActorConfig.getExceptionHandlerConfig() : workerConfig.getExceptionHandlerConfig()) + .metricDisabled(mainActorConfig.isMetricDisabled()) + .build(); + } + + private static > String executorName(final String parentExchangeName, + final MessageType messageType, + final RoutingKey routingKeyData) { + val routingKey = routingKeyData.getRoutingKey(); + + if (!StringUtils.isEmpty(parentExchangeName)) { + // For backward compatibility + if(routingKey.isEmpty()) { + return parentExchangeName; + } + + return beautifierFunction.apply(Stream.of(parentExchangeName, String.join(".", routingKey)), "."); + } + + return beautifierFunction.apply(Stream.of(EXECUTORS, String.join(".", routingKey), messageType.name()), "."); + } +} \ No newline at end of file diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java new file mode 100644 index 0000000..4dc79cb --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java @@ -0,0 +1,17 @@ +package io.appform.memq.hierarchical; + +import io.appform.memq.actor.IActor; +import io.appform.memq.actor.Message; +import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey; + +public interface IHierarchicalActor extends IActor { + + boolean isEmpty(final HierarchicalRoutingKey routingKey); + long size(final HierarchicalRoutingKey routingKey); + long inFlight(final HierarchicalRoutingKey routingKey); + boolean isRunning(final HierarchicalRoutingKey routingKey); + + void purge(final HierarchicalRoutingKey routingKey); + + boolean publish(final HierarchicalRoutingKey routingKey, final M message); +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreSupplierTree.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreSupplierTree.java new file mode 100644 index 0000000..0298ac9 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreSupplierTree.java @@ -0,0 +1,63 @@ +package io.appform.memq.hierarchical.tree; + +import com.google.common.collect.Lists; +import io.appform.memq.hierarchical.tree.key.RoutingKey; +import lombok.val; + +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +@SuppressWarnings("java:S119") +public class HierarchicalDataStoreSupplierTree extends HierarchicalDataStoreTree { + + private static final Function, RoutingKey> routingKeyGenerator = (list) -> RoutingKey.builder() + .list(list) + .build(); + + public HierarchicalDataStoreSupplierTree(final NODE_KEY_TYPE key, + final HierarchicalTreeConfig treeConfig, + final Function rootNodeConverterSupplier, + final TriConsumerSupplier supplier) { + super(supplier.get( + routingKeyGenerator.apply(List.of()), + key, + rootNodeConverterSupplier.apply(treeConfig.getDefaultData()) + )); + buildTree(key, treeConfig.getChildrenData(), supplier); + } + + private void buildTree(final NODE_KEY_TYPE key, + final HierarchicalDataStoreTreeNode childrenList, + final TriConsumerSupplier supplier) { + val tokenList = Lists.newArrayList(); + buildTreeHelper(key, childrenList, tokenList, supplier); + } + + private void buildTreeHelper(final NODE_KEY_TYPE key, + final HierarchicalDataStoreTreeNode rootChildrenData, + final List tokenList, + final TriConsumerSupplier supplier) { + val childrenList = rootChildrenData.getChildren(); + if (childrenList.isEmpty()) { + add(routingKeyGenerator.apply(tokenList), key, null); + return; + } + + for (String childrenToken : childrenList.keySet()) { + val currentChildrenData = childrenList.get(childrenToken); + + tokenList.add(childrenToken); + + val routingKey = routingKeyGenerator.apply(tokenList.stream().map(String::valueOf).toList()); + val currentChildrenDefaultData = Objects.nonNull(currentChildrenData.getNodeData()) ? + currentChildrenData.getNodeData() : rootChildrenData.getNodeData(); + + add(routingKey, key, supplier.get(routingKey, key, currentChildrenDefaultData)); + buildTreeHelper(key, currentChildrenData, tokenList, supplier); + + tokenList.remove(childrenToken); + } + } + +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTree.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTree.java new file mode 100644 index 0000000..beaaeb4 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTree.java @@ -0,0 +1,65 @@ +package io.appform.memq.hierarchical.tree; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonUnwrapped; +import com.google.common.collect.Maps; +import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +@Slf4j +@ToString +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@SuppressWarnings("java:S119") +public class HierarchicalDataStoreTree { + + private final NODE_TYPE defaultData; + @JsonUnwrapped + private final Map> rootNodes = Maps.newConcurrentMap(); + + public HierarchicalDataStoreTree() { + this.defaultData = null; + } + + public HierarchicalDataStoreTree(NODE_TYPE defaultData) { + this.defaultData = defaultData; + } + + public void add(final HierarchicalRoutingKey routingKey, final NODE_KEY_TYPE key, final NODE_TYPE data) { + rootNodes.computeIfAbsent(key, t -> new HierarchicalDataStoreTreeNode<>(0, String.valueOf(key), defaultData)); + if (Objects.isNull(data)) { + return; + } + rootNodes.get(key) + .add(routingKey, data); + } + + public void traverse(final Consumer consumer) { + rootNodes.forEach((NODEKEYTYPE, vHierarchicalStoreNode) -> { + if (vHierarchicalStoreNode != null) { + vHierarchicalStoreNode.traverse(consumer); + } + }); + } + + public NODE_TYPE get(final NODE_KEY_TYPE key, final HierarchicalRoutingKey routingKey) { + if (!rootNodes.containsKey(key)) { + log.warn("Key {} not found in {} keys {}. Using default {}", key, rootNodes.keySet(), defaultData); + return defaultData; + } + + val routingKeyToken = routingKey.getRoutingKey(); + if (routingKeyToken== null || routingKeyToken.isEmpty()) { + log.warn("keys are empty {}. Using default {}", key, rootNodes.keySet(), defaultData); + return defaultData; + } + + return rootNodes.get(key) + .find(routingKey); + } +} \ No newline at end of file diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTreeNode.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTreeNode.java new file mode 100644 index 0000000..5d2c6bb --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTreeNode.java @@ -0,0 +1,107 @@ +package io.appform.memq.hierarchical.tree; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.google.common.collect.Maps; +import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey; +import lombok.Builder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +@Slf4j +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@Data +public class HierarchicalDataStoreTreeNode { + + @JsonIgnore + private final int depth; + @JsonIgnore + private final K token; + + private V nodeData; + private Map> children = Maps.newConcurrentMap(); + + + public HierarchicalDataStoreTreeNode() { + this.depth = 0; + this.token = null; + this.nodeData = null; + } + + public HierarchicalDataStoreTreeNode(K token) { + this.depth = 0; + this.token = token; + this.nodeData = null; + } + + + @Builder + public HierarchicalDataStoreTreeNode(final int depth, final K token, final V nodeData) { + this.depth = depth; + this.token = token; + this.nodeData = nodeData; + } + + void traverse(final Consumer consumer) { + if (nodeData != null) { + consumer.accept(nodeData); + } + children.forEach((k, kvHierarchicalStoreNode) -> { + if (kvHierarchicalStoreNode != null) { + kvHierarchicalStoreNode.traverse(consumer); + } + }); + } + + void addChild(final List tokens, final V defaultData) { + final K key = tokens.get(depth); + + log.debug("depth: {} name: {} key: {} tokens: {} defaultData: {}", depth, token, key, tokens, defaultData); + + if (tokens.size() > depth + 1) { + children.computeIfAbsent(key, k -> new HierarchicalDataStoreTreeNode<>(depth + 1, tokens.get(depth), null)); + children.get(key).addChild(tokens, defaultData); + } else { + if (!children.containsKey(key)) { + children.put(key, new HierarchicalDataStoreTreeNode(depth + 1, tokens.get(depth), defaultData)); + } else { + if (children.get(key) + .getNodeData() == null) { + children.get(key) + .setNodeData(defaultData); + } else { + log.error("Request to overwrite at {} existing defaultData: {} new defaultData {}", tokens, children.get(key) + .getNodeData(), defaultData); + } + } + } + } + + V findNode(final List tokens) { + if (tokens.size() == depth) { + return nodeData; + } + + if (!children.containsKey(tokens.get(depth))) { + return nodeData; + } + + V load = children.get(tokens.get(depth)) + .findNode(tokens); + return load == null + ? nodeData + : load; + } + + public void add(final HierarchicalRoutingKey routingKey, final V payload) { + addChild(routingKey.getRoutingKey(), payload); + } + + public V find(final HierarchicalRoutingKey routingKey) { + return findNode(routingKey.getRoutingKey()); + } +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalTreeConfig.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalTreeConfig.java new file mode 100644 index 0000000..7519d61 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalTreeConfig.java @@ -0,0 +1,18 @@ +package io.appform.memq.hierarchical.tree; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonUnwrapped; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +@NoArgsConstructor +@AllArgsConstructor +@SuppressWarnings("java:S119") +public class HierarchicalTreeConfig { + private ROOT_TYPE defaultData; + @JsonUnwrapped + private HierarchicalDataStoreTreeNode childrenData; +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java new file mode 100644 index 0000000..f2ec183 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java @@ -0,0 +1,6 @@ +package io.appform.memq.hierarchical.tree; + +@FunctionalInterface +public interface TriConsumerSupplier { + public S get(R routingKey, K key, V value); +} \ No newline at end of file diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/HierarchicalRoutingKey.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/HierarchicalRoutingKey.java new file mode 100644 index 0000000..ee6248d --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/HierarchicalRoutingKey.java @@ -0,0 +1,7 @@ +package io.appform.memq.hierarchical.tree.key; + +import java.util.List; + +public interface HierarchicalRoutingKey { + List getRoutingKey(); +} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/RoutingKey.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/RoutingKey.java new file mode 100644 index 0000000..221bd08 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/RoutingKey.java @@ -0,0 +1,20 @@ +package io.appform.memq.hierarchical.tree.key; + + +import lombok.Builder; + +import java.util.List; + +public class RoutingKey implements HierarchicalRoutingKey { + private final List list; + + @Builder + public RoutingKey(final List list) { + this.list = list; + } + + @Override + public List getRoutingKey() { + return list; + } +} \ No newline at end of file diff --git a/memq-actor/src/main/java/io/appform/memq/observer/ActorObserver.java b/memq-actor/src/main/java/io/appform/memq/observer/ActorObserver.java index 9744e63..b63fd50 100644 --- a/memq-actor/src/main/java/io/appform/memq/observer/ActorObserver.java +++ b/memq-actor/src/main/java/io/appform/memq/observer/ActorObserver.java @@ -1,6 +1,6 @@ package io.appform.memq.observer; -import io.appform.memq.actor.Actor; +import io.appform.memq.actor.IActor; import io.appform.memq.actor.Message; import lombok.Getter; @@ -15,7 +15,7 @@ protected ActorObserver(ActorObserver next) { this.next = next; } - public abstract void initialize(Actor actor); + public abstract void initialize(IActor actor); public abstract boolean execute( final ActorObserverContext context, diff --git a/memq-actor/src/main/java/io/appform/memq/observer/TerminalActorObserver.java b/memq-actor/src/main/java/io/appform/memq/observer/TerminalActorObserver.java index e991a53..6be0b6f 100644 --- a/memq-actor/src/main/java/io/appform/memq/observer/TerminalActorObserver.java +++ b/memq-actor/src/main/java/io/appform/memq/observer/TerminalActorObserver.java @@ -1,7 +1,7 @@ package io.appform.memq.observer; -import io.appform.memq.actor.Actor; +import io.appform.memq.actor.IActor; import io.appform.memq.actor.Message; import java.util.function.BooleanSupplier; @@ -13,7 +13,7 @@ public TerminalActorObserver() { } @Override - public void initialize(Actor actor) { + public void initialize(IActor actor) { } @Override diff --git a/memq-actor/src/main/java/io/appform/memq/stats/ActorMetricObserver.java b/memq-actor/src/main/java/io/appform/memq/stats/ActorMetricObserver.java index 21ad7fd..3b8c498 100644 --- a/memq-actor/src/main/java/io/appform/memq/stats/ActorMetricObserver.java +++ b/memq-actor/src/main/java/io/appform/memq/stats/ActorMetricObserver.java @@ -2,7 +2,7 @@ import com.codahale.metrics.*; -import io.appform.memq.actor.Actor; +import io.appform.memq.actor.IActor; import io.appform.memq.actor.Message; import io.appform.memq.observer.ActorObserver; import io.appform.memq.observer.ActorObserverContext; @@ -40,7 +40,7 @@ private static String normalizeString(final String name) { } @Override - public void initialize(Actor actor) { + public void initialize(IActor actor) { this.metricRegistry.gauge(MetricRegistry.name(getMetricPrefix(actorName), "size"), (MetricRegistry.MetricSupplier>) () -> new CachedGauge<>(5, TimeUnit.SECONDS) { diff --git a/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java b/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java index 154029f..f272f4d 100644 --- a/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java +++ b/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java @@ -4,12 +4,13 @@ import com.google.common.collect.Lists; import io.appform.memq.ActorSystem; import io.appform.memq.HighLevelActor; -import io.appform.memq.actor.Actor; +import io.appform.memq.actor.IActor; import io.appform.memq.HighLevelActorConfig; import io.appform.memq.exceptionhandler.config.ExceptionHandlerConfig; import io.appform.memq.exceptionhandler.config.SidelineConfig; import io.appform.memq.helper.message.TestIntMessage; import io.appform.memq.actor.MessageMeta; +import io.appform.memq.hierarchical.IHierarchicalActor; import io.appform.memq.observer.ActorObserver; import io.appform.memq.retry.RetryStrategy; import io.appform.memq.retry.RetryStrategyFactory; @@ -38,10 +39,16 @@ public static ActorSystem actorSystem(ExecutorService tp) { val metricRegistry = new MetricRegistry(); return new ActorSystem() { private final RetryStrategyFactory retryStrategyFactory = new RetryStrategyFactory(); - private final List> registeredActors = Lists.newArrayList(); + private final List> registeredActors = Lists.newArrayList(); @Override - public void register(Actor actor) { + public void register(IActor actor) { + registeredActors.add(actor); + actor.start(); + } + + @Override + public void register(IHierarchicalActor actor) { registeredActors.add(actor); actor.start(); } @@ -68,12 +75,12 @@ public List registeredObservers() { @Override public boolean isRunning() { - return !registeredActors.isEmpty() && registeredActors.stream().allMatch(Actor::isRunning); + return !registeredActors.isEmpty() && registeredActors.stream().allMatch(IActor::isRunning); } @Override public void close() { - registeredActors.forEach(Actor::close); + registeredActors.forEach(IActor::close); } }; } diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java new file mode 100644 index 0000000..8e48cf8 --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java @@ -0,0 +1,16 @@ +package io.appform.memq.hierarchical; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +@NoArgsConstructor +@AllArgsConstructor +public class FlowHierarchicalMemqActorConfig> { + private Map workers; +} diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java new file mode 100644 index 0000000..521eb9a --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java @@ -0,0 +1,119 @@ +package io.appform.memq.hierarchical; + +import com.fasterxml.jackson.core.type.TypeReference; +import io.appform.memq.ActorSystem; +import io.appform.memq.MemQTestExtension; +import io.appform.memq.hierarchical.actor.FlowTypeHierarchicalActorBuilder; +import io.appform.memq.hierarchical.data.ActionMessage; +import io.appform.memq.hierarchical.data.C2CDataActionMessage; +import io.appform.memq.hierarchical.data.C2MDataActionMessage; +import io.appform.memq.hierarchical.data.FlowType; +import io.appform.memq.hierarchical.tree.key.RoutingKey; +import io.appform.memq.util.YamlReader; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Slf4j +@ExtendWith(MemQTestExtension.class) +public class HierarchicalHighLevelActorTest { + + private final static FlowHierarchicalMemqActorConfig RMQ_CONFIG = YamlReader.loadConfig("rmqHierarchicalMemq.yaml", new TypeReference<>() { + }); + private Map> actorActors; + + enum HierarchicalHighLevelActorType { + C2M_AUTH_FLOW, + C2C_AUTH_FLOW; + } + + static final int THREAD_POOL_SIZE = 10; + + @SneakyThrows + public void createActors(ActorSystem actorSystem) { + actorActors = RMQ_CONFIG.getWorkers() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getKey().accept(new FlowTypeHierarchicalActorBuilder(e.getValue(), actorSystem)))); + } + + @Test + @SneakyThrows + void testSuccessSinglePartition(ActorSystem actorSystem) { + createActors(actorSystem); + val messages = Map.of( + RoutingKey.builder().list(List.of("")).build(), + C2MDataActionMessage.builder() + .data("C2M") + .build(), + + RoutingKey.builder().list(List.of("REGULAR", "JAR")).build(), + C2MDataActionMessage.builder() + .data("C2M-REGULAR-JAR-SOME") + .build(), + + RoutingKey.builder().list(List.of("REGULAR")).build(), + C2CDataActionMessage.builder() + .data("C2C-REGULAR") + .build(), + + RoutingKey.builder().list(List.of("C2C_AUTH_FLOW")).build(), + C2CDataActionMessage.builder() + .data("C2C") + .build(), + + RoutingKey.builder().list(List.of("FULL_AUTH", "JAR")).build(), + C2MDataActionMessage.builder() + .data("C2M-FULL_AUTH-JAR-SOME") + .build() + ); + + messages.forEach((routingKey, message) -> { + val flowType = message.getType(); + + if (actorActors.containsKey(flowType)) { + val router = actorActors.get(flowType); + Assertions.assertNotNull(router); + + val flowLevelPrefix = Arrays.asList(RMQ_CONFIG.getWorkers().get(flowType).getExecutorName().split("\\.")); + System.out.println("flowLevelPrefix" + flowLevelPrefix); + + val worker = router.getActor().getWorker().get(flowType, routingKey); + Assertions.assertNotNull(worker); + + val routingKeyWorker = worker.getRoutingKey(); + System.out.println("routingKeyWorker : " + routingKeyWorker.getRoutingKey()); +// Assertions.assertNotNull(publisherQueueName); +// val publisherQueueNameTokens = new LinkedHashSet<>(Arrays.stream(worker +// .getActorImpl() +// .getPublishActor() +// .queueName() +// .split("\\.")) +// .filter(e -> !e.isBlank() && !flowLevelPrefix.contains(e)) +// .toList()); +// +// val expectedElementsInQueueName = new LinkedHashSet<>(routingKey.getRoutingKey().stream().filter(e -> !e.isBlank()).toList()); +// expectedElementsInQueueName.add(flowType.name()); +// +// publisherQueueNameTokens.forEach(ele -> Assertions.assertTrue(expectedElementsInQueueName.contains(ele))); + message.setExecutorName(String.join("-", routingKeyWorker.getRoutingKey())); + try { + router.publish(routingKey, message); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + } + + +} diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2CDataActionMessageHierarchicalActor.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2CDataActionMessageHierarchicalActor.java new file mode 100644 index 0000000..d2a060f --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2CDataActionMessageHierarchicalActor.java @@ -0,0 +1,24 @@ +package io.appform.memq.hierarchical.actor; + +import io.appform.memq.ActorSystem; +import io.appform.memq.actor.MessageMeta; +import io.appform.memq.hierarchical.HierarchialHighLevelActorConfig; +import io.appform.memq.hierarchical.HierarchicalHighLevelActor; +import io.appform.memq.hierarchical.data.ActionMessage; +import io.appform.memq.hierarchical.data.FlowType; + + +public class C2CDataActionMessageHierarchicalActor extends HierarchicalHighLevelActor { + + + public C2CDataActionMessageHierarchicalActor(final HierarchialHighLevelActorConfig hierarchicalTreeConfig, + final ActorSystem actorSystem) { + super(FlowType.C2C_AUTH_FLOW, hierarchicalTreeConfig, actorSystem); + } + + @Override + protected boolean handle(ActionMessage actionMessage, MessageMeta messageMetadata) { + System.out.println("C2C : " + actionMessage); + return true; + } +} \ No newline at end of file diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2MDataActionMessageHierarchicalActor.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2MDataActionMessageHierarchicalActor.java new file mode 100644 index 0000000..34ed4e8 --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2MDataActionMessageHierarchicalActor.java @@ -0,0 +1,25 @@ +package io.appform.memq.hierarchical.actor; + + +import io.appform.memq.ActorSystem; +import io.appform.memq.actor.MessageMeta; +import io.appform.memq.hierarchical.HierarchialHighLevelActorConfig; +import io.appform.memq.hierarchical.HierarchicalHighLevelActor; +import io.appform.memq.hierarchical.data.ActionMessage; +import io.appform.memq.hierarchical.data.FlowType; + +public class C2MDataActionMessageHierarchicalActor extends HierarchicalHighLevelActor { + + + public C2MDataActionMessageHierarchicalActor(final HierarchialHighLevelActorConfig hierarchicalTreeConfig, + final ActorSystem actorSystem) { + super(FlowType.C2M_AUTH_FLOW, hierarchicalTreeConfig, actorSystem); + } + + @Override + protected boolean handle(ActionMessage actionMessage, MessageMeta messageMeta) { + System.out.println("C2M : " + actionMessage); + return true; + } + +} \ No newline at end of file diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java new file mode 100644 index 0000000..60857cf --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java @@ -0,0 +1,30 @@ +package io.appform.memq.hierarchical.actor; + + +import io.appform.memq.ActorSystem; +import io.appform.memq.hierarchical.HierarchialHighLevelActorConfig; +import io.appform.memq.hierarchical.HierarchicalHighLevelActor; +import io.appform.memq.hierarchical.data.ActionMessage; +import io.appform.memq.hierarchical.data.FlowType; + +public class FlowTypeHierarchicalActorBuilder implements FlowType.FlowTypeVisitor> { + + private final HierarchialHighLevelActorConfig hierarchicalTreeConfig; + private final ActorSystem actorSystem; + + public FlowTypeHierarchicalActorBuilder(final HierarchialHighLevelActorConfig hierarchicalTreeConfig, + final ActorSystem actorSystem) { + this.hierarchicalTreeConfig = hierarchicalTreeConfig; + this.actorSystem = actorSystem; + } + + @Override + public HierarchicalHighLevelActor visitC2M() { + return new C2MDataActionMessageHierarchicalActor(hierarchicalTreeConfig, actorSystem); + } + + @Override + public HierarchicalHighLevelActor visitC2C() { + return new C2CDataActionMessageHierarchicalActor(hierarchicalTreeConfig, actorSystem); + } +} diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java new file mode 100644 index 0000000..9b14a44 --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java @@ -0,0 +1,33 @@ +package io.appform.memq.hierarchical.data; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.appform.memq.actor.Message; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Setter; +import lombok.ToString; + +import javax.validation.constraints.NotNull; + +@Data +@EqualsAndHashCode +@ToString +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(name = FlowType.C2M_AUTH_FLOW_TEXT, value = C2MDataActionMessage.class), + @JsonSubTypes.Type(name = FlowType.C2C_AUTH_FLOW_TEXT, value = C2CDataActionMessage.class) +}) +public abstract class ActionMessage implements Message { + + @NotNull + private final FlowType type; + + @Setter + private String executorName; + + protected ActionMessage(FlowType type) { + this.type = type; + } + +} \ No newline at end of file diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2CDataActionMessage.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2CDataActionMessage.java new file mode 100644 index 0000000..1e5b31e --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2CDataActionMessage.java @@ -0,0 +1,28 @@ +package io.appform.memq.hierarchical.data; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +public class C2CDataActionMessage extends ActionMessage { + private String data; + + public C2CDataActionMessage() { + super(FlowType.C2C_AUTH_FLOW); + } + + @Builder + public C2CDataActionMessage(String data) { + this(); + this.data = data; + } + + @Override + public String id() { + return data; + } +} \ No newline at end of file diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2MDataActionMessage.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2MDataActionMessage.java new file mode 100644 index 0000000..87d6844 --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2MDataActionMessage.java @@ -0,0 +1,28 @@ +package io.appform.memq.hierarchical.data; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +public class C2MDataActionMessage extends ActionMessage { + private String data; + + public C2MDataActionMessage() { + super(FlowType.C2M_AUTH_FLOW); + } + + @Builder + public C2MDataActionMessage(String data) { + this(); + this.data = data; + } + + @Override + public String id() { + return data; + } +} \ No newline at end of file diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java new file mode 100644 index 0000000..2adb690 --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java @@ -0,0 +1,25 @@ +package io.appform.memq.hierarchical.data; + +public enum FlowType { + C2M_AUTH_FLOW { + @Override + public T accept(FlowTypeVisitor visitor) { + return visitor.visitC2M(); + } + }, + C2C_AUTH_FLOW { + @Override + public T accept(FlowTypeVisitor visitor) { + return visitor.visitC2C(); + } + }; + + public static final String C2M_AUTH_FLOW_TEXT = "C2M_AUTH_FLOW"; + public static final String C2C_AUTH_FLOW_TEXT = "C2C_AUTH_FLOW"; + + public abstract T accept(FlowTypeVisitor visitor); + public interface FlowTypeVisitor { + T visitC2M(); + T visitC2C(); + } +} diff --git a/memq-actor/src/test/java/io/appform/memq/util/YamlReader.java b/memq-actor/src/test/java/io/appform/memq/util/YamlReader.java new file mode 100644 index 0000000..1885c63 --- /dev/null +++ b/memq-actor/src/test/java/io/appform/memq/util/YamlReader.java @@ -0,0 +1,48 @@ +package io.appform.memq.util; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.io.Resources; +import lombok.SneakyThrows; +import lombok.experimental.UtilityClass; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +@UtilityClass +public class YamlReader { + + // Config reader + private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); + + @SneakyThrows + public T readYAML(final String data, final TypeReference typeReference) { + return yamlMapper.readValue(data, typeReference); + } + + @SneakyThrows + public T loadConfig(final String filePath, final TypeReference typeReference) { + String data = fixture(filePath); + return readYAML(data, typeReference); + } + + public static String fixture(String filename) { + return fixture(filename, StandardCharsets.UTF_8); + } + + private static String fixture(String filename, Charset charset) { + URL resource = Resources.getResource(filename); + + try { + return Resources.toString(resource, charset).trim(); + } catch (IOException var4) { + IOException e = var4; + throw new IllegalArgumentException(e); + } + } + + +} diff --git a/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml b/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml new file mode 100644 index 0000000..21899f1 --- /dev/null +++ b/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml @@ -0,0 +1,23 @@ +workers: + C2M_AUTH_FLOW: + executorName: prod.mandate.actors.c2m + partitions: 1 + children: + REGULAR: + nodeData: + partitions: 1 + children: + HOTSTAR: + nodeData: + partitions: 2 + JAR: + nodeData: + partitions: 1 + C2C_AUTH_FLOW: + executorName: prod.mandate.actors.c2c + partitions: 1 + children: + REGULAR: + nodeData: + partitions: 1 + diff --git a/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java b/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java index 8a752ac..caea4ed 100644 --- a/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java +++ b/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java @@ -4,8 +4,9 @@ import io.appform.config.ExecutorConfig; import io.appform.config.MemqConfig; import io.appform.memq.ActorSystem; -import io.appform.memq.actor.Actor; +import io.appform.memq.actor.IActor; import io.appform.memq.HighLevelActorConfig; +import io.appform.memq.hierarchical.IHierarchicalActor; import io.appform.memq.observer.ActorObserver; import io.appform.memq.retry.RetryStrategy; import io.appform.memq.retry.RetryStrategyFactory; @@ -28,7 +29,7 @@ public class MemqActorSystem implements ActorSystem, Managed { private final ConcurrentHashMap executors; private final ExecutorServiceProvider executorServiceProvider; private final Map executorConfigMap; - private final List> registeredActors; + private final List> registeredActors; private final RetryStrategyFactory retryStrategyFactory; private final MetricRegistry metricRegistry; private final List actorObservers; @@ -55,12 +56,18 @@ public MemqActorSystem( //System shutdown @Override public void close() { - registeredActors.forEach(Actor::close); + registeredActors.forEach(IActor::close); executors.values().forEach(ExecutorService::shutdown); } @Override - public final void register(Actor actor) { + public final void register(IActor actor) { + registeredActors.add(actor); + actor.start(); //Starting actor during registration + } + + @Override + public final void register(IHierarchicalActor actor) { registeredActors.add(actor); actor.start(); //Starting actor during registration } @@ -89,7 +96,7 @@ public List registeredObservers() { @Override public boolean isRunning() { - return !registeredActors.isEmpty() && registeredActors.stream().allMatch(Actor::isRunning); + return !registeredActors.isEmpty() && registeredActors.stream().allMatch(IActor::isRunning); } @Override From 41776055ef32cee50e49addde222db58b8f7fc49 Mon Sep 17 00:00:00 2001 From: "ashay.patil" Date: Thu, 14 Nov 2024 10:37:03 +0530 Subject: [PATCH 2/5] tes fix --- .../HierarchicalHighLevelActorTest.java | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java index 521eb9a..d712e34 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java @@ -90,20 +90,11 @@ void testSuccessSinglePartition(ActorSystem actorSystem) { Assertions.assertNotNull(worker); val routingKeyWorker = worker.getRoutingKey(); - System.out.println("routingKeyWorker : " + routingKeyWorker.getRoutingKey()); -// Assertions.assertNotNull(publisherQueueName); -// val publisherQueueNameTokens = new LinkedHashSet<>(Arrays.stream(worker -// .getActorImpl() -// .getPublishActor() -// .queueName() -// .split("\\.")) -// .filter(e -> !e.isBlank() && !flowLevelPrefix.contains(e)) -// .toList()); -// -// val expectedElementsInQueueName = new LinkedHashSet<>(routingKey.getRoutingKey().stream().filter(e -> !e.isBlank()).toList()); -// expectedElementsInQueueName.add(flowType.name()); -// -// publisherQueueNameTokens.forEach(ele -> Assertions.assertTrue(expectedElementsInQueueName.contains(ele))); + if(!worker.getRoutingKey().getRoutingKey().isEmpty()) { + val routingKeyWorkerStr = String.join(",",routingKeyWorker.getRoutingKey()); + val routingKeyStr = String.join(",", routingKey.getRoutingKey()); + Assertions.assertEquals(routingKeyWorkerStr, routingKeyStr); + } message.setExecutorName(String.join("-", routingKeyWorker.getRoutingKey())); try { router.publish(routingKey, message); From 4c81190c06a650a4ed7253665242e6ab29bbbb36 Mon Sep 17 00:00:00 2001 From: "ashay.patil" Date: Thu, 14 Nov 2024 10:39:45 +0530 Subject: [PATCH 3/5] fix --- .../java/io/appform/memq/hierarchical/HierarchicalActor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java index af8d128..cc455ae 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java @@ -123,7 +123,7 @@ public boolean isRunning() { log.info("isRunning all workers"); val atomicBoolean = new AtomicBoolean(); worker.traverse(hierarchicalOperationWorker -> { - log.info("isRunning worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + log.info("isRunning worker: {} {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey(), hierarchicalOperationWorker.isRunning()); atomicBoolean.set(atomicBoolean.get() && hierarchicalOperationWorker.isRunning()); }); return atomicBoolean.get(); From ef25d611c8a56601e32d6a6ade95d9d9b8308e59 Mon Sep 17 00:00:00 2001 From: "ashay.patil" Date: Thu, 14 Nov 2024 10:54:30 +0530 Subject: [PATCH 4/5] unnecessary method removed --- .../memq/hierarchical/HierarchicalActor.java | 37 +++---------------- .../HierarchicalHighLevelActor.java | 37 +++++-------------- .../memq/hierarchical/IHierarchicalActor.java | 7 ---- 3 files changed, 15 insertions(+), 66 deletions(-) diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java index cc455ae..c67cd9b 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java @@ -85,6 +85,12 @@ public boolean publish(final M message) { return publishActor(EMPTY_ROUTING_KEY).publish(message); } + @Override + public boolean publish(final HierarchicalRoutingKey routingKey, + final M message) { + return publishActor(routingKey).publish(message); + } + @Override public long size() { log.info("Size of all workers"); @@ -129,37 +135,6 @@ public boolean isRunning() { return atomicBoolean.get(); } - @Override - public void purge(final HierarchicalRoutingKey routingKey) { - publishActor(routingKey).purge(); - } - - @Override - public boolean publish(final HierarchicalRoutingKey routingKey, - final M message) { - return publishActor(routingKey).publish(message); - } - - @Override - public long size(final HierarchicalRoutingKey routingKey) { - return publishActor(routingKey).size(); - } - - @Override - public long inFlight(final HierarchicalRoutingKey routingKey) { - return publishActor(routingKey).inFlight(); - } - - @Override - public boolean isEmpty(final HierarchicalRoutingKey routingKey) { - return publishActor(routingKey).isEmpty(); - } - - @Override - public boolean isRunning(final HierarchicalRoutingKey routingKey) { - return publishActor(routingKey).isRunning(); - } - private HierarchicalOperationWorker publishActor(final HierarchicalRoutingKey routingKey) { return (HierarchicalOperationWorker) this.worker.get(messageType, routingKey); } diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java index 9bf9258..1c55b28 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java @@ -62,14 +62,19 @@ protected void sideline(final M message, MessageMeta messageMeta) { log.warn("skipping sideline for actor:{} message:{}", type.name(), message); } - public final void purge() { - actor.purge(); - } - public final boolean publish(final M message) { return actor.publish(message); } + public final boolean publish(final HierarchicalRoutingKey routingKey, final M message) { + return actor.publish(routingKey, message); + } + + + public final void purge() { + actor.purge(); + } + public final long size() { return actor.size(); } @@ -86,28 +91,4 @@ public final boolean isRunning() { return actor.isRunning(); } - public final void purge(final HierarchicalRoutingKey routingKey) { - actor.purge(routingKey); - } - - public final boolean publish(final HierarchicalRoutingKey routingKey, final M message) { - return actor.publish(routingKey, message); - } - - public final long size(final HierarchicalRoutingKey routingKey) { - return actor.size(routingKey); - } - - public final long inFlight(final HierarchicalRoutingKey routingKey) { - return actor.inFlight(routingKey); - } - - public final boolean isEmpty(final HierarchicalRoutingKey routingKey) { - return actor.isEmpty(routingKey); - } - - public final boolean isRunning(final HierarchicalRoutingKey routingKey) { - return actor.isRunning(routingKey); - } - } diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java index 4dc79cb..6d3f240 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java @@ -6,12 +6,5 @@ public interface IHierarchicalActor extends IActor { - boolean isEmpty(final HierarchicalRoutingKey routingKey); - long size(final HierarchicalRoutingKey routingKey); - long inFlight(final HierarchicalRoutingKey routingKey); - boolean isRunning(final HierarchicalRoutingKey routingKey); - - void purge(final HierarchicalRoutingKey routingKey); - boolean publish(final HierarchicalRoutingKey routingKey, final M message); } From b821b132efb351db1965fb2038a883d53ba21c45 Mon Sep 17 00:00:00 2001 From: Ashay Patil Date: Tue, 30 Dec 2025 11:59:59 +0530 Subject: [PATCH 5/5] refactor --- .../java/io/appform/memq/HighLevelActor.java | 4 ++ .../memq/hierarchical/HierarchicalActor.java | 56 +++++++++++-------- ...nfig.java => HierarchicalActorConfig.java} | 8 +-- .../HierarchicalHighLevelActor.java | 8 +-- .../HierarchicalOperationWorker.java | 56 ------------------- ...ils.java => HierarchicalRouterHelper.java} | 44 +++++++-------- ...g.java => HierarchicalSubActorConfig.java} | 2 +- .../tree/TriConsumerSupplier.java | 2 +- .../FlowHierarchicalMemqActorConfig.java | 2 +- .../HierarchicalHighLevelActorTest.java | 52 ++++++++--------- .../FlowTypeHierarchicalActorBuilder.java | 14 ++--- ...neDataActionMessageHierarchicalActor.java} | 10 ++-- ...woDataActionMessageHierarchicalActor.java} | 10 ++-- .../memq/hierarchical/data/ActionMessage.java | 4 +- .../memq/hierarchical/data/FlowType.java | 16 +++--- ...Message.java => OneDataActionMessage.java} | 8 +-- ...Message.java => TwoDataActionMessage.java} | 9 +-- .../test/resources/rmqHierarchicalMemq.yaml | 16 +++--- 18 files changed, 139 insertions(+), 182 deletions(-) rename memq-actor/src/main/java/io/appform/memq/hierarchical/{HierarchialHighLevelActorConfig.java => HierarchicalActorConfig.java} (60%) delete mode 100644 memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorker.java rename memq-actor/src/main/java/io/appform/memq/hierarchical/{HierarchicalRouterUtils.java => HierarchicalRouterHelper.java} (55%) rename memq-actor/src/main/java/io/appform/memq/hierarchical/{HierarchicalOperationWorkerConfig.java => HierarchicalSubActorConfig.java} (96%) rename memq-actor/src/test/java/io/appform/memq/hierarchical/actor/{C2MDataActionMessageHierarchicalActor.java => OneDataActionMessageHierarchicalActor.java} (60%) rename memq-actor/src/test/java/io/appform/memq/hierarchical/actor/{C2CDataActionMessageHierarchicalActor.java => TwoDataActionMessageHierarchicalActor.java} (60%) rename memq-actor/src/test/java/io/appform/memq/hierarchical/data/{C2MDataActionMessage.java => OneDataActionMessage.java} (67%) rename memq-actor/src/test/java/io/appform/memq/hierarchical/data/{C2CDataActionMessage.java => TwoDataActionMessage.java} (67%) diff --git a/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java b/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java index d09f518..1ef8404 100644 --- a/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java +++ b/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java @@ -130,4 +130,8 @@ public final boolean isRunning() { return actor.isRunning(); } + public final void close() { + actor.close(); + } + } diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java index c67cd9b..2530ff5 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActor.java @@ -1,6 +1,7 @@ package io.appform.memq.hierarchical; import io.appform.memq.ActorSystem; +import io.appform.memq.HighLevelActor; import io.appform.memq.actor.Message; import io.appform.memq.actor.MessageMeta; import io.appform.memq.hierarchical.tree.HierarchicalDataStoreSupplierTree; @@ -24,7 +25,7 @@ public class HierarchicalActor, M extends public static final RoutingKey EMPTY_ROUTING_KEY = RoutingKey.builder().build(); - private final HierarchicalTreeConfig hierarchicalTreeConfig; + private final HierarchicalTreeConfig hierarchicalTreeConfig; private final MessageType messageType; private final ActorSystem actorSystem; private final ToIntFunction partitioner; @@ -34,21 +35,21 @@ public class HierarchicalActor, M extends @Getter private HierarchicalDataStoreSupplierTree< - HierarchicalOperationWorkerConfig, - HierarchialHighLevelActorConfig, + HierarchicalSubActorConfig, + HierarchicalActorConfig, MessageType, - HierarchicalOperationWorker> worker; + HighLevelActor> worker; public HierarchicalActor(MessageType messageType, - HierarchialHighLevelActorConfig hierarchicalActorConfig, + HierarchicalActorConfig hierarchicalActorConfig, ActorSystem actorSystem, BiFunction messageHandler, BiConsumer sidelineHandler, ToIntFunction partitioner, List observers) { this.messageType = messageType; - this.hierarchicalTreeConfig = new HierarchicalTreeConfig<>(hierarchicalActorConfig, hierarchicalActorConfig.getChildrenData()); + this.hierarchicalTreeConfig = new HierarchicalTreeConfig<>(hierarchicalActorConfig, hierarchicalActorConfig.getChildren()); this.actorSystem = actorSystem; this.messageHandler = messageHandler; this.sidelineHandler = sidelineHandler; @@ -66,7 +67,7 @@ public void start() { public void close() { log.info("Closing all workers"); worker.traverse(hierarchicalOperationWorker -> { - log.info("Closing worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + log.info("Closing worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getName()); hierarchicalOperationWorker.close(); }); } @@ -75,7 +76,7 @@ public void close() { public void purge() { log.info("Purging all workers"); worker.traverse(hierarchicalOperationWorker -> { - log.info("Purging worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + log.info("Purging worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getName()); hierarchicalOperationWorker.purge(); }); } @@ -96,7 +97,7 @@ public long size() { log.info("Size of all workers"); val atomicLong = new AtomicLong(); worker.traverse(hierarchicalOperationWorker -> { - log.info("Size of worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + log.info("Size of worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getName()); atomicLong.getAndAdd(hierarchicalOperationWorker.size()); }); return atomicLong.get(); @@ -107,7 +108,7 @@ public long inFlight() { log.info("inFlight Size of all workers"); val atomicLong = new AtomicLong(); worker.traverse(hierarchicalOperationWorker -> { - log.info("inFlight Size of worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + log.info("inFlight Size of worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getName()); atomicLong.getAndAdd(hierarchicalOperationWorker.inFlight()); }); return atomicLong.get(); @@ -118,7 +119,7 @@ public boolean isEmpty() { log.info("isEmpty all workers"); val atomicBoolean = new AtomicBoolean(); worker.traverse(hierarchicalOperationWorker -> { - log.info("isEmpty worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey()); + log.info("isEmpty worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getName()); atomicBoolean.set(atomicBoolean.get() && hierarchicalOperationWorker.isEmpty()); }); return atomicBoolean.get(); @@ -129,33 +130,44 @@ public boolean isRunning() { log.info("isRunning all workers"); val atomicBoolean = new AtomicBoolean(); worker.traverse(hierarchicalOperationWorker -> { - log.info("isRunning worker: {} {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey(), hierarchicalOperationWorker.isRunning()); + log.info("isRunning worker: {} {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getName(), hierarchicalOperationWorker.isRunning()); atomicBoolean.set(atomicBoolean.get() && hierarchicalOperationWorker.isRunning()); }); return atomicBoolean.get(); } - private HierarchicalOperationWorker publishActor(final HierarchicalRoutingKey routingKey) { - return (HierarchicalOperationWorker) this.worker.get(messageType, routingKey); + private HighLevelActor publishActor(final HierarchicalRoutingKey routingKey) { + return (HighLevelActor) this.worker.get(messageType, routingKey); } private void initializeRouter() { + val hierarchicalRouterHelper = new HierarchicalRouterHelper(); this.worker = new HierarchicalDataStoreSupplierTree<>( messageType, hierarchicalTreeConfig, - HierarchicalRouterUtils.actorConfigToWorkerConfigFunc, + hierarchicalRouterHelper.actorConfigToSubActorConfigFunc, (routingKey, messageTypeKey, workerConfig) -> { log.info("{} -> {}", routingKey.getRoutingKey(), messageTypeKey); - return new HierarchicalOperationWorker<>( + val subActorConfig = hierarchicalRouterHelper.hierarchicalActorConfig(messageTypeKey, routingKey, workerConfig, hierarchicalTreeConfig.getDefaultData()); + return new HighLevelActor( + subActorConfig.getExecutorName(), messageType, - workerConfig, - hierarchicalTreeConfig.getDefaultData(), - routingKey, + subActorConfig, actorSystem, - messageHandler, - sidelineHandler, partitioner, - observers); + observers) { + + @Override + protected boolean handle(M message, MessageMeta messageMeta) { + return messageHandler.apply(message, messageMeta); + } + + @Override + protected void sideline(M message, MessageMeta messageMeta) { + sidelineHandler.accept(message, messageMeta); + } + + }; } ); } diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchialHighLevelActorConfig.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActorConfig.java similarity index 60% rename from memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchialHighLevelActorConfig.java rename to memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActorConfig.java index 551563c..8c18cc1 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchialHighLevelActorConfig.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActorConfig.java @@ -14,16 +14,16 @@ @EqualsAndHashCode @ToString @NoArgsConstructor -public class HierarchialHighLevelActorConfig extends HighLevelActorConfig { +public class HierarchicalActorConfig extends HighLevelActorConfig { /** - *

This param will reused all Parent Level ActorConfig while creating all child actors, - * if marked as false, every children will need tp provide Actor config specific to child

+ *

This param will reuse all Parent Level ActorConfig while creating all child actors, + * if marked as false, every child will need tp provide Actor config specific to child

* */ private boolean useParentConfigInWorker = true; @JsonUnwrapped - private HierarchicalDataStoreTreeNode childrenData; + private HierarchicalDataStoreTreeNode children; } diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java index 1c55b28..d256985 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java @@ -24,14 +24,14 @@ public abstract class HierarchicalHighLevelActor partitioner) { this(type, highLevelActorConfig, actorSystem, partitioner, List.of()); @@ -39,7 +39,7 @@ protected HierarchicalHighLevelActor( protected HierarchicalHighLevelActor( MessageType type, - HierarchialHighLevelActorConfig highLevelActorConfig, + HierarchicalActorConfig highLevelActorConfig, ActorSystem actorSystem, List observers) { this(type, highLevelActorConfig, actorSystem, null, observers); @@ -47,7 +47,7 @@ protected HierarchicalHighLevelActor( protected HierarchicalHighLevelActor( MessageType type, - HierarchialHighLevelActorConfig highLevelActorConfig, + HierarchicalActorConfig highLevelActorConfig, ActorSystem actorSystem, ToIntFunction partitioner, List observers) { diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorker.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorker.java deleted file mode 100644 index ec9c2a3..0000000 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorker.java +++ /dev/null @@ -1,56 +0,0 @@ -package io.appform.memq.hierarchical; - -import io.appform.memq.ActorSystem; -import io.appform.memq.HighLevelActor; -import io.appform.memq.actor.Message; -import io.appform.memq.actor.MessageMeta; -import io.appform.memq.hierarchical.tree.key.RoutingKey; -import io.appform.memq.observer.ActorObserver; -import lombok.EqualsAndHashCode; -import lombok.Getter; - -import java.util.List; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.ToIntFunction; - -@Getter -@EqualsAndHashCode -public class HierarchicalOperationWorker, M extends Message> - extends HighLevelActor { - - private final RoutingKey routingKey; - private final BiFunction messageHandler; - private final BiConsumer sidelineHandler; - - public HierarchicalOperationWorker(final MessageType messageType, - final HierarchicalOperationWorkerConfig workerConfig, - final HierarchialHighLevelActorConfig hierarchicalActorConfig, - final RoutingKey routingKey, - final ActorSystem actorSystem, - final BiFunction messageHandler, - final BiConsumer sidelineHandler, - final ToIntFunction partitioner, - final List observers) { - super(messageType, - HierarchicalRouterUtils.hierarchicalActorConfig(messageType, routingKey, workerConfig, hierarchicalActorConfig), - actorSystem, partitioner, observers); - this.routingKey = routingKey; - this.messageHandler = messageHandler; - this.sidelineHandler = sidelineHandler; - } - - @Override - protected boolean handle(M message, MessageMeta messageMeta) { - return messageHandler.apply(message, messageMeta); - } - - @Override - protected void sideline(M message, MessageMeta messageMeta) { - sidelineHandler.accept(message, messageMeta); - } - - public final void close() { - actor.close(); - } -} diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterUtils.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterHelper.java similarity index 55% rename from memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterUtils.java rename to memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterHelper.java index cf6b381..174db86 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterUtils.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterHelper.java @@ -2,7 +2,6 @@ import io.appform.memq.HighLevelActorConfig; import io.appform.memq.hierarchical.tree.key.RoutingKey; -import lombok.experimental.UtilityClass; import lombok.val; import org.apache.commons.lang3.StringUtils; @@ -11,8 +10,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -@UtilityClass -public class HierarchicalRouterUtils { +public class HierarchicalRouterHelper { private static final String EXECUTORS = "executors"; private static final BiFunction, String, String> beautifierFunction = (stream, delimiter) -> stream @@ -20,8 +18,8 @@ public class HierarchicalRouterUtils { .collect(Collectors.joining(delimiter)); - static final Function actorConfigToWorkerConfigFunc = - actorConfig -> HierarchicalOperationWorkerConfig.builder() + public final Function actorConfigToSubActorConfigFunc = + actorConfig -> HierarchicalSubActorConfig.builder() .partitions(actorConfig.getPartitions()) .maxSizePerPartition(actorConfig.getMaxSizePerPartition()) .maxConcurrencyPerPartition(actorConfig.getMaxConcurrencyPerPartition()) @@ -30,37 +28,39 @@ public class HierarchicalRouterUtils { .build(); - static > HighLevelActorConfig hierarchicalActorConfig( - MessageType messageType, - RoutingKey routingKeyData, - HierarchicalOperationWorkerConfig workerConfig, - HierarchialHighLevelActorConfig mainActorConfig) { + public > HighLevelActorConfig hierarchicalActorConfig(final MessageType messageType, + final RoutingKey routingKeyData, + final HierarchicalSubActorConfig subActorConfig, + final HierarchicalActorConfig mainActorConfig) { val useParentConfigInWorker = mainActorConfig.isUseParentConfigInWorker(); return HighLevelActorConfig.builder() // Custom fields .executorName(executorName(mainActorConfig.getExecutorName(), messageType, routingKeyData)) - .partitions(useParentConfigInWorker ? mainActorConfig.getPartitions() : workerConfig.getPartitions()) - .maxSizePerPartition(useParentConfigInWorker ? mainActorConfig.getMaxSizePerPartition() : workerConfig.getMaxSizePerPartition()) - .maxConcurrencyPerPartition(useParentConfigInWorker ? mainActorConfig.getMaxConcurrencyPerPartition() : workerConfig.getMaxConcurrencyPerPartition()) - .retryConfig(useParentConfigInWorker ? mainActorConfig.getRetryConfig() : workerConfig.getRetryConfig()) - .exceptionHandlerConfig(useParentConfigInWorker ? mainActorConfig.getExceptionHandlerConfig() : workerConfig.getExceptionHandlerConfig()) + // Copy from parent if useParentConfigInWorker is set + .partitions(useParentConfigInWorker ? mainActorConfig.getPartitions() : subActorConfig.getPartitions()) + .maxSizePerPartition(useParentConfigInWorker ? mainActorConfig.getMaxSizePerPartition() : subActorConfig.getMaxSizePerPartition()) + .maxConcurrencyPerPartition(useParentConfigInWorker ? mainActorConfig.getMaxConcurrencyPerPartition() : subActorConfig.getMaxConcurrencyPerPartition()) + .retryConfig(useParentConfigInWorker ? mainActorConfig.getRetryConfig() : subActorConfig.getRetryConfig()) + .exceptionHandlerConfig(useParentConfigInWorker ? mainActorConfig.getExceptionHandlerConfig() : subActorConfig.getExceptionHandlerConfig()) + + // Direct from Parent .metricDisabled(mainActorConfig.isMetricDisabled()) .build(); } - private static > String executorName(final String parentExchangeName, - final MessageType messageType, - final RoutingKey routingKeyData) { + private > String executorName(final String parentExecutorName, + final MessageType messageType, + final RoutingKey routingKeyData) { val routingKey = routingKeyData.getRoutingKey(); - if (!StringUtils.isEmpty(parentExchangeName)) { + if (!StringUtils.isEmpty(parentExecutorName)) { // For backward compatibility - if(routingKey.isEmpty()) { - return parentExchangeName; + if (routingKey.isEmpty()) { + return parentExecutorName; } - return beautifierFunction.apply(Stream.of(parentExchangeName, String.join(".", routingKey)), "."); + return beautifierFunction.apply(Stream.of(parentExecutorName, String.join(".", routingKey)), "."); } return beautifierFunction.apply(Stream.of(EXECUTORS, String.join(".", routingKey), messageType.name()), "."); diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorkerConfig.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalSubActorConfig.java similarity index 96% rename from memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorkerConfig.java rename to memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalSubActorConfig.java index d9830e5..6e6290d 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalOperationWorkerConfig.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalSubActorConfig.java @@ -24,7 +24,7 @@ @AllArgsConstructor @NoArgsConstructor @Builder -public class HierarchicalOperationWorkerConfig { +public class HierarchicalSubActorConfig { @Min(1) @Max(100) diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java index f2ec183..87ee0d0 100644 --- a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java +++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java @@ -2,5 +2,5 @@ @FunctionalInterface public interface TriConsumerSupplier { - public S get(R routingKey, K key, V value); + S get(R routingKey, K key, V value); } \ No newline at end of file diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java index 8e48cf8..3d9933f 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java @@ -12,5 +12,5 @@ @NoArgsConstructor @AllArgsConstructor public class FlowHierarchicalMemqActorConfig> { - private Map workers; + private Map workers; } diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java index 3792ba8..5c53390 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java @@ -5,9 +5,9 @@ import io.appform.memq.MemQTestExtension; import io.appform.memq.hierarchical.actor.FlowTypeHierarchicalActorBuilder; import io.appform.memq.hierarchical.data.ActionMessage; -import io.appform.memq.hierarchical.data.C2CDataActionMessage; -import io.appform.memq.hierarchical.data.C2MDataActionMessage; import io.appform.memq.hierarchical.data.FlowType; +import io.appform.memq.hierarchical.data.OneDataActionMessage; +import io.appform.memq.hierarchical.data.TwoDataActionMessage; import io.appform.memq.hierarchical.tree.key.RoutingKey; import io.appform.memq.util.YamlReader; import lombok.SneakyThrows; @@ -44,29 +44,29 @@ void testSuccessSinglePartition(ActorSystem actorSystem) { createActors(actorSystem); val messages = Map.of( RoutingKey.builder().list(List.of("")).build(), - C2MDataActionMessage.builder() - .data("C2M") + OneDataActionMessage.builder() + .data("FLOW_ONE") .build(), - RoutingKey.builder().list(List.of("REGULAR", "JAR")).build(), - C2MDataActionMessage.builder() - .data("C2M-REGULAR-JAR-SOME") + RoutingKey.builder().list(List.of("L1", "L2")).build(), + OneDataActionMessage.builder() + .data("FLOW_ONE-L1-L2-SOME") .build(), - RoutingKey.builder().list(List.of("REGULAR")).build(), - C2CDataActionMessage.builder() - .data("C2C-REGULAR") + RoutingKey.builder().list(List.of("L1")).build(), + TwoDataActionMessage.builder() + .data("FLOW_TWO-L1") .build(), - RoutingKey.builder().list(List.of("C2C_AUTH_FLOW")).build(), - C2CDataActionMessage.builder() - .data("C2C") - .build(), - - RoutingKey.builder().list(List.of("FULL_AUTH", "JAR")).build(), - C2MDataActionMessage.builder() - .data("C2M-FULL_AUTH-JAR-SOME") + RoutingKey.builder().list(List.of("")).build(), + TwoDataActionMessage.builder() + .data("FLOW_TWO") .build() + +// RoutingKey.builder().list(List.of("L2", "L1")).build(), +// OneDataActionMessage.builder() +// .data("FLOW_ONE-L2-L1-SOME") +// .build() ); messages.forEach((routingKey, message) -> { @@ -75,20 +75,16 @@ void testSuccessSinglePartition(ActorSystem actorSystem) { if (actorActors.containsKey(flowType)) { val router = actorActors.get(flowType); Assertions.assertNotNull(router); - - val flowLevelPrefix = Arrays.asList(RMQ_CONFIG.getWorkers().get(flowType).getExecutorName().split("\\.")); - System.out.println("flowLevelPrefix" + flowLevelPrefix); - val worker = router.getActor().getWorker().get(flowType, routingKey); Assertions.assertNotNull(worker); - val routingKeyWorker = worker.getRoutingKey(); - if(!worker.getRoutingKey().getRoutingKey().isEmpty()) { - val routingKeyWorkerStr = String.join(",",routingKeyWorker.getRoutingKey()); - val routingKeyStr = String.join(",", routingKey.getRoutingKey()); - Assertions.assertEquals(routingKeyWorkerStr, routingKeyStr); + val routingKeyWorker = worker.getName(); + if(!routingKeyWorker.isEmpty()) { + val routingKeyStr = String.join(".", routingKey.getRoutingKey()); + System.out.println(routingKeyWorker + " " + routingKeyStr); + Assertions.assertTrue(routingKeyWorker.contains(routingKeyStr)); } - message.setExecutorName(String.join("-", routingKeyWorker.getRoutingKey())); + message.setExecutorName(String.join("-", routingKeyWorker)); try { router.publish(routingKey, message); } catch (Exception e) { diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java index 60857cf..a518285 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java @@ -2,29 +2,29 @@ import io.appform.memq.ActorSystem; -import io.appform.memq.hierarchical.HierarchialHighLevelActorConfig; +import io.appform.memq.hierarchical.HierarchicalActorConfig; import io.appform.memq.hierarchical.HierarchicalHighLevelActor; import io.appform.memq.hierarchical.data.ActionMessage; import io.appform.memq.hierarchical.data.FlowType; public class FlowTypeHierarchicalActorBuilder implements FlowType.FlowTypeVisitor> { - private final HierarchialHighLevelActorConfig hierarchicalTreeConfig; + private final HierarchicalActorConfig hierarchicalTreeConfig; private final ActorSystem actorSystem; - public FlowTypeHierarchicalActorBuilder(final HierarchialHighLevelActorConfig hierarchicalTreeConfig, + public FlowTypeHierarchicalActorBuilder(final HierarchicalActorConfig hierarchicalTreeConfig, final ActorSystem actorSystem) { this.hierarchicalTreeConfig = hierarchicalTreeConfig; this.actorSystem = actorSystem; } @Override - public HierarchicalHighLevelActor visitC2M() { - return new C2MDataActionMessageHierarchicalActor(hierarchicalTreeConfig, actorSystem); + public HierarchicalHighLevelActor visitOne() { + return new OneDataActionMessageHierarchicalActor(hierarchicalTreeConfig, actorSystem); } @Override - public HierarchicalHighLevelActor visitC2C() { - return new C2CDataActionMessageHierarchicalActor(hierarchicalTreeConfig, actorSystem); + public HierarchicalHighLevelActor visitTwo() { + return new TwoDataActionMessageHierarchicalActor(hierarchicalTreeConfig, actorSystem); } } diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2MDataActionMessageHierarchicalActor.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/OneDataActionMessageHierarchicalActor.java similarity index 60% rename from memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2MDataActionMessageHierarchicalActor.java rename to memq-actor/src/test/java/io/appform/memq/hierarchical/actor/OneDataActionMessageHierarchicalActor.java index 34ed4e8..6a13bed 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2MDataActionMessageHierarchicalActor.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/OneDataActionMessageHierarchicalActor.java @@ -3,22 +3,22 @@ import io.appform.memq.ActorSystem; import io.appform.memq.actor.MessageMeta; -import io.appform.memq.hierarchical.HierarchialHighLevelActorConfig; +import io.appform.memq.hierarchical.HierarchicalActorConfig; import io.appform.memq.hierarchical.HierarchicalHighLevelActor; import io.appform.memq.hierarchical.data.ActionMessage; import io.appform.memq.hierarchical.data.FlowType; -public class C2MDataActionMessageHierarchicalActor extends HierarchicalHighLevelActor { +public class OneDataActionMessageHierarchicalActor extends HierarchicalHighLevelActor { - public C2MDataActionMessageHierarchicalActor(final HierarchialHighLevelActorConfig hierarchicalTreeConfig, + public OneDataActionMessageHierarchicalActor(final HierarchicalActorConfig hierarchicalTreeConfig, final ActorSystem actorSystem) { - super(FlowType.C2M_AUTH_FLOW, hierarchicalTreeConfig, actorSystem); + super(FlowType.FLOW_ONE, hierarchicalTreeConfig, actorSystem); } @Override protected boolean handle(ActionMessage actionMessage, MessageMeta messageMeta) { - System.out.println("C2M : " + actionMessage); + System.out.println("FLOW_ONE : " + actionMessage); return true; } diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2CDataActionMessageHierarchicalActor.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/TwoDataActionMessageHierarchicalActor.java similarity index 60% rename from memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2CDataActionMessageHierarchicalActor.java rename to memq-actor/src/test/java/io/appform/memq/hierarchical/actor/TwoDataActionMessageHierarchicalActor.java index d2a060f..e62c11a 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/C2CDataActionMessageHierarchicalActor.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/TwoDataActionMessageHierarchicalActor.java @@ -2,23 +2,23 @@ import io.appform.memq.ActorSystem; import io.appform.memq.actor.MessageMeta; -import io.appform.memq.hierarchical.HierarchialHighLevelActorConfig; +import io.appform.memq.hierarchical.HierarchicalActorConfig; import io.appform.memq.hierarchical.HierarchicalHighLevelActor; import io.appform.memq.hierarchical.data.ActionMessage; import io.appform.memq.hierarchical.data.FlowType; -public class C2CDataActionMessageHierarchicalActor extends HierarchicalHighLevelActor { +public class TwoDataActionMessageHierarchicalActor extends HierarchicalHighLevelActor { - public C2CDataActionMessageHierarchicalActor(final HierarchialHighLevelActorConfig hierarchicalTreeConfig, + public TwoDataActionMessageHierarchicalActor(final HierarchicalActorConfig hierarchicalTreeConfig, final ActorSystem actorSystem) { - super(FlowType.C2C_AUTH_FLOW, hierarchicalTreeConfig, actorSystem); + super(FlowType.FLOW_TWO, hierarchicalTreeConfig, actorSystem); } @Override protected boolean handle(ActionMessage actionMessage, MessageMeta messageMetadata) { - System.out.println("C2C : " + actionMessage); + System.out.println("FLOW_TWO : " + actionMessage); return true; } } \ No newline at end of file diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java index 9b14a44..68ff5c3 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java @@ -15,8 +15,8 @@ @ToString @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type") @JsonSubTypes({ - @JsonSubTypes.Type(name = FlowType.C2M_AUTH_FLOW_TEXT, value = C2MDataActionMessage.class), - @JsonSubTypes.Type(name = FlowType.C2C_AUTH_FLOW_TEXT, value = C2CDataActionMessage.class) + @JsonSubTypes.Type(name = FlowType.FLOW_ONE_TEXT, value = OneDataActionMessage.class), + @JsonSubTypes.Type(name = FlowType.FLOW_TWO_TEXT, value = TwoDataActionMessage.class) }) public abstract class ActionMessage implements Message { diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java index 2adb690..f1a0cba 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java @@ -1,25 +1,25 @@ package io.appform.memq.hierarchical.data; public enum FlowType { - C2M_AUTH_FLOW { + FLOW_ONE { @Override public T accept(FlowTypeVisitor visitor) { - return visitor.visitC2M(); + return visitor.visitOne(); } }, - C2C_AUTH_FLOW { + FLOW_TWO { @Override public T accept(FlowTypeVisitor visitor) { - return visitor.visitC2C(); + return visitor.visitTwo(); } }; - public static final String C2M_AUTH_FLOW_TEXT = "C2M_AUTH_FLOW"; - public static final String C2C_AUTH_FLOW_TEXT = "C2C_AUTH_FLOW"; + public static final String FLOW_ONE_TEXT = "FLOW_ONE"; + public static final String FLOW_TWO_TEXT = "FLOW_TWO"; public abstract T accept(FlowTypeVisitor visitor); public interface FlowTypeVisitor { - T visitC2M(); - T visitC2C(); + T visitOne(); + T visitTwo(); } } diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2MDataActionMessage.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/OneDataActionMessage.java similarity index 67% rename from memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2MDataActionMessage.java rename to memq-actor/src/test/java/io/appform/memq/hierarchical/data/OneDataActionMessage.java index 87d6844..736fd1e 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2MDataActionMessage.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/OneDataActionMessage.java @@ -8,15 +8,15 @@ @Data @EqualsAndHashCode(callSuper = true) @ToString(callSuper = true) -public class C2MDataActionMessage extends ActionMessage { +public class OneDataActionMessage extends ActionMessage { private String data; - public C2MDataActionMessage() { - super(FlowType.C2M_AUTH_FLOW); + public OneDataActionMessage() { + super(FlowType.FLOW_ONE); } @Builder - public C2MDataActionMessage(String data) { + public OneDataActionMessage(String data) { this(); this.data = data; } diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2CDataActionMessage.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/TwoDataActionMessage.java similarity index 67% rename from memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2CDataActionMessage.java rename to memq-actor/src/test/java/io/appform/memq/hierarchical/data/TwoDataActionMessage.java index 1e5b31e..dcb8f3c 100644 --- a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/C2CDataActionMessage.java +++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/TwoDataActionMessage.java @@ -8,15 +8,15 @@ @Data @EqualsAndHashCode(callSuper = true) @ToString(callSuper = true) -public class C2CDataActionMessage extends ActionMessage { +public class TwoDataActionMessage extends ActionMessage { private String data; - public C2CDataActionMessage() { - super(FlowType.C2C_AUTH_FLOW); + public TwoDataActionMessage() { + super(FlowType.FLOW_TWO); } @Builder - public C2CDataActionMessage(String data) { + public TwoDataActionMessage(String data) { this(); this.data = data; } @@ -25,4 +25,5 @@ public C2CDataActionMessage(String data) { public String id() { return data; } + } \ No newline at end of file diff --git a/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml b/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml index 21899f1..a7da3c0 100644 --- a/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml +++ b/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml @@ -1,23 +1,23 @@ workers: - C2M_AUTH_FLOW: - executorName: prod.mandate.actors.c2m + FLOW_ONE: + executorName: sample.flow.actor.one partitions: 1 children: - REGULAR: + L1: nodeData: partitions: 1 children: - HOTSTAR: + L1: nodeData: partitions: 2 - JAR: + L2: nodeData: partitions: 1 - C2C_AUTH_FLOW: - executorName: prod.mandate.actors.c2c + FLOW_TWO: + executorName: sample.flow.actor.two partitions: 1 children: - REGULAR: + L1: nodeData: partitions: 1