(
+ subActorConfig.getExecutorName(),
+ messageType,
+ subActorConfig,
+ actorSystem,
+ partitioner,
+ observers) {
+
+ @Override
+ protected boolean handle(M message, MessageMeta messageMeta) {
+ return messageHandler.apply(message, messageMeta);
+ }
+
+ @Override
+ protected void sideline(M message, MessageMeta messageMeta) {
+ sidelineHandler.accept(message, messageMeta);
+ }
+
+ };
+ }
+ );
+ }
+
+}
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActorConfig.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActorConfig.java
new file mode 100644
index 0000000..8c18cc1
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalActorConfig.java
@@ -0,0 +1,29 @@
+package io.appform.memq.hierarchical;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
+import io.appform.memq.HighLevelActorConfig;
+import io.appform.memq.hierarchical.tree.HierarchicalDataStoreTreeNode;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+@EqualsAndHashCode
+@ToString
+@NoArgsConstructor
+public class HierarchicalActorConfig extends HighLevelActorConfig {
+
+ /**
+ * This param will reuse all Parent Level ActorConfig while creating all child actors,
+ * if marked as false, every child will need tp provide Actor config specific to child
+ *
+ */
+ private boolean useParentConfigInWorker = true;
+
+ @JsonUnwrapped
+ private HierarchicalDataStoreTreeNode children;
+
+}
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java
new file mode 100644
index 0000000..d256985
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalHighLevelActor.java
@@ -0,0 +1,94 @@
+package io.appform.memq.hierarchical;
+
+
+import io.appform.memq.ActorSystem;
+import io.appform.memq.actor.Message;
+import io.appform.memq.actor.MessageMeta;
+import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey;
+import io.appform.memq.observer.ActorObserver;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.function.ToIntFunction;
+
+@Slf4j
+public abstract class HierarchicalHighLevelActor, M extends Message> {
+
+
+ @Getter
+ private final MessageType type;
+ @Getter
+ private final HierarchicalActor actor;
+
+ @SuppressWarnings("unused")
+ protected HierarchicalHighLevelActor(
+ MessageType type,
+ HierarchicalActorConfig highLevelActorConfig,
+ ActorSystem actorSystem) {
+ this(type, highLevelActorConfig, actorSystem, null, List.of());
+ }
+
+ protected HierarchicalHighLevelActor(
+ MessageType type,
+ HierarchicalActorConfig highLevelActorConfig,
+ ActorSystem actorSystem,
+ ToIntFunction partitioner) {
+ this(type, highLevelActorConfig, actorSystem, partitioner, List.of());
+ }
+
+ protected HierarchicalHighLevelActor(
+ MessageType type,
+ HierarchicalActorConfig highLevelActorConfig,
+ ActorSystem actorSystem,
+ List observers) {
+ this(type, highLevelActorConfig, actorSystem, null, observers);
+ }
+
+ protected HierarchicalHighLevelActor(
+ MessageType type,
+ HierarchicalActorConfig highLevelActorConfig,
+ ActorSystem actorSystem,
+ ToIntFunction partitioner,
+ List observers) {
+ this.type = type;
+ this.actor = new HierarchicalActor<>(type, highLevelActorConfig, actorSystem, this::handle, this::sideline, partitioner, observers);
+ actorSystem.register(actor);
+ }
+
+ protected abstract boolean handle(final M message, MessageMeta messageMeta);
+
+ protected void sideline(final M message, MessageMeta messageMeta) {
+ log.warn("skipping sideline for actor:{} message:{}", type.name(), message);
+ }
+
+ public final boolean publish(final M message) {
+ return actor.publish(message);
+ }
+
+ public final boolean publish(final HierarchicalRoutingKey routingKey, final M message) {
+ return actor.publish(routingKey, message);
+ }
+
+
+ public final void purge() {
+ actor.purge();
+ }
+
+ public final long size() {
+ return actor.size();
+ }
+
+ public final long inFlight() {
+ return actor.inFlight();
+ }
+
+ public final boolean isEmpty() {
+ return actor.isEmpty();
+ }
+
+ public final boolean isRunning() {
+ return actor.isRunning();
+ }
+
+}
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterHelper.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterHelper.java
new file mode 100644
index 0000000..174db86
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalRouterHelper.java
@@ -0,0 +1,68 @@
+package io.appform.memq.hierarchical;
+
+import io.appform.memq.HighLevelActorConfig;
+import io.appform.memq.hierarchical.tree.key.RoutingKey;
+import lombok.val;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class HierarchicalRouterHelper {
+
+ private static final String EXECUTORS = "executors";
+ private static final BiFunction, String, String> beautifierFunction = (stream, delimiter) -> stream
+ .filter(e -> !StringUtils.isEmpty(e))
+ .collect(Collectors.joining(delimiter));
+
+
+ public final Function actorConfigToSubActorConfigFunc =
+ actorConfig -> HierarchicalSubActorConfig.builder()
+ .partitions(actorConfig.getPartitions())
+ .maxSizePerPartition(actorConfig.getMaxSizePerPartition())
+ .maxConcurrencyPerPartition(actorConfig.getMaxConcurrencyPerPartition())
+ .retryConfig(actorConfig.getRetryConfig())
+ .exceptionHandlerConfig(actorConfig.getExceptionHandlerConfig())
+ .build();
+
+
+ public > HighLevelActorConfig hierarchicalActorConfig(final MessageType messageType,
+ final RoutingKey routingKeyData,
+ final HierarchicalSubActorConfig subActorConfig,
+ final HierarchicalActorConfig mainActorConfig) {
+ val useParentConfigInWorker = mainActorConfig.isUseParentConfigInWorker();
+ return HighLevelActorConfig.builder()
+ // Custom fields
+ .executorName(executorName(mainActorConfig.getExecutorName(), messageType, routingKeyData))
+
+ // Copy from parent if useParentConfigInWorker is set
+ .partitions(useParentConfigInWorker ? mainActorConfig.getPartitions() : subActorConfig.getPartitions())
+ .maxSizePerPartition(useParentConfigInWorker ? mainActorConfig.getMaxSizePerPartition() : subActorConfig.getMaxSizePerPartition())
+ .maxConcurrencyPerPartition(useParentConfigInWorker ? mainActorConfig.getMaxConcurrencyPerPartition() : subActorConfig.getMaxConcurrencyPerPartition())
+ .retryConfig(useParentConfigInWorker ? mainActorConfig.getRetryConfig() : subActorConfig.getRetryConfig())
+ .exceptionHandlerConfig(useParentConfigInWorker ? mainActorConfig.getExceptionHandlerConfig() : subActorConfig.getExceptionHandlerConfig())
+
+ // Direct from Parent
+ .metricDisabled(mainActorConfig.isMetricDisabled())
+ .build();
+ }
+
+ private > String executorName(final String parentExecutorName,
+ final MessageType messageType,
+ final RoutingKey routingKeyData) {
+ val routingKey = routingKeyData.getRoutingKey();
+
+ if (!StringUtils.isEmpty(parentExecutorName)) {
+ // For backward compatibility
+ if (routingKey.isEmpty()) {
+ return parentExecutorName;
+ }
+
+ return beautifierFunction.apply(Stream.of(parentExecutorName, String.join(".", routingKey)), ".");
+ }
+
+ return beautifierFunction.apply(Stream.of(EXECUTORS, String.join(".", routingKey), messageType.name()), ".");
+ }
+}
\ No newline at end of file
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalSubActorConfig.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalSubActorConfig.java
new file mode 100644
index 0000000..6e6290d
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/HierarchicalSubActorConfig.java
@@ -0,0 +1,52 @@
+package io.appform.memq.hierarchical;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.appform.memq.exceptionhandler.config.DropConfig;
+import io.appform.memq.exceptionhandler.config.ExceptionHandlerConfig;
+import io.appform.memq.retry.config.NoRetryConfig;
+import io.appform.memq.retry.config.RetryConfig;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+import javax.validation.Valid;
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+@EqualsAndHashCode
+@ToString
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+public class HierarchicalSubActorConfig {
+
+ @Min(1)
+ @Max(100)
+ @Builder.Default
+ private int partitions = 1;
+
+ @Min(1)
+ @Builder.Default
+ private long maxSizePerPartition = Long.MAX_VALUE;
+
+ @Min(1)
+ @Builder.Default
+ private int maxConcurrencyPerPartition = Integer.MAX_VALUE;
+
+ @Valid
+ @NotNull
+ @Builder.Default
+ private RetryConfig retryConfig = new NoRetryConfig();
+
+ @Valid
+ @NotNull
+ @Builder.Default
+ private ExceptionHandlerConfig exceptionHandlerConfig = new DropConfig();
+
+}
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java
new file mode 100644
index 0000000..6d3f240
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/IHierarchicalActor.java
@@ -0,0 +1,10 @@
+package io.appform.memq.hierarchical;
+
+import io.appform.memq.actor.IActor;
+import io.appform.memq.actor.Message;
+import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey;
+
+public interface IHierarchicalActor extends IActor {
+
+ boolean publish(final HierarchicalRoutingKey routingKey, final M message);
+}
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreSupplierTree.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreSupplierTree.java
new file mode 100644
index 0000000..0298ac9
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreSupplierTree.java
@@ -0,0 +1,63 @@
+package io.appform.memq.hierarchical.tree;
+
+import com.google.common.collect.Lists;
+import io.appform.memq.hierarchical.tree.key.RoutingKey;
+import lombok.val;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+@SuppressWarnings("java:S119")
+public class HierarchicalDataStoreSupplierTree extends HierarchicalDataStoreTree {
+
+ private static final Function, RoutingKey> routingKeyGenerator = (list) -> RoutingKey.builder()
+ .list(list)
+ .build();
+
+ public HierarchicalDataStoreSupplierTree(final NODE_KEY_TYPE key,
+ final HierarchicalTreeConfig treeConfig,
+ final Function rootNodeConverterSupplier,
+ final TriConsumerSupplier supplier) {
+ super(supplier.get(
+ routingKeyGenerator.apply(List.of()),
+ key,
+ rootNodeConverterSupplier.apply(treeConfig.getDefaultData())
+ ));
+ buildTree(key, treeConfig.getChildrenData(), supplier);
+ }
+
+ private void buildTree(final NODE_KEY_TYPE key,
+ final HierarchicalDataStoreTreeNode childrenList,
+ final TriConsumerSupplier supplier) {
+ val tokenList = Lists.newArrayList();
+ buildTreeHelper(key, childrenList, tokenList, supplier);
+ }
+
+ private void buildTreeHelper(final NODE_KEY_TYPE key,
+ final HierarchicalDataStoreTreeNode rootChildrenData,
+ final List tokenList,
+ final TriConsumerSupplier supplier) {
+ val childrenList = rootChildrenData.getChildren();
+ if (childrenList.isEmpty()) {
+ add(routingKeyGenerator.apply(tokenList), key, null);
+ return;
+ }
+
+ for (String childrenToken : childrenList.keySet()) {
+ val currentChildrenData = childrenList.get(childrenToken);
+
+ tokenList.add(childrenToken);
+
+ val routingKey = routingKeyGenerator.apply(tokenList.stream().map(String::valueOf).toList());
+ val currentChildrenDefaultData = Objects.nonNull(currentChildrenData.getNodeData()) ?
+ currentChildrenData.getNodeData() : rootChildrenData.getNodeData();
+
+ add(routingKey, key, supplier.get(routingKey, key, currentChildrenDefaultData));
+ buildTreeHelper(key, currentChildrenData, tokenList, supplier);
+
+ tokenList.remove(childrenToken);
+ }
+ }
+
+}
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTree.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTree.java
new file mode 100644
index 0000000..beaaeb4
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTree.java
@@ -0,0 +1,65 @@
+package io.appform.memq.hierarchical.tree;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
+import com.google.common.collect.Maps;
+import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import lombok.val;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+@Slf4j
+@ToString
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+@SuppressWarnings("java:S119")
+public class HierarchicalDataStoreTree {
+
+ private final NODE_TYPE defaultData;
+ @JsonUnwrapped
+ private final Map> rootNodes = Maps.newConcurrentMap();
+
+ public HierarchicalDataStoreTree() {
+ this.defaultData = null;
+ }
+
+ public HierarchicalDataStoreTree(NODE_TYPE defaultData) {
+ this.defaultData = defaultData;
+ }
+
+ public void add(final HierarchicalRoutingKey routingKey, final NODE_KEY_TYPE key, final NODE_TYPE data) {
+ rootNodes.computeIfAbsent(key, t -> new HierarchicalDataStoreTreeNode<>(0, String.valueOf(key), defaultData));
+ if (Objects.isNull(data)) {
+ return;
+ }
+ rootNodes.get(key)
+ .add(routingKey, data);
+ }
+
+ public void traverse(final Consumer consumer) {
+ rootNodes.forEach((NODEKEYTYPE, vHierarchicalStoreNode) -> {
+ if (vHierarchicalStoreNode != null) {
+ vHierarchicalStoreNode.traverse(consumer);
+ }
+ });
+ }
+
+ public NODE_TYPE get(final NODE_KEY_TYPE key, final HierarchicalRoutingKey routingKey) {
+ if (!rootNodes.containsKey(key)) {
+ log.warn("Key {} not found in {} keys {}. Using default {}", key, rootNodes.keySet(), defaultData);
+ return defaultData;
+ }
+
+ val routingKeyToken = routingKey.getRoutingKey();
+ if (routingKeyToken== null || routingKeyToken.isEmpty()) {
+ log.warn("keys are empty {}. Using default {}", key, rootNodes.keySet(), defaultData);
+ return defaultData;
+ }
+
+ return rootNodes.get(key)
+ .find(routingKey);
+ }
+}
\ No newline at end of file
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTreeNode.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTreeNode.java
new file mode 100644
index 0000000..5d2c6bb
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalDataStoreTreeNode.java
@@ -0,0 +1,107 @@
+package io.appform.memq.hierarchical.tree;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.google.common.collect.Maps;
+import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+@Slf4j
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+@Data
+public class HierarchicalDataStoreTreeNode {
+
+ @JsonIgnore
+ private final int depth;
+ @JsonIgnore
+ private final K token;
+
+ private V nodeData;
+ private Map> children = Maps.newConcurrentMap();
+
+
+ public HierarchicalDataStoreTreeNode() {
+ this.depth = 0;
+ this.token = null;
+ this.nodeData = null;
+ }
+
+ public HierarchicalDataStoreTreeNode(K token) {
+ this.depth = 0;
+ this.token = token;
+ this.nodeData = null;
+ }
+
+
+ @Builder
+ public HierarchicalDataStoreTreeNode(final int depth, final K token, final V nodeData) {
+ this.depth = depth;
+ this.token = token;
+ this.nodeData = nodeData;
+ }
+
+ void traverse(final Consumer consumer) {
+ if (nodeData != null) {
+ consumer.accept(nodeData);
+ }
+ children.forEach((k, kvHierarchicalStoreNode) -> {
+ if (kvHierarchicalStoreNode != null) {
+ kvHierarchicalStoreNode.traverse(consumer);
+ }
+ });
+ }
+
+ void addChild(final List tokens, final V defaultData) {
+ final K key = tokens.get(depth);
+
+ log.debug("depth: {} name: {} key: {} tokens: {} defaultData: {}", depth, token, key, tokens, defaultData);
+
+ if (tokens.size() > depth + 1) {
+ children.computeIfAbsent(key, k -> new HierarchicalDataStoreTreeNode<>(depth + 1, tokens.get(depth), null));
+ children.get(key).addChild(tokens, defaultData);
+ } else {
+ if (!children.containsKey(key)) {
+ children.put(key, new HierarchicalDataStoreTreeNode(depth + 1, tokens.get(depth), defaultData));
+ } else {
+ if (children.get(key)
+ .getNodeData() == null) {
+ children.get(key)
+ .setNodeData(defaultData);
+ } else {
+ log.error("Request to overwrite at {} existing defaultData: {} new defaultData {}", tokens, children.get(key)
+ .getNodeData(), defaultData);
+ }
+ }
+ }
+ }
+
+ V findNode(final List tokens) {
+ if (tokens.size() == depth) {
+ return nodeData;
+ }
+
+ if (!children.containsKey(tokens.get(depth))) {
+ return nodeData;
+ }
+
+ V load = children.get(tokens.get(depth))
+ .findNode(tokens);
+ return load == null
+ ? nodeData
+ : load;
+ }
+
+ public void add(final HierarchicalRoutingKey routingKey, final V payload) {
+ addChild(routingKey.getRoutingKey(), payload);
+ }
+
+ public V find(final HierarchicalRoutingKey routingKey) {
+ return findNode(routingKey.getRoutingKey());
+ }
+}
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalTreeConfig.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalTreeConfig.java
new file mode 100644
index 0000000..7519d61
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/HierarchicalTreeConfig.java
@@ -0,0 +1,18 @@
+package io.appform.memq.hierarchical.tree;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@SuppressWarnings("java:S119")
+public class HierarchicalTreeConfig {
+ private ROOT_TYPE defaultData;
+ @JsonUnwrapped
+ private HierarchicalDataStoreTreeNode childrenData;
+}
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java
new file mode 100644
index 0000000..87ee0d0
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/TriConsumerSupplier.java
@@ -0,0 +1,6 @@
+package io.appform.memq.hierarchical.tree;
+
+@FunctionalInterface
+public interface TriConsumerSupplier {
+ S get(R routingKey, K key, V value);
+}
\ No newline at end of file
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/HierarchicalRoutingKey.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/HierarchicalRoutingKey.java
new file mode 100644
index 0000000..ee6248d
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/HierarchicalRoutingKey.java
@@ -0,0 +1,7 @@
+package io.appform.memq.hierarchical.tree.key;
+
+import java.util.List;
+
+public interface HierarchicalRoutingKey {
+ List getRoutingKey();
+}
diff --git a/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/RoutingKey.java b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/RoutingKey.java
new file mode 100644
index 0000000..221bd08
--- /dev/null
+++ b/memq-actor/src/main/java/io/appform/memq/hierarchical/tree/key/RoutingKey.java
@@ -0,0 +1,20 @@
+package io.appform.memq.hierarchical.tree.key;
+
+
+import lombok.Builder;
+
+import java.util.List;
+
+public class RoutingKey implements HierarchicalRoutingKey {
+ private final List list;
+
+ @Builder
+ public RoutingKey(final List list) {
+ this.list = list;
+ }
+
+ @Override
+ public List getRoutingKey() {
+ return list;
+ }
+}
\ No newline at end of file
diff --git a/memq-actor/src/main/java/io/appform/memq/observer/ActorObserver.java b/memq-actor/src/main/java/io/appform/memq/observer/ActorObserver.java
index 9744e63..b63fd50 100644
--- a/memq-actor/src/main/java/io/appform/memq/observer/ActorObserver.java
+++ b/memq-actor/src/main/java/io/appform/memq/observer/ActorObserver.java
@@ -1,6 +1,6 @@
package io.appform.memq.observer;
-import io.appform.memq.actor.Actor;
+import io.appform.memq.actor.IActor;
import io.appform.memq.actor.Message;
import lombok.Getter;
@@ -15,7 +15,7 @@ protected ActorObserver(ActorObserver next) {
this.next = next;
}
- public abstract void initialize(Actor extends Message> actor);
+ public abstract void initialize(IActor extends Message> actor);
public abstract boolean execute(
final ActorObserverContext extends Message> context,
diff --git a/memq-actor/src/main/java/io/appform/memq/observer/TerminalActorObserver.java b/memq-actor/src/main/java/io/appform/memq/observer/TerminalActorObserver.java
index e991a53..6be0b6f 100644
--- a/memq-actor/src/main/java/io/appform/memq/observer/TerminalActorObserver.java
+++ b/memq-actor/src/main/java/io/appform/memq/observer/TerminalActorObserver.java
@@ -1,7 +1,7 @@
package io.appform.memq.observer;
-import io.appform.memq.actor.Actor;
+import io.appform.memq.actor.IActor;
import io.appform.memq.actor.Message;
import java.util.function.BooleanSupplier;
@@ -13,7 +13,7 @@ public TerminalActorObserver() {
}
@Override
- public void initialize(Actor extends Message> actor) {
+ public void initialize(IActor extends Message> actor) {
}
@Override
diff --git a/memq-actor/src/main/java/io/appform/memq/stats/ActorMetricObserver.java b/memq-actor/src/main/java/io/appform/memq/stats/ActorMetricObserver.java
index 21ad7fd..3b8c498 100644
--- a/memq-actor/src/main/java/io/appform/memq/stats/ActorMetricObserver.java
+++ b/memq-actor/src/main/java/io/appform/memq/stats/ActorMetricObserver.java
@@ -2,7 +2,7 @@
import com.codahale.metrics.*;
-import io.appform.memq.actor.Actor;
+import io.appform.memq.actor.IActor;
import io.appform.memq.actor.Message;
import io.appform.memq.observer.ActorObserver;
import io.appform.memq.observer.ActorObserverContext;
@@ -40,7 +40,7 @@ private static String normalizeString(final String name) {
}
@Override
- public void initialize(Actor extends Message> actor) {
+ public void initialize(IActor extends Message> actor) {
this.metricRegistry.gauge(MetricRegistry.name(getMetricPrefix(actorName), "size"),
(MetricRegistry.MetricSupplier>) () ->
new CachedGauge<>(5, TimeUnit.SECONDS) {
diff --git a/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java b/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java
index 29ab953..13709e3 100644
--- a/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java
+++ b/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java
@@ -4,12 +4,13 @@
import com.google.common.collect.Lists;
import io.appform.memq.ActorSystem;
import io.appform.memq.HighLevelActor;
-import io.appform.memq.actor.Actor;
+import io.appform.memq.actor.IActor;
import io.appform.memq.HighLevelActorConfig;
import io.appform.memq.exceptionhandler.config.ExceptionHandlerConfig;
import io.appform.memq.exceptionhandler.config.SidelineConfig;
import io.appform.memq.helper.message.TestIntMessage;
import io.appform.memq.actor.MessageMeta;
+import io.appform.memq.hierarchical.IHierarchicalActor;
import io.appform.memq.observer.ActorObserver;
import io.appform.memq.retry.RetryStrategy;
import io.appform.memq.retry.RetryStrategyFactory;
@@ -38,10 +39,16 @@ public static ActorSystem actorSystem(ExecutorService tp) {
val metricRegistry = new MetricRegistry();
return new ActorSystem() {
private final RetryStrategyFactory retryStrategyFactory = new RetryStrategyFactory();
- private final List> registeredActors = Lists.newArrayList();
+ private final List> registeredActors = Lists.newArrayList();
@Override
- public void register(Actor> actor) {
+ public void register(IActor> actor) {
+ registeredActors.add(actor);
+ actor.start();
+ }
+
+ @Override
+ public void register(IHierarchicalActor> actor) {
registeredActors.add(actor);
actor.start();
}
@@ -68,12 +75,12 @@ public List registeredObservers() {
@Override
public boolean isRunning() {
- return !registeredActors.isEmpty() && registeredActors.stream().allMatch(Actor::isRunning);
+ return !registeredActors.isEmpty() && registeredActors.stream().allMatch(IActor::isRunning);
}
@Override
public void close() {
- registeredActors.forEach(Actor::close);
+ registeredActors.forEach(IActor::close);
}
};
}
diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java
new file mode 100644
index 0000000..3d9933f
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/FlowHierarchicalMemqActorConfig.java
@@ -0,0 +1,16 @@
+package io.appform.memq.hierarchical;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class FlowHierarchicalMemqActorConfig> {
+ private Map workers;
+}
diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java
new file mode 100644
index 0000000..5c53390
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/HierarchicalHighLevelActorTest.java
@@ -0,0 +1,99 @@
+package io.appform.memq.hierarchical;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.appform.memq.ActorSystem;
+import io.appform.memq.MemQTestExtension;
+import io.appform.memq.hierarchical.actor.FlowTypeHierarchicalActorBuilder;
+import io.appform.memq.hierarchical.data.ActionMessage;
+import io.appform.memq.hierarchical.data.FlowType;
+import io.appform.memq.hierarchical.data.OneDataActionMessage;
+import io.appform.memq.hierarchical.data.TwoDataActionMessage;
+import io.appform.memq.hierarchical.tree.key.RoutingKey;
+import io.appform.memq.util.YamlReader;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import lombok.val;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+@ExtendWith(MemQTestExtension.class)
+public class HierarchicalHighLevelActorTest {
+
+ private final static FlowHierarchicalMemqActorConfig RMQ_CONFIG = YamlReader.loadConfig("rmqHierarchicalMemq.yaml", new TypeReference<>() {
+ });
+ private Map> actorActors;
+
+ @SneakyThrows
+ public void createActors(ActorSystem actorSystem) {
+ actorActors = RMQ_CONFIG.getWorkers()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getKey().accept(new FlowTypeHierarchicalActorBuilder(e.getValue(), actorSystem))));
+ }
+
+ @Test
+ @SneakyThrows
+ void testSuccessSinglePartition(ActorSystem actorSystem) {
+ createActors(actorSystem);
+ val messages = Map.of(
+ RoutingKey.builder().list(List.of("")).build(),
+ OneDataActionMessage.builder()
+ .data("FLOW_ONE")
+ .build(),
+
+ RoutingKey.builder().list(List.of("L1", "L2")).build(),
+ OneDataActionMessage.builder()
+ .data("FLOW_ONE-L1-L2-SOME")
+ .build(),
+
+ RoutingKey.builder().list(List.of("L1")).build(),
+ TwoDataActionMessage.builder()
+ .data("FLOW_TWO-L1")
+ .build(),
+
+ RoutingKey.builder().list(List.of("")).build(),
+ TwoDataActionMessage.builder()
+ .data("FLOW_TWO")
+ .build()
+
+// RoutingKey.builder().list(List.of("L2", "L1")).build(),
+// OneDataActionMessage.builder()
+// .data("FLOW_ONE-L2-L1-SOME")
+// .build()
+ );
+
+ messages.forEach((routingKey, message) -> {
+ val flowType = message.getType();
+
+ if (actorActors.containsKey(flowType)) {
+ val router = actorActors.get(flowType);
+ Assertions.assertNotNull(router);
+ val worker = router.getActor().getWorker().get(flowType, routingKey);
+ Assertions.assertNotNull(worker);
+
+ val routingKeyWorker = worker.getName();
+ if(!routingKeyWorker.isEmpty()) {
+ val routingKeyStr = String.join(".", routingKey.getRoutingKey());
+ System.out.println(routingKeyWorker + " " + routingKeyStr);
+ Assertions.assertTrue(routingKeyWorker.contains(routingKeyStr));
+ }
+ message.setExecutorName(String.join("-", routingKeyWorker));
+ try {
+ router.publish(routingKey, message);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ }
+
+
+}
diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java
new file mode 100644
index 0000000..a518285
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/FlowTypeHierarchicalActorBuilder.java
@@ -0,0 +1,30 @@
+package io.appform.memq.hierarchical.actor;
+
+
+import io.appform.memq.ActorSystem;
+import io.appform.memq.hierarchical.HierarchicalActorConfig;
+import io.appform.memq.hierarchical.HierarchicalHighLevelActor;
+import io.appform.memq.hierarchical.data.ActionMessage;
+import io.appform.memq.hierarchical.data.FlowType;
+
+public class FlowTypeHierarchicalActorBuilder implements FlowType.FlowTypeVisitor> {
+
+ private final HierarchicalActorConfig hierarchicalTreeConfig;
+ private final ActorSystem actorSystem;
+
+ public FlowTypeHierarchicalActorBuilder(final HierarchicalActorConfig hierarchicalTreeConfig,
+ final ActorSystem actorSystem) {
+ this.hierarchicalTreeConfig = hierarchicalTreeConfig;
+ this.actorSystem = actorSystem;
+ }
+
+ @Override
+ public HierarchicalHighLevelActor visitOne() {
+ return new OneDataActionMessageHierarchicalActor(hierarchicalTreeConfig, actorSystem);
+ }
+
+ @Override
+ public HierarchicalHighLevelActor visitTwo() {
+ return new TwoDataActionMessageHierarchicalActor(hierarchicalTreeConfig, actorSystem);
+ }
+}
diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/OneDataActionMessageHierarchicalActor.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/OneDataActionMessageHierarchicalActor.java
new file mode 100644
index 0000000..6a13bed
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/OneDataActionMessageHierarchicalActor.java
@@ -0,0 +1,25 @@
+package io.appform.memq.hierarchical.actor;
+
+
+import io.appform.memq.ActorSystem;
+import io.appform.memq.actor.MessageMeta;
+import io.appform.memq.hierarchical.HierarchicalActorConfig;
+import io.appform.memq.hierarchical.HierarchicalHighLevelActor;
+import io.appform.memq.hierarchical.data.ActionMessage;
+import io.appform.memq.hierarchical.data.FlowType;
+
+public class OneDataActionMessageHierarchicalActor extends HierarchicalHighLevelActor {
+
+
+ public OneDataActionMessageHierarchicalActor(final HierarchicalActorConfig hierarchicalTreeConfig,
+ final ActorSystem actorSystem) {
+ super(FlowType.FLOW_ONE, hierarchicalTreeConfig, actorSystem);
+ }
+
+ @Override
+ protected boolean handle(ActionMessage actionMessage, MessageMeta messageMeta) {
+ System.out.println("FLOW_ONE : " + actionMessage);
+ return true;
+ }
+
+}
\ No newline at end of file
diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/TwoDataActionMessageHierarchicalActor.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/TwoDataActionMessageHierarchicalActor.java
new file mode 100644
index 0000000..e62c11a
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/actor/TwoDataActionMessageHierarchicalActor.java
@@ -0,0 +1,24 @@
+package io.appform.memq.hierarchical.actor;
+
+import io.appform.memq.ActorSystem;
+import io.appform.memq.actor.MessageMeta;
+import io.appform.memq.hierarchical.HierarchicalActorConfig;
+import io.appform.memq.hierarchical.HierarchicalHighLevelActor;
+import io.appform.memq.hierarchical.data.ActionMessage;
+import io.appform.memq.hierarchical.data.FlowType;
+
+
+public class TwoDataActionMessageHierarchicalActor extends HierarchicalHighLevelActor {
+
+
+ public TwoDataActionMessageHierarchicalActor(final HierarchicalActorConfig hierarchicalTreeConfig,
+ final ActorSystem actorSystem) {
+ super(FlowType.FLOW_TWO, hierarchicalTreeConfig, actorSystem);
+ }
+
+ @Override
+ protected boolean handle(ActionMessage actionMessage, MessageMeta messageMetadata) {
+ System.out.println("FLOW_TWO : " + actionMessage);
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java
new file mode 100644
index 0000000..68ff5c3
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/ActionMessage.java
@@ -0,0 +1,33 @@
+package io.appform.memq.hierarchical.data;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.appform.memq.actor.Message;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Setter;
+import lombok.ToString;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+@EqualsAndHashCode
+@ToString
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(name = FlowType.FLOW_ONE_TEXT, value = OneDataActionMessage.class),
+ @JsonSubTypes.Type(name = FlowType.FLOW_TWO_TEXT, value = TwoDataActionMessage.class)
+})
+public abstract class ActionMessage implements Message {
+
+ @NotNull
+ private final FlowType type;
+
+ @Setter
+ private String executorName;
+
+ protected ActionMessage(FlowType type) {
+ this.type = type;
+ }
+
+}
\ No newline at end of file
diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java
new file mode 100644
index 0000000..f1a0cba
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/FlowType.java
@@ -0,0 +1,25 @@
+package io.appform.memq.hierarchical.data;
+
+public enum FlowType {
+ FLOW_ONE {
+ @Override
+ public T accept(FlowTypeVisitor visitor) {
+ return visitor.visitOne();
+ }
+ },
+ FLOW_TWO {
+ @Override
+ public T accept(FlowTypeVisitor visitor) {
+ return visitor.visitTwo();
+ }
+ };
+
+ public static final String FLOW_ONE_TEXT = "FLOW_ONE";
+ public static final String FLOW_TWO_TEXT = "FLOW_TWO";
+
+ public abstract T accept(FlowTypeVisitor visitor);
+ public interface FlowTypeVisitor {
+ T visitOne();
+ T visitTwo();
+ }
+}
diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/OneDataActionMessage.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/OneDataActionMessage.java
new file mode 100644
index 0000000..736fd1e
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/OneDataActionMessage.java
@@ -0,0 +1,28 @@
+package io.appform.memq.hierarchical.data;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class OneDataActionMessage extends ActionMessage {
+ private String data;
+
+ public OneDataActionMessage() {
+ super(FlowType.FLOW_ONE);
+ }
+
+ @Builder
+ public OneDataActionMessage(String data) {
+ this();
+ this.data = data;
+ }
+
+ @Override
+ public String id() {
+ return data;
+ }
+}
\ No newline at end of file
diff --git a/memq-actor/src/test/java/io/appform/memq/hierarchical/data/TwoDataActionMessage.java b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/TwoDataActionMessage.java
new file mode 100644
index 0000000..dcb8f3c
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/hierarchical/data/TwoDataActionMessage.java
@@ -0,0 +1,29 @@
+package io.appform.memq.hierarchical.data;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class TwoDataActionMessage extends ActionMessage {
+ private String data;
+
+ public TwoDataActionMessage() {
+ super(FlowType.FLOW_TWO);
+ }
+
+ @Builder
+ public TwoDataActionMessage(String data) {
+ this();
+ this.data = data;
+ }
+
+ @Override
+ public String id() {
+ return data;
+ }
+
+}
\ No newline at end of file
diff --git a/memq-actor/src/test/java/io/appform/memq/util/YamlReader.java b/memq-actor/src/test/java/io/appform/memq/util/YamlReader.java
new file mode 100644
index 0000000..1885c63
--- /dev/null
+++ b/memq-actor/src/test/java/io/appform/memq/util/YamlReader.java
@@ -0,0 +1,48 @@
+package io.appform.memq.util;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.io.Resources;
+import lombok.SneakyThrows;
+import lombok.experimental.UtilityClass;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+@UtilityClass
+public class YamlReader {
+
+ // Config reader
+ private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
+
+ @SneakyThrows
+ public T readYAML(final String data, final TypeReference typeReference) {
+ return yamlMapper.readValue(data, typeReference);
+ }
+
+ @SneakyThrows
+ public T loadConfig(final String filePath, final TypeReference typeReference) {
+ String data = fixture(filePath);
+ return readYAML(data, typeReference);
+ }
+
+ public static String fixture(String filename) {
+ return fixture(filename, StandardCharsets.UTF_8);
+ }
+
+ private static String fixture(String filename, Charset charset) {
+ URL resource = Resources.getResource(filename);
+
+ try {
+ return Resources.toString(resource, charset).trim();
+ } catch (IOException var4) {
+ IOException e = var4;
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+
+}
diff --git a/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml b/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml
new file mode 100644
index 0000000..a7da3c0
--- /dev/null
+++ b/memq-actor/src/test/resources/rmqHierarchicalMemq.yaml
@@ -0,0 +1,23 @@
+workers:
+ FLOW_ONE:
+ executorName: sample.flow.actor.one
+ partitions: 1
+ children:
+ L1:
+ nodeData:
+ partitions: 1
+ children:
+ L1:
+ nodeData:
+ partitions: 2
+ L2:
+ nodeData:
+ partitions: 1
+ FLOW_TWO:
+ executorName: sample.flow.actor.two
+ partitions: 1
+ children:
+ L1:
+ nodeData:
+ partitions: 1
+
diff --git a/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java b/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java
index 8a752ac..caea4ed 100644
--- a/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java
+++ b/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java
@@ -4,8 +4,9 @@
import io.appform.config.ExecutorConfig;
import io.appform.config.MemqConfig;
import io.appform.memq.ActorSystem;
-import io.appform.memq.actor.Actor;
+import io.appform.memq.actor.IActor;
import io.appform.memq.HighLevelActorConfig;
+import io.appform.memq.hierarchical.IHierarchicalActor;
import io.appform.memq.observer.ActorObserver;
import io.appform.memq.retry.RetryStrategy;
import io.appform.memq.retry.RetryStrategyFactory;
@@ -28,7 +29,7 @@ public class MemqActorSystem implements ActorSystem, Managed {
private final ConcurrentHashMap executors;
private final ExecutorServiceProvider executorServiceProvider;
private final Map executorConfigMap;
- private final List> registeredActors;
+ private final List> registeredActors;
private final RetryStrategyFactory retryStrategyFactory;
private final MetricRegistry metricRegistry;
private final List actorObservers;
@@ -55,12 +56,18 @@ public MemqActorSystem(
//System shutdown
@Override
public void close() {
- registeredActors.forEach(Actor::close);
+ registeredActors.forEach(IActor::close);
executors.values().forEach(ExecutorService::shutdown);
}
@Override
- public final void register(Actor> actor) {
+ public final void register(IActor> actor) {
+ registeredActors.add(actor);
+ actor.start(); //Starting actor during registration
+ }
+
+ @Override
+ public final void register(IHierarchicalActor> actor) {
registeredActors.add(actor);
actor.start(); //Starting actor during registration
}
@@ -89,7 +96,7 @@ public List registeredObservers() {
@Override
public boolean isRunning() {
- return !registeredActors.isEmpty() && registeredActors.stream().allMatch(Actor::isRunning);
+ return !registeredActors.isEmpty() && registeredActors.stream().allMatch(IActor::isRunning);
}
@Override