Skip to content

Commit 0ba3cfe

Browse files
committed
ExHashRing for distributing the keys across the cluster members
1 parent 63f4d0c commit 0ba3cfe

File tree

14 files changed

+301
-311
lines changed

14 files changed

+301
-311
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,20 @@
1212
- [Nebulex.Adapters.Partitioned] The adapter implements the new Nebulex v3 API.
1313
- [Nebulex.Adapters.Partitioned] The adapter supports the `:timeout` option from
1414
`Nebulex.Cache`.
15+
- [Nebulex.Adapters.Partitioned] The adapter uses `ExHashRing` for distributing
16+
the keys across the cluster members.
17+
- [Nebulex.Adapters.Partitioned] he adapter supports the `:hash_ring` option to
18+
configute `ExHashRing`.
1519

1620
### Backwards incompatible changes
1721

1822
- [Nebulex.Adapters.Multilevel] Option `:model` is no longer supported. Please
1923
use the option `:inclusion_policy` instead.
2024
- [Nebulex.Adapters.Multilevel] The previous extended function `model/0,1` has
2125
been removed. Please use `inclusion_policy/0,1` instead.
26+
- [Nebulex.Adapters.Partitioned] The option `:keyslot` is no longer supported,
27+
so the `Nebulex.Adapter.Keyslot` behaviour has been removed. The partitioned
28+
adapter uses `ExHashRing` for key distribution under-the-hood.
2229
- [Nebulex.Distributed.RPC] The usage of async tasks for the RPC calls has been
2330
removed (for OTP < 23). The new `Nebulex.Distributed.RPC` module uses `:erpc`.
2431

lib/nebulex/adapter/keyslot.ex

Lines changed: 0 additions & 51 deletions
This file was deleted.

lib/nebulex/adapters/multilevel.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ defmodule Nebulex.Adapters.Multilevel do
561561
|> levels(ml_opts)
562562
|> Enum.reduce([node()], fn %{name: name, cache: cache}, acc ->
563563
if cache.__adapter__() in [Nebulex.Adapters.Partitioned, Nebulex.Adapters.Replicated] do
564-
Cluster.get_nodes(name || cache) ++ acc
564+
Cluster.pg_nodes(name || cache) ++ acc
565565
else
566566
acc
567567
end

lib/nebulex/adapters/partitioned.ex

Lines changed: 69 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ defmodule Nebulex.Adapters.Partitioned do
66
77
* Partitioned cache topology (Sharding Distribution Model).
88
* Configurable primary storage adapter.
9-
* Configurable Keyslot to distributed the keys across the cluster members.
9+
* `ExHashRing` for distributing the keys across the cluster members.
1010
* Support for transactions via Erlang global name registration facility.
1111
1212
## Partitioned Cache Topology
@@ -102,29 +102,10 @@ defmodule Nebulex.Adapters.Partitioned do
102102
primary_storage_adapter: Nebulex.Adapters.Local
103103
end
104104
105-
Also, you can provide a custom keyslot function:
106-
107-
defmodule MyApp.PartitionedCache do
108-
use Nebulex.Cache,
109-
otp_app: :my_app,
110-
adapter: Nebulex.Adapters.Partitioned,
111-
primary_storage_adapter: Nebulex.Adapters.Local
112-
113-
@behaviour Nebulex.Adapter.Keyslot
114-
115-
@impl true
116-
def hash_slot(key, range) do
117-
key
118-
|> :erlang.phash2()
119-
|> :jchash.compute(range)
120-
end
121-
end
122-
123105
Where the configuration for the cache must be in your application environment,
124106
usually defined in your `config/config.exs`:
125107
126108
config :my_app, MyApp.PartitionedCache,
127-
keyslot: MyApp.PartitionedCache,
128109
primary: [
129110
gc_interval: 3_600_000,
130111
backend: :shards
@@ -315,7 +296,7 @@ defmodule Nebulex.Adapters.Partitioned do
315296
316297
Get a cluster node based on the given `key`:
317298
318-
MyCache.get_node("mykey")
299+
MyCache.find_node("mykey")
319300
320301
Joining the cache to the cluster:
321302
@@ -348,9 +329,6 @@ defmodule Nebulex.Adapters.Partitioned do
348329
# Inherit default observable implementation
349330
use Nebulex.Adapter.Observable
350331

351-
# Inherit default keyslot implementation
352-
use Nebulex.Adapter.Keyslot
353-
354332
import Nebulex.Adapter
355333
import Nebulex.Utils
356334

@@ -388,14 +366,30 @@ defmodule Nebulex.Adapters.Partitioned do
388366
A convenience function for getting the cluster nodes.
389367
"""
390368
def nodes(name \\ get_dynamic_cache()) do
391-
Cluster.get_nodes(name)
369+
name
370+
|> lookup_meta()
371+
|> get_in([:hash_ring, :name])
372+
|> Cluster.ring_nodes()
392373
end
393374

394375
@doc """
395376
A convenience function to get the node of the given `key`.
396377
"""
397-
def get_node(name \\ get_dynamic_cache(), key) do
398-
Cluster.get_node(name, key, lookup_meta(name).keyslot)
378+
def find_node(name \\ get_dynamic_cache(), key) do
379+
name
380+
|> lookup_meta()
381+
|> get_in([:hash_ring, :name])
382+
|> Cluster.find_node(key)
383+
end
384+
385+
@doc """
386+
Same as `find_node/2` but raises an error if an error occurs.
387+
"""
388+
def find_node!(name \\ get_dynamic_cache(), key) do
389+
case find_node(name, key) do
390+
{:ok, node} -> node
391+
{:error, _} = error -> raise error
392+
end
399393
end
400394

401395
@doc """
@@ -440,16 +434,19 @@ defmodule Nebulex.Adapters.Partitioned do
440434
do: [name: camelize_and_concat([name, Primary])] ++ primary_opts,
441435
else: primary_opts
442436

443-
# Keyslot module for selecting nodes
444-
keyslot = Keyword.fetch!(opts, :keyslot)
437+
# Hash ring options
438+
hash_ring =
439+
opts
440+
|> Keyword.fetch!(:hash_ring)
441+
|> Keyword.put_new_lazy(:name, fn -> camelize_and_concat([name, Ring]) end)
445442

446443
# Prepare metadata
447444
adapter_meta = %{
448445
telemetry_prefix: telemetry_prefix,
449446
telemetry: telemetry,
450447
name: name,
451448
primary_name: primary_opts[:name],
452-
keyslot: keyslot
449+
hash_ring: hash_ring
453450
}
454451

455452
# Prepare child spec
@@ -525,6 +522,9 @@ defmodule Nebulex.Adapters.Partitioned do
525522
end
526523

527524
case map_reduce(entries, adapter_meta, action, [opts], timeout, {true, []}, reducer) do
525+
{:error, _} = error ->
526+
error
527+
528528
{true, _} ->
529529
{:ok, true}
530530

@@ -638,12 +638,13 @@ defmodule Nebulex.Adapters.Partitioned do
638638
end
639639
end
640640

641-
defp do_execute(adapter_meta, %{op: op} = query, opts) do
641+
defp do_execute(%{hash_ring: hash_ring} = adapter_meta, %{op: op} = query, opts) do
642+
ring = Keyword.fetch!(hash_ring, :name)
642643
timeout = Keyword.fetch!(opts, :timeout)
643644
query = build_query(query)
644645

645646
RPC.multicall(
646-
Cluster.get_nodes(adapter_meta.name),
647+
Cluster.ring_nodes(ring),
647648
__MODULE__,
648649
:with_dynamic_cache,
649650
[adapter_meta, op, [query, opts]],
@@ -687,11 +688,13 @@ defmodule Nebulex.Adapters.Partitioned do
687688
## Nebulex.Adapter.Transaction
688689

689690
@impl true
690-
def transaction(adapter_meta, fun, opts) do
691+
def transaction(%{hash_ring: hash_ring} = adapter_meta, fun, opts) do
692+
ring = Keyword.fetch!(hash_ring, :name)
693+
691694
opts =
692695
opts
693696
|> Options.validate_common_runtime_opts!()
694-
|> Keyword.put(:nodes, Cluster.get_nodes(adapter_meta.name))
697+
|> Keyword.put(:nodes, Cluster.ring_nodes(ring))
695698

696699
super(adapter_meta, fun, opts)
697700
end
@@ -722,8 +725,10 @@ defmodule Nebulex.Adapters.Partitioned do
722725
super(adapter_meta, :server, opts)
723726
end
724727

725-
def info(adapter_meta, :nodes, _opts) do
726-
{:ok, Cluster.get_nodes(adapter_meta.name)}
728+
def info(%{hash_ring: hash_ring}, :nodes, _opts) do
729+
ring = Keyword.fetch!(hash_ring, :name)
730+
731+
{:ok, Cluster.ring_nodes(ring)}
727732
end
728733

729734
def info(adapter_meta, :nodes_info, opts) do
@@ -778,11 +783,12 @@ defmodule Nebulex.Adapters.Partitioned do
778783
end
779784
end
780785

781-
defp fetch_nodes_info(adapter_meta, spec, opts) do
786+
defp fetch_nodes_info(%{hash_ring: hash_ring} = adapter_meta, spec, opts) do
782787
opts = Options.validate_common_runtime_opts!(opts)
788+
ring = Keyword.fetch!(hash_ring, :name)
783789

784790
RPC.multicall(
785-
Cluster.get_nodes(adapter_meta.name),
791+
Cluster.ring_nodes(ring),
786792
__MODULE__,
787793
:with_dynamic_cache,
788794
[adapter_meta, :info, [spec, opts]],
@@ -839,16 +845,18 @@ defmodule Nebulex.Adapters.Partitioned do
839845
end
840846
end
841847

842-
defp get_node(%{name: name, keyslot: keyslot}, key) do
843-
Cluster.get_node(name, key, keyslot)
848+
defp find_node(%{hash_ring: hash_ring}, key) do
849+
hash_ring
850+
|> Keyword.fetch!(:name)
851+
|> Cluster.find_node(key)
844852
end
845853

846854
defp call(adapter_meta, key, action, args, opts) do
847855
timeout = Keyword.fetch!(opts, :timeout)
848856

849-
adapter_meta
850-
|> get_node(key)
851-
|> RPC.call(__MODULE__, :with_dynamic_cache, [adapter_meta, action, args], timeout)
857+
with {:ok, node} <- find_node(adapter_meta, key) do
858+
RPC.call(node, __MODULE__, :with_dynamic_cache, [adapter_meta, action, args], timeout)
859+
end
852860
end
853861

854862
defp map_reduce(enum, meta, action, args, timeout, acc, reducer, group_fun \\ & &1) do
@@ -858,14 +866,26 @@ defmodule Nebulex.Adapters.Partitioned do
858866
{node, {__MODULE__, :with_dynamic_cache, [meta, action, [group_fun.(group) | args]]}}
859867
end)
860868
|> RPC.multi_mfa_call(timeout, acc, reducer)
869+
catch
870+
error -> error
861871
end
862872

863873
defp group_by_node(enum, adapter_meta, action) when action in [:put_all, :put_new_all] do
864-
Enum.group_by(enum, &get_node(adapter_meta, elem(&1, 0)))
874+
Enum.group_by(enum, fn {key, _} ->
875+
case find_node(adapter_meta, key) do
876+
{:ok, node} -> node
877+
{:error, _} = error -> throw(error)
878+
end
879+
end)
865880
end
866881

867882
defp group_by_node(enum, adapter_meta, _action) do
868-
Enum.group_by(enum, &get_node(adapter_meta, &1))
883+
Enum.group_by(enum, fn key ->
884+
case find_node(adapter_meta, key) do
885+
{:ok, node} -> node
886+
{:error, _} = error -> throw(error)
887+
end
888+
end)
869889
end
870890

871891
defp handle_rpc_multi_call({res, []}, _action, fun) do
@@ -879,9 +899,11 @@ defmodule Nebulex.Adapters.Partitioned do
879899
action: action
880900
end
881901

902+
## Error formatting
903+
882904
@doc false
883-
def format_error({:rpc, {:unexpected_errors, errors}}, opts) do
884-
action = Keyword.fetch!(opts, :action)
905+
def format_error({:rpc, {:unexpected_errors, errors}}, metadata) do
906+
action = Keyword.fetch!(metadata, :action)
885907

886908
formatted_errors =
887909
Enum.map_join(errors, "\n\n", fn {{:error, reason}, node} ->

0 commit comments

Comments
 (0)