Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions lib/horde/cluster_transport.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Horde.ClusterTransport do
@moduledoc """
Behaviour for cluster communication primitives.

Allows Horde to work with both standard Erlang distribution and
alternative transports like Partisan.

Configure per registry/supervisor via the `:transport` option:

{Horde.Registry, name: MyRegistry, keys: :unique,
transport: Horde.ClusterTransport.Partisan}
"""

@doc "Returns all connected peer nodes."
@callback members() :: [node()]

@doc "Returns true if the given pid is alive on its node."
@callback process_alive?(pid()) :: boolean()

@doc "Call a function on a remote node."
@callback call(node(), module(), atom(), [term()], timeout()) :: term()
end
26 changes: 26 additions & 0 deletions lib/horde/cluster_transport/erlang.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Horde.ClusterTransport.Erlang do
@moduledoc "Cluster transport using standard Erlang distribution."

@behaviour Horde.ClusterTransport

@impl true
def members(), do: Node.list()

@impl true
def process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid)

def process_alive?(pid) do
n = node(pid)

Node.list() |> Enum.member?(n) &&
:erpc.call(n, Process, :alive?, [pid])
catch
:error, {:erpc, :noconnection} -> false
type, reason -> :erlang.raise(type, reason, __STACKTRACE__)
end

@impl true
def call(node, mod, fun, args, timeout) do
:erpc.call(node, mod, fun, args, timeout)
end
end
53 changes: 53 additions & 0 deletions lib/horde/cluster_transport/partisan.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
if Code.ensure_loaded?(:partisan_peer_service) do
defmodule Horde.ClusterTransport.Partisan do
@moduledoc "Cluster transport using Partisan for peer communication."

@behaviour Horde.ClusterTransport

@impl true
def members() do
case :partisan_peer_service.members() do
members when is_list(members) ->
members
|> Enum.map(& &1.name)
|> Enum.reject(&(&1 == node()))

_ ->
[]
end
rescue
_ -> []
end

@impl true
def process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid)

def process_alive?(pid) do
n = node(pid)

if peer?(n) do
try do
:partisan_rpc.call(n, Process, :alive?, [pid], 5_000)
catch
_, _ -> false
end
else
false
end
end

@impl true
def call(node, mod, fun, args, timeout) do
:partisan_rpc.call(node, mod, fun, args, timeout)
end

defp peer?(node) do
case :partisan_peer_service.members() do
members when is_list(members) -> Enum.any?(members, &(&1.name == node))
_ -> false
end
rescue
_ -> false
end
end
end
4 changes: 4 additions & 0 deletions lib/horde/dynamic_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ defmodule Horde.DynamicSupervisor do
defp maybe_add_node_manager(children, :auto, name),
do: children ++ [{Horde.NodeListener, name}]

defp maybe_add_node_manager(children, {:auto, listener}, name),
do: children ++ [{listener, name}]

defp maybe_add_node_manager(children, _, _), do: children

defp delta_crdt_options(options) do
Expand All @@ -299,6 +302,7 @@ defmodule Horde.DynamicSupervisor do
end

defp members(:auto, _name), do: :auto
defp members({:auto, _listener}, _name), do: :auto

defp members(options, name) do
if name in options do
Expand Down
11 changes: 0 additions & 11 deletions lib/horde/node_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Horde.NodeListener do
do: Enum.map(nodes(), fn node -> {cluster, node} end)

# GenServer callbacks

def init(cluster) do
:net_kernel.monitor_nodes(true, node_type: :visible)
{:ok, cluster}
Expand All @@ -30,16 +29,6 @@ defmodule Horde.NodeListener do
{:noreply, cluster}
end

def handle_info({:nodeup, _node, _node_type}, cluster) do
set_members(cluster)
{:noreply, cluster}
end

def handle_info({:nodedown, _node, _node_type}, cluster) do
set_members(cluster)
{:noreply, cluster}
end

def handle_info(_, cluster), do: {:noreply, cluster}

# Helper functions
Expand Down
54 changes: 54 additions & 0 deletions lib/horde/node_listener/partisan.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Horde.NodeListener.Partisan do
@moduledoc """
A Horde node listener for Partisan-based clusters.

Uses `:partisan.monitor_nodes/1` for node monitoring and
`:partisan_peer_service` for membership, replacing the default
Erlang distribution assumptions in `Horde.NodeListenerBehaviour`.

## Usage

{Horde.Registry,
name: MyRegistry,
keys: :unique,
members: {:auto, Horde.NodeListener.Partisan}}
"""

use Horde.NodeListenerBehaviour

@impl GenServer
def init(cluster) do
:partisan.monitor_nodes(true)
{:ok, cluster}
end

@impl Horde.NodeListenerBehaviour
def make_members(cluster) do
case :partisan_peer_service.members() do
members when is_list(members) ->
Enum.map(members, fn peer -> {cluster, peer.name} end)

_ ->
[{cluster, :partisan_config.get(:name)}]
end
end

@impl Horde.NodeListenerBehaviour
def handle_nodeup(_node, cluster), do: set_members(cluster)

@impl Horde.NodeListenerBehaviour
def handle_nodedown(_node, cluster), do: set_members(cluster)

# Partisan emits 2-tuple {:nodeup, node} / {:nodedown, node} — no node_type
@impl GenServer
def handle_info({:nodeup, node}, cluster) do
handle_nodeup(node, cluster)
{:noreply, cluster}
end

@impl GenServer
def handle_info({:nodedown, node}, cluster) do
handle_nodedown(node, cluster)
{:noreply, cluster}
end
end
98 changes: 98 additions & 0 deletions lib/horde/node_listener_behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
defmodule Horde.NodeListenerBehaviour do
@moduledoc """
A behaviour for cluster membership managers.

Use this module to build a custom node listener with sensible defaults:

defmodule MyNodeListener do
use Horde.NodeListenerBehaviour

@impl Horde.NodeListenerBehaviour
def make_members(cluster),
do: Enum.map(Node.list([:visible, :this]), &{cluster, &1})
end

The `use` macro injects default implementations of all callbacks. The only
required callback to override is `make_members/1`. All others are overridable.
"""

@doc "Returns the member list for the given cluster."
@callback make_members(cluster :: atom()) :: [{atom(), node()}]

@doc "Called when a node comes up."
@callback handle_nodeup(node :: node(), cluster :: atom()) :: atom()

@doc "Called when a node goes down."
@callback handle_nodedown(node :: node(), cluster :: atom()) :: atom()

@optional_callbacks handle_nodeup: 2, handle_nodedown: 2

defmacro __using__(_opts) do
quote do
@behaviour Horde.NodeListenerBehaviour

use GenServer

# --- API ---

@spec start_link(atom()) :: GenServer.on_start()
def start_link(cluster),
do: GenServer.start_link(__MODULE__, cluster, name: listener_name(cluster))

# --- Behaviour defaults ---

@impl GenServer
def init(cluster) do
:net_kernel.monitor_nodes(true, node_type: :visible)
{:ok, cluster}
end

@impl Horde.NodeListenerBehaviour
def handle_nodeup(_node, cluster) do
set_members(cluster)
end

@impl Horde.NodeListenerBehaviour
def handle_nodedown(_node, cluster) do
set_members(cluster)
end

# --- GenServer callbacks ---

@impl GenServer
def handle_cast(:initial_set, cluster) do
set_members(cluster)
{:noreply, cluster}
end

@impl GenServer
def handle_info({:nodeup, node, _node_type}, cluster) do
handle_nodeup(node, cluster)
{:noreply, cluster}
end

@impl GenServer
def handle_info({:nodedown, node, _node_type}, cluster) do
handle_nodedown(node, cluster)
{:noreply, cluster}
end

@impl GenServer
def handle_info(_, cluster), do: {:noreply, cluster}

# --- Helpers ---

defp listener_name(cluster), do: Module.concat(cluster, NodeListener)

defp set_members(cluster),
do: :ok = Horde.Cluster.set_members(cluster, make_members(cluster))

defoverridable start_link: 1,
init: 1,
handle_nodeup: 2,
handle_nodedown: 2,
handle_cast: 2,
handle_info: 2
end
end
end
32 changes: 18 additions & 14 deletions lib/horde/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ defmodule Horde.Registry do
:keys,
:distribution_strategy,
:members,
:delta_crdt_options
:delta_crdt_options,
:transport
]

{sup_options, start_options} = Keyword.split(options, keys)
Expand Down Expand Up @@ -132,6 +133,7 @@ defmodule Horde.Registry do
meta = Keyword.get(options, :meta, nil)
members = Keyword.get(options, :members, [])
delta_crdt_options = Keyword.get(options, :delta_crdt_options, [])
transport = Keyword.get(options, :transport, Horde.ClusterTransport.Erlang)

distribution_strategy =
Keyword.get(
Expand All @@ -146,7 +148,8 @@ defmodule Horde.Registry do
keys: keys,
distribution_strategy: distribution_strategy,
members: members,
delta_crdt_options: delta_crdt_options(delta_crdt_options)
delta_crdt_options: delta_crdt_options(delta_crdt_options),
transport: transport
}

{:ok, flags}
Expand All @@ -169,7 +172,7 @@ defmodule Horde.Registry do
[
name: name,
listeners: flags.listeners,
meta: flags.meta,
meta: [{:transport, flags.transport} | (flags.meta || [])],
keys: flags.keys,
members: members(flags.members, name)
]}
Expand Down Expand Up @@ -248,9 +251,11 @@ defmodule Horde.Registry do
def lookup({:via, _, {registry, name}}), do: lookup(registry, name)

def lookup(registry, key) when is_atom(registry) do
transport = transport(registry)

with [{^key, member, {pid, value}}] <- :ets.lookup(keys_ets_table(registry), key),
true <- member_in_cluster?(registry, member),
true <- process_alive?(pid) do
true <- transport.process_alive?(pid) do
[{pid, value}]
else
_ -> []
Expand Down Expand Up @@ -410,18 +415,16 @@ defmodule Horde.Registry do
defp maybe_add_node_manager(children, :auto, name),
do: children ++ [{Horde.NodeListener, name}]

defp maybe_add_node_manager(children, _, _), do: children

defp process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid)
defp maybe_add_node_manager(children, {:auto, listener}, name),
do: children ++ [{listener, name}]

defp process_alive?(pid) do
n = node(pid)
defp maybe_add_node_manager(children, _, _), do: children

Node.list() |> Enum.member?(n) &&
:erpc.call(n, Process, :alive?, [pid])
catch
:error, {:erpc, :noconnection} -> false
type, reason -> :erlang.raise(type, reason, __STACKTRACE__)
defp transport(registry) do
case :ets.lookup(registry_ets_table(registry), :transport) do
[{:transport, mod}] -> mod
_ -> Horde.ClusterTransport.Erlang
end
end

defp member_in_cluster?(registry, member) do
Expand All @@ -440,6 +443,7 @@ defmodule Horde.Registry do
end

defp members(:auto, _name), do: :auto
defp members({:auto, _listener}, _name), do: :auto

defp members(options, name) do
if name in options do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Horde.MixProject do
docs: docs(),
package: package(),
name: "Horde",
source_url: "https://github.com/derekkraan/horde",
source_url: "https://github.com/elixir-horde/horde",
aliases: [
test: "test --no-start"
]
Expand Down