diff --git a/lib/horde/cluster_transport.ex b/lib/horde/cluster_transport.ex new file mode 100644 index 0000000..3caa284 --- /dev/null +++ b/lib/horde/cluster_transport.ex @@ -0,0 +1,22 @@ +defmodule Horde.ClusterTransport do + @moduledoc """ + Behaviour for cluster communication primitives. + + Allows Horde to work with both standard Erlang distribution and + alternative transports like Partisan. + + Configure per registry/supervisor via the `:transport` option: + + {Horde.Registry, name: MyRegistry, keys: :unique, + transport: Horde.ClusterTransport.Partisan} + """ + + @doc "Returns all connected peer nodes." + @callback members() :: [node()] + + @doc "Returns true if the given pid is alive on its node." + @callback process_alive?(pid()) :: boolean() + + @doc "Call a function on a remote node." + @callback call(node(), module(), atom(), [term()], timeout()) :: term() +end diff --git a/lib/horde/cluster_transport/erlang.ex b/lib/horde/cluster_transport/erlang.ex new file mode 100644 index 0000000..a94fa97 --- /dev/null +++ b/lib/horde/cluster_transport/erlang.ex @@ -0,0 +1,26 @@ +defmodule Horde.ClusterTransport.Erlang do + @moduledoc "Cluster transport using standard Erlang distribution." + + @behaviour Horde.ClusterTransport + + @impl true + def members(), do: Node.list() + + @impl true + def process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid) + + def process_alive?(pid) do + n = node(pid) + + Node.list() |> Enum.member?(n) && + :erpc.call(n, Process, :alive?, [pid]) + catch + :error, {:erpc, :noconnection} -> false + type, reason -> :erlang.raise(type, reason, __STACKTRACE__) + end + + @impl true + def call(node, mod, fun, args, timeout) do + :erpc.call(node, mod, fun, args, timeout) + end +end diff --git a/lib/horde/cluster_transport/partisan.ex b/lib/horde/cluster_transport/partisan.ex new file mode 100644 index 0000000..be30cd8 --- /dev/null +++ b/lib/horde/cluster_transport/partisan.ex @@ -0,0 +1,53 @@ +if Code.ensure_loaded?(:partisan_peer_service) do + defmodule Horde.ClusterTransport.Partisan do + @moduledoc "Cluster transport using Partisan for peer communication." + + @behaviour Horde.ClusterTransport + + @impl true + def members() do + case :partisan_peer_service.members() do + members when is_list(members) -> + members + |> Enum.map(& &1.name) + |> Enum.reject(&(&1 == node())) + + _ -> + [] + end + rescue + _ -> [] + end + + @impl true + def process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid) + + def process_alive?(pid) do + n = node(pid) + + if peer?(n) do + try do + :partisan_rpc.call(n, Process, :alive?, [pid], 5_000) + catch + _, _ -> false + end + else + false + end + end + + @impl true + def call(node, mod, fun, args, timeout) do + :partisan_rpc.call(node, mod, fun, args, timeout) + end + + defp peer?(node) do + case :partisan_peer_service.members() do + members when is_list(members) -> Enum.any?(members, &(&1.name == node)) + _ -> false + end + rescue + _ -> false + end + end +end diff --git a/lib/horde/dynamic_supervisor.ex b/lib/horde/dynamic_supervisor.ex index ac1d628..ee71d8a 100644 --- a/lib/horde/dynamic_supervisor.ex +++ b/lib/horde/dynamic_supervisor.ex @@ -288,6 +288,9 @@ defmodule Horde.DynamicSupervisor do defp maybe_add_node_manager(children, :auto, name), do: children ++ [{Horde.NodeListener, name}] + defp maybe_add_node_manager(children, {:auto, listener}, name), + do: children ++ [{listener, name}] + defp maybe_add_node_manager(children, _, _), do: children defp delta_crdt_options(options) do @@ -299,6 +302,7 @@ defmodule Horde.DynamicSupervisor do end defp members(:auto, _name), do: :auto + defp members({:auto, _listener}, _name), do: :auto defp members(options, name) do if name in options do diff --git a/lib/horde/node_listener.ex b/lib/horde/node_listener.ex index 6c6063f..aa4a5ba 100644 --- a/lib/horde/node_listener.ex +++ b/lib/horde/node_listener.ex @@ -19,7 +19,6 @@ defmodule Horde.NodeListener do do: Enum.map(nodes(), fn node -> {cluster, node} end) # GenServer callbacks - def init(cluster) do :net_kernel.monitor_nodes(true, node_type: :visible) {:ok, cluster} @@ -30,16 +29,6 @@ defmodule Horde.NodeListener do {:noreply, cluster} end - def handle_info({:nodeup, _node, _node_type}, cluster) do - set_members(cluster) - {:noreply, cluster} - end - - def handle_info({:nodedown, _node, _node_type}, cluster) do - set_members(cluster) - {:noreply, cluster} - end - def handle_info(_, cluster), do: {:noreply, cluster} # Helper functions diff --git a/lib/horde/node_listener/partisan.ex b/lib/horde/node_listener/partisan.ex new file mode 100644 index 0000000..5f29439 --- /dev/null +++ b/lib/horde/node_listener/partisan.ex @@ -0,0 +1,54 @@ +defmodule Horde.NodeListener.Partisan do + @moduledoc """ + A Horde node listener for Partisan-based clusters. + + Uses `:partisan.monitor_nodes/1` for node monitoring and + `:partisan_peer_service` for membership, replacing the default + Erlang distribution assumptions in `Horde.NodeListenerBehaviour`. + + ## Usage + + {Horde.Registry, + name: MyRegistry, + keys: :unique, + members: {:auto, Horde.NodeListener.Partisan}} + """ + + use Horde.NodeListenerBehaviour + + @impl GenServer + def init(cluster) do + :partisan.monitor_nodes(true) + {:ok, cluster} + end + + @impl Horde.NodeListenerBehaviour + def make_members(cluster) do + case :partisan_peer_service.members() do + members when is_list(members) -> + Enum.map(members, fn peer -> {cluster, peer.name} end) + + _ -> + [{cluster, :partisan_config.get(:name)}] + end + end + + @impl Horde.NodeListenerBehaviour + def handle_nodeup(_node, cluster), do: set_members(cluster) + + @impl Horde.NodeListenerBehaviour + def handle_nodedown(_node, cluster), do: set_members(cluster) + + # Partisan emits 2-tuple {:nodeup, node} / {:nodedown, node} — no node_type + @impl GenServer + def handle_info({:nodeup, node}, cluster) do + handle_nodeup(node, cluster) + {:noreply, cluster} + end + + @impl GenServer + def handle_info({:nodedown, node}, cluster) do + handle_nodedown(node, cluster) + {:noreply, cluster} + end +end diff --git a/lib/horde/node_listener_behaviour.ex b/lib/horde/node_listener_behaviour.ex new file mode 100644 index 0000000..a9304fc --- /dev/null +++ b/lib/horde/node_listener_behaviour.ex @@ -0,0 +1,98 @@ +defmodule Horde.NodeListenerBehaviour do + @moduledoc """ + A behaviour for cluster membership managers. + + Use this module to build a custom node listener with sensible defaults: + + defmodule MyNodeListener do + use Horde.NodeListenerBehaviour + + @impl Horde.NodeListenerBehaviour + def make_members(cluster), + do: Enum.map(Node.list([:visible, :this]), &{cluster, &1}) + end + + The `use` macro injects default implementations of all callbacks. The only + required callback to override is `make_members/1`. All others are overridable. + """ + + @doc "Returns the member list for the given cluster." + @callback make_members(cluster :: atom()) :: [{atom(), node()}] + + @doc "Called when a node comes up." + @callback handle_nodeup(node :: node(), cluster :: atom()) :: atom() + + @doc "Called when a node goes down." + @callback handle_nodedown(node :: node(), cluster :: atom()) :: atom() + + @optional_callbacks handle_nodeup: 2, handle_nodedown: 2 + + defmacro __using__(_opts) do + quote do + @behaviour Horde.NodeListenerBehaviour + + use GenServer + + # --- API --- + + @spec start_link(atom()) :: GenServer.on_start() + def start_link(cluster), + do: GenServer.start_link(__MODULE__, cluster, name: listener_name(cluster)) + + # --- Behaviour defaults --- + + @impl GenServer + def init(cluster) do + :net_kernel.monitor_nodes(true, node_type: :visible) + {:ok, cluster} + end + + @impl Horde.NodeListenerBehaviour + def handle_nodeup(_node, cluster) do + set_members(cluster) + end + + @impl Horde.NodeListenerBehaviour + def handle_nodedown(_node, cluster) do + set_members(cluster) + end + + # --- GenServer callbacks --- + + @impl GenServer + def handle_cast(:initial_set, cluster) do + set_members(cluster) + {:noreply, cluster} + end + + @impl GenServer + def handle_info({:nodeup, node, _node_type}, cluster) do + handle_nodeup(node, cluster) + {:noreply, cluster} + end + + @impl GenServer + def handle_info({:nodedown, node, _node_type}, cluster) do + handle_nodedown(node, cluster) + {:noreply, cluster} + end + + @impl GenServer + def handle_info(_, cluster), do: {:noreply, cluster} + + # --- Helpers --- + + defp listener_name(cluster), do: Module.concat(cluster, NodeListener) + + defp set_members(cluster), + do: :ok = Horde.Cluster.set_members(cluster, make_members(cluster)) + + defoverridable start_link: 1, + init: 1, + handle_nodeup: 2, + handle_nodedown: 2, + handle_cast: 2, + handle_info: 2 + end + end +end diff --git a/lib/horde/registry.ex b/lib/horde/registry.ex index 5d3e6cf..fbc7d3a 100644 --- a/lib/horde/registry.ex +++ b/lib/horde/registry.ex @@ -102,7 +102,8 @@ defmodule Horde.Registry do :keys, :distribution_strategy, :members, - :delta_crdt_options + :delta_crdt_options, + :transport ] {sup_options, start_options} = Keyword.split(options, keys) @@ -132,6 +133,7 @@ defmodule Horde.Registry do meta = Keyword.get(options, :meta, nil) members = Keyword.get(options, :members, []) delta_crdt_options = Keyword.get(options, :delta_crdt_options, []) + transport = Keyword.get(options, :transport, Horde.ClusterTransport.Erlang) distribution_strategy = Keyword.get( @@ -146,7 +148,8 @@ defmodule Horde.Registry do keys: keys, distribution_strategy: distribution_strategy, members: members, - delta_crdt_options: delta_crdt_options(delta_crdt_options) + delta_crdt_options: delta_crdt_options(delta_crdt_options), + transport: transport } {:ok, flags} @@ -169,7 +172,7 @@ defmodule Horde.Registry do [ name: name, listeners: flags.listeners, - meta: flags.meta, + meta: [{:transport, flags.transport} | (flags.meta || [])], keys: flags.keys, members: members(flags.members, name) ]} @@ -248,9 +251,11 @@ defmodule Horde.Registry do def lookup({:via, _, {registry, name}}), do: lookup(registry, name) def lookup(registry, key) when is_atom(registry) do + transport = transport(registry) + with [{^key, member, {pid, value}}] <- :ets.lookup(keys_ets_table(registry), key), true <- member_in_cluster?(registry, member), - true <- process_alive?(pid) do + true <- transport.process_alive?(pid) do [{pid, value}] else _ -> [] @@ -410,18 +415,16 @@ defmodule Horde.Registry do defp maybe_add_node_manager(children, :auto, name), do: children ++ [{Horde.NodeListener, name}] - defp maybe_add_node_manager(children, _, _), do: children - - defp process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid) + defp maybe_add_node_manager(children, {:auto, listener}, name), + do: children ++ [{listener, name}] - defp process_alive?(pid) do - n = node(pid) + defp maybe_add_node_manager(children, _, _), do: children - Node.list() |> Enum.member?(n) && - :erpc.call(n, Process, :alive?, [pid]) - catch - :error, {:erpc, :noconnection} -> false - type, reason -> :erlang.raise(type, reason, __STACKTRACE__) + defp transport(registry) do + case :ets.lookup(registry_ets_table(registry), :transport) do + [{:transport, mod}] -> mod + _ -> Horde.ClusterTransport.Erlang + end end defp member_in_cluster?(registry, member) do @@ -440,6 +443,7 @@ defmodule Horde.Registry do end defp members(:auto, _name), do: :auto + defp members({:auto, _listener}, _name), do: :auto defp members(options, name) do if name in options do diff --git a/mix.exs b/mix.exs index ec5c3d9..98a9b7d 100644 --- a/mix.exs +++ b/mix.exs @@ -14,7 +14,7 @@ defmodule Horde.MixProject do docs: docs(), package: package(), name: "Horde", - source_url: "https://github.com/derekkraan/horde", + source_url: "https://github.com/elixir-horde/horde", aliases: [ test: "test --no-start" ]