Skip to content

Commit 4ae3501

Browse files
committed
Using CRDTs for the processes only with Horde
1 parent a766c5a commit 4ae3501

File tree

8 files changed

+231
-24
lines changed

8 files changed

+231
-24
lines changed

lib/lightning/application.ex

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ defmodule Lightning.Application do
105105
[
106106
Lightning.PromEx,
107107
{Cluster.Supervisor, [topologies, [name: Lightning.ClusterSupervisor]]},
108+
{Horde.Registry,
109+
name: Lightning.HordeRegistry, keys: :unique, members: :auto},
110+
{Horde.DynamicSupervisor,
111+
name: Lightning.DistributedSupervisor,
112+
strategy: :one_for_one,
113+
members: :auto},
108114
{Lightning.Vault, Application.get_env(:lightning, Lightning.Vault, [])},
109115
# Start the Ecto repository
110116
Lightning.Repo,
@@ -174,6 +180,15 @@ defmodule Lightning.Application do
174180
:ok
175181
end
176182

183+
# Application start phase that asks the distributed (Horde) supervisor to start
# the webhook rate limiter.
#
# `Lightning.WebhookRateLimiter.start_link/1` answers `:ignore` when the name is
# already registered, so `{:ok, pid}` and `:ignore` are both expected outcomes.
# An `{:error, reason}` is logged instead of being dropped silently, because
# without the limiter process every `check_rate` call would exit. The phase
# still returns `:ok` so the application keeps booting, as it did before.
def start_phase(:init_rate_limiter, :normal, _args) do
  require Logger

  case Horde.DynamicSupervisor.start_child(
         Lightning.DistributedSupervisor,
         Lightning.WebhookRateLimiter
       ) do
    {:error, reason} ->
      Logger.error(
        "Failed to start Lightning.WebhookRateLimiter: #{inspect(reason)}"
      )

    _started_or_ignored ->
      :ok
  end

  :ok
end
191+
177192
def oban_opts do
178193
opts = Application.get_env(:lightning, Oban)
179194

lib/lightning/rate_limiters.ex

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,6 @@ defmodule Lightning.RateLimiters do
2222
| {:deny, non_neg_integer()}
2323
end
2424

25-
defmodule Webhook do
26-
@moduledoc false
27-
28-
use ReplicatedRateLimiter,
29-
default_capacity: 10,
30-
default_refill: 2
31-
end
32-
3325
@spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result()
3426
def hit({:failure_email, workflow_id, user_id}) do
3527
[time_scale: time_scale, rate_limit: rate_limit] =
@@ -45,15 +37,6 @@ defmodule Lightning.RateLimiters do
4537
end
4638
end
4739

48-
def hit({:webhook, project_id}) do
49-
# 10 requests for a second, then 2 requests per second
50-
# Over a long enough period of time, this will allow 2 requests per second.
51-
# allow?("webhook_#{project_id}", 10, 2)
52-
# capacity and refill is by design a module attribute
53-
# TODO: passing it here might eliminate the need for macro for easier maintainance
54-
Webhook.allow?("webhook_#{project_id}")
55-
end
56-
5740
def child_spec(opts) do
5841
%{
5942
id: __MODULE__,
@@ -63,7 +46,7 @@ defmodule Lightning.RateLimiters do
6346
end
6447

6548
def start_link(opts) do
66-
children = [{Mail, opts}, {Webhook, opts}]
49+
children = [{Mail, opts}]
6750
Supervisor.start_link(children, strategy: :one_for_one)
6851
end
6952
end
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
defmodule Lightning.WebhookRateLimiter do
  @moduledoc false
  # Token-bucket rate limiter backed by an ETS table owned by a single
  # GenServer. The server registers itself through `Horde.Registry`
  # (`Lightning.HordeRegistry`), and callers reach it via `via_tuple/1`.
  #
  # Every bucket starts full (`@capacity` tokens) and is refilled with
  # `@refill_per_sec` tokens for each whole second that has elapsed.
  use GenServer

  require Logger

  @capacity 10
  @refill_per_sec 2

  # Builds the child spec. When `opts` carries a `:name`, it is used both for
  # the registered name and (interpolated) for the child id; otherwise the
  # module name is used for both.
  def child_spec(opts) do
    {id, name} =
      if name = Keyword.get(opts, :name) do
        {"#{__MODULE__}_#{name}", name}
      else
        {__MODULE__, __MODULE__}
      end

    %{
      id: id,
      start: {__MODULE__, :start_link, [name]},
      shutdown: 10_000,
      restart: :transient
    }
  end

  # Starts the server registered under `name`. If the name is already taken,
  # returns `:ignore` instead of an error so the caller's supervisor does not
  # treat the duplicate as a failure. Any other result is returned unchanged.
  def start_link(name) do
    with {:error, {:already_started, pid}} <-
           GenServer.start_link(__MODULE__, [], name: via_tuple(name)) do
      Logger.info("already started at #{inspect(pid)}, returning :ignore")
      :ignore
    end
  end

  @impl true
  def init([]) do
    # Exit signals are turned into messages so that the name-conflict exit can
    # be handled in `handle_info/2` below.
    Process.flag(:trap_exit, true)

    {:ok, %{table: :ets.new(:table, [:set])}}
  end

  # Consumes `cost` tokens from `bucket` on the server registered as `name`.
  # Returns `{:allow, remaining_tokens}` or `{:deny, wait_ms}`.
  def check_rate(bucket, cost \\ 1, name \\ __MODULE__) do
    name
    |> via_tuple()
    |> GenServer.call({:check_rate, bucket, cost})
  end

  # Returns `:ets.info/1` for the table owned by the server registered as
  # `name`.
  def inspect_table(name \\ __MODULE__),
    do: GenServer.call(via_tuple(name), :inspect_table)

  @impl true
  def handle_call({:check_rate, bucket, cost}, _from, %{table: table} = state) do
    {:reply, do_check_rate(table, bucket, cost), state}
  end

  def handle_call(:inspect_table, _from, %{table: table} = state) do
    {:reply, :ets.info(table), state}
  end

  @impl true
  def handle_info(
        {:EXIT, _from, {:name_conflict, {_key, _value}, registry, pid}},
        state
      ) do
    Logger.info(
      "Stopping #{inspect({registry, pid})} as it has already started in another node."
    )

    {:stop, :normal, state}
  end

  # Because exits are trapped, any other message would otherwise raise a
  # FunctionClauseError, crash the server and drop the ETS table with every
  # bucket in it. Log and keep running instead.
  def handle_info(message, state) do
    Logger.warning(
      "#{inspect(__MODULE__)} ignoring unexpected message: #{inspect(message)}"
    )

    {:noreply, state}
  end

  # Applies the token-bucket rules to `bucket` in `table`.
  #
  # Returns `{:allow, remaining}` when at least `cost` tokens are available
  # after refilling, otherwise `{:deny, wait_ms}` where `wait_ms` is the time
  # needed to refill the missing tokens at `@refill_per_sec`.
  def do_check_rate(table, bucket, cost) do
    now = System.monotonic_time(:millisecond)

    # A bucket seen for the first time starts full.
    :ets.insert_new(table, {bucket, {@capacity, now}})
    [{^bucket, {level, updated}}] = :ets.lookup(table, bucket)

    elapsed_secs = div(now - updated, 1_000)
    current = min(@capacity, level + elapsed_secs * @refill_per_sec)

    if current >= cost do
      level = current - cost

      # Advance the timestamp only by the whole seconds that were credited.
      # Storing `now` would discard the partial second on every allowed hit,
      # so requests arriving less than a second apart would never be refilled.
      :ets.insert(table, {bucket, {level, updated + elapsed_secs * 1_000}})

      {:allow, level}
    else
      # `ceil/1` already returns an integer.
      wait_ms = ceil((cost - current) / @refill_per_sec * 1_000)

      {:deny, wait_ms}
    end
  end

  # Maximum number of tokens a bucket can hold.
  def capacity, do: @capacity

  # Tokens added to a bucket per elapsed second.
  def refill_per_second, do: @refill_per_sec

  # Name under which a limiter is registered in `Lightning.HordeRegistry`.
  def via_tuple(name),
    do: {:via, Horde.Registry, {Lightning.HordeRegistry, name}}
end

mix.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ defmodule Lightning.MixProject do
5555
[
5656
mod: {Lightning.Application, [:timex]},
5757
extra_applications: [:logger, :runtime_tools, :os_mon, :scrivener],
58-
start_phases: [seed_prom_ex_telemetry: []]
58+
start_phases: [seed_prom_ex_telemetry: [], init_rate_limiter: []]
5959
]
6060
end
6161

@@ -69,6 +69,7 @@ defmodule Lightning.MixProject do
6969
defp deps do
7070
[
7171
# {:rexbug, ">= 1.0.0", only: :test},
72+
{:horde, "~> 0.9.0"},
7273
{:bcrypt_elixir, "~> 3.2"},
7374
{:bodyguard, "~> 2.2"},
7475
{:broadway_kafka, "~> 0.4.2"},

mix.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
"hammer": {:hex, :hammer, "7.0.1", "136edcd81af44becbe6b73a958c109e2364ab0dc026d7b19892037dc2632078c", [:mix], [], "hexpm", "796edf14ab2aa80df72080210fcf944ee5e8868d8ece7a7511264d802f58cc2d"},
6262
"hammer_backend_mnesia": {:hex, :hammer_backend_mnesia, "0.7.0", "b2a8cccc1d3506bc4cf8e95750fa5bd491390f227e9a0d981ad375291d7bd1dc", [:mix], [{:hammer, "~> 7.0", [hex: :hammer, repo: "hexpm", optional: false]}], "hexpm", "f77a3d54df865aa8137926df6fadac2a81c06f1c1c22f4e98e32392a22bf9e3e"},
6363
"heroicons": {:hex, :heroicons, "0.5.6", "95d730e7179c633df32d95c1fdaaecdf81b0da11010b89b737b843ac176a7eb5", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:phoenix_live_view, ">= 0.18.2", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}], "hexpm", "ca267f02a5fa695a4178a737b649fb6644a2e399639d4ba7964c18e8a58c2352"},
64+
"horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"},
6465
"hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"},
6566
"httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"},
6667
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
@@ -72,6 +73,7 @@
7273
"junit_formatter": {:hex, :junit_formatter, "3.4.0", "d0e8db6c34dab6d3c4154c3b46b21540db1109ae709d6cf99ba7e7a2ce4b1ac2", [:mix], [], "hexpm", "bb36e2ae83f1ced6ab931c4ce51dd3dbef1ef61bb4932412e173b0cfa259dacd"},
7374
"kafka_protocol": {:hex, :kafka_protocol, "4.1.9", "7c10d9adaba84c6f176f152e6ba8029c46dfb7cb12432587009128836cf9a44a", [:rebar3], [{:crc32cer, "0.1.11", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "14f89eed8329ff4c7b5448e318ee20a98bf5c1e5dc41b74b8af459dfb7590cef"},
7475
"libcluster": {:hex, :libcluster, "3.4.1", "271d2da892763bbef53c2872036c936fe8b80111eb1feefb2d30a3bb15c9b4f6", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.3", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1d568157f069c6afa70ec0d736704cf799734bdbb6343f0322af4a980301c853"},
76+
"libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"},
7577
"makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"},
7678
"makeup_eex": {:hex, :makeup_eex, "0.1.2", "93a5ef3d28ed753215dba2d59cb40408b37cccb4a8205e53ef9b5319a992b700", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.16 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_html, "~> 0.1.0 or ~> 1.0", [hex: :makeup_html, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "6140eafb28215ad7182282fd21d9aa6dcffbfbe0eb876283bc6b768a6c57b0c3"},
7779
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
defmodule Lightning.WebhookRateLimiterTest do
  @moduledoc false
  use ExUnit.Case

  alias Lightning.WebhookRateLimiter

  @default_capacity 10

  describe "check_rate/2" do
    test "allows up to the capacity and refills on multiple buckets" do
      first_bucket = unique_bucket()
      second_bucket = unique_bucket()

      # Both buckets are drained in lockstep and must report the same level.
      for used <- 1..@default_capacity do
        remaining = @default_capacity - used

        assert {:allow, ^remaining} =
                 WebhookRateLimiter.check_rate(first_bucket)

        assert {:allow, ^remaining} =
                 WebhookRateLimiter.check_rate(second_bucket)
      end
    end

    test "denies after consuming the bucket" do
      drained_bucket = unique_bucket()
      untouched_bucket = unique_bucket()

      for used <- 1..@default_capacity do
        assert {:allow, remaining} =
                 WebhookRateLimiter.check_rate(drained_bucket)

        assert remaining == @default_capacity - used
      end

      # A different bucket is unaffected by the drained one.
      assert {:allow, remaining} =
               WebhookRateLimiter.check_rate(untouched_bucket)

      assert remaining == @default_capacity - 1

      assert {:deny, wait_ms} = WebhookRateLimiter.check_rate(drained_bucket)
      assert wait_ms > 0 and wait_ms < 1_000
    end

    # The synthetic cluster below does not work yet, so this test is skipped.
    # Manual verification procedure:
    #   0. Disable the Endpoint server.
    #   1. Terminal one:
    #      iex --sname node1@localhost --cookie hordecookie -S mix phx.server
    #   2. Terminal two:
    #      iex --sname node2@localhost --cookie hordecookie -S mix phx.server
    #   3. Call Lightning.WebhookRateLimiter.inspect_table() in both shells;
    #      they should show the same ETS table process and node.
    @tag :skip
    test "consumes the bucket remotely" do
      {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost")

      :rpc.call(node2, Application, :ensure_all_started, [:mix])
      :rpc.call(node2, Application, :ensure_all_started, [:lightning])

      # Make the current code paths available on the peer node.
      :rpc.call(node2, :code, :add_paths, [:code.get_path()])

      assert [
               {Lightning.DistributedSupervisor, :node1@localhost},
               {Lightning.DistributedSupervisor, :node2@localhost}
             ] = Horde.Cluster.members(Lightning.DistributedSupervisor)

      bucket = unique_bucket()

      dbg(WebhookRateLimiter.check_rate(bucket))

      # Intended follow-up once the cluster works: consume the remaining
      # capacity from node2 via
      #   :rpc.call(node2, WebhookRateLimiter, :check_rate, [bucket, 1])
      # asserting each {:allow, level}, then assert that a local
      # WebhookRateLimiter.check_rate(bucket) returns {:deny, wait_ms} with
      # 0 < wait_ms < 1_000.

      :peer.stop(peer)
    end
  end

  # Bucket name that is unique per call, so tests never share a bucket.
  defp unique_bucket, do: "project#{System.unique_integer()}"

  # Turns the current VM into a distributed node named `node1@host`, starts a
  # peer node `node2` with the same cookie, and returns
  # `{:ok, peer, node1_name, node2_name}`.
  defp start_nodes(node1, node2, host) do
    primary = :"#{node1}@#{host}"
    {:ok, _pid} = Node.start(primary, :shortnames)
    true = Node.set_cookie(:delicious_cookie)
    cookie = to_charlist(Node.get_cookie())

    peer_opts = %{
      name: node2,
      host: host,
      cookie: cookie,
      args: [~c"-setcookie", cookie]
    }

    {:ok, peer, secondary} = :peer.start(peer_opts)

    assert secondary in Node.list()

    {:ok, peer, primary, secondary}
  end
end

test/replicated_rate_limiter_test.exs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ defmodule ReplicatedRateLimiterTest do
2626
Enum.each(1..9, fn _ -> CrdtEts.allow?("project2") end)
2727

2828
assert_eventually(
29-
DeltaCrdt.get(config.crdt_name, {"project2", "#{Node.self()}"}) |> dbg |> elem(0) == 0,
29+
DeltaCrdt.get(config.crdt_name, {"project2", "#{Node.self()}"})
30+
|> dbg
31+
|> elem(0) == 0,
3032
1_000
3133
)
3234

@@ -58,13 +60,12 @@ defmodule ReplicatedRateLimiterTest do
5860
DeltaCrdt.put(
5961
test_crdt,
6062
{"project2", "another_node"},
61-
{10-i, System.system_time(:second)})
63+
{10 - i, System.system_time(:second)}
64+
)
6265
end)
6366

6467
# Wait for the bucket to be updated
65-
assert_eventually(
66-
CrdtEts.to_list("project2") |> elem(0) == 1
67-
)
68+
assert_eventually(CrdtEts.to_list("project2") |> elem(0) == 1)
6869

6970
assert {:allow, 4} = CrdtEts.allow?("project2", 10, 2)
7071
end

test/test_helper.exs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,12 @@ Application.put_env(:lightning, Lightning.Extensions,
5454
external_metrics: Lightning.Extensions.ExternalMetrics
5555
)
5656

57+
# Start epmd for tests that need a distributed node, and register a shutdown
# hook that stops it again once the test run is over.
#
# `System.find_executable/1` returns nil when epmd is not on the PATH, and
# `Port.info/2` returns nil when the port has already closed. Both cases are
# handled explicitly so the helper does not crash the whole suite.
case System.find_executable("epmd") do
  nil ->
    IO.puts(:stderr, "epmd executable not found; distributed tests cannot start nodes")

  epmd_path ->
    port = Port.open({:spawn_executable, epmd_path}, [])

    case Port.info(port, :os_pid) do
      {:os_pid, os_pid} when is_integer(os_pid) ->
        # Arguments are passed as a list, so no shell is involved.
        System.at_exit(fn _ ->
          System.cmd("kill", ["-TERM", Integer.to_string(os_pid)])
        end)

      _closed_or_unknown ->
        :ok
    end
end
63+
5764
ExUnit.start()
5865
Ecto.Adapters.SQL.Sandbox.mode(Lightning.Repo, :manual)

0 commit comments

Comments
 (0)