From 490e4443e43fbff5560303a9fe87720afdf7502f Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Mon, 31 Mar 2025 14:02:02 +0200 Subject: [PATCH 01/14] Update Hammer dependencies - Upgraded `hammer_backend_mnesia` to version 0.7.0 and `hammer` to version 7.0. - Removed old Hammer configuration from `config/config.exs` and `config/test.exs`. - Introduced a new `Lightning.RateLimiters` module to handle rate limiting for failure alerts. - Updated `Lightning.FailureAlerter` to utilize the new rate limiting logic. - Adjusted tests to reflect changes in rate limiting behavior. --- config/config.exs | 5 --- config/test.exs | 5 --- lib/lightning/application.ex | 7 ++-- lib/lightning/pipeline/failure_alerter.ex | 44 +++-------------------- lib/lightning/rate_limiters.ex | 39 ++++++++++++++++++++ mix.exs | 4 +-- mix.lock | 4 +-- test/lightning/failure_alert_test.exs | 27 ++------------ 8 files changed, 53 insertions(+), 82 deletions(-) create mode 100644 lib/lightning/rate_limiters.ex diff --git a/config/config.exs b/config/config.exs index 0587dce3ac..1e759875c4 100644 --- a/config/config.exs +++ b/config/config.exs @@ -14,11 +14,6 @@ config :lightning, Lightning.Repo, types: Lightning.PostgrexTypes, log: :debug -config :hammer, - backend: - {Hammer.Backend.Mnesia, - [expiry_ms: 60_000 * 60 * 4, cleanup_interval_ms: 60_000 * 10]} - # Configures the endpoint config :lightning, LightningWeb.Endpoint, url: [host: "localhost"], diff --git a/config/test.exs b/config/test.exs index ea3748ae47..fc2e5ca92b 100644 --- a/config/test.exs +++ b/config/test.exs @@ -95,11 +95,6 @@ config :lightning, Lightning.Mailer, adapter: Swoosh.Adapters.Test config :lightning, Lightning.AdaptorRegistry, use_cache: "test/fixtures/adaptor_registry_cache.json" -config :hammer, - backend: - {Hammer.Backend.ETS, - [expiry_ms: 60_000 * 60 * 4, cleanup_interval_ms: 60_000 * 10]} - config :lightning, Lightning.FailureAlerter, time_scale: 60_000, rate_limit: 3 diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index b8521ded89..b3bf828c7f 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -27,12 +27,11 @@ defmodule Lightning.Application do OpentelemetryEcto.setup([:lightning, :repo]) OpentelemetryLiveView.setup() OpentelemetryOban.setup(trace: [:jobs]) + # mnesia startup :mnesia.stop() :mnesia.create_schema([node()]) :mnesia.start() - Hammer.Backend.Mnesia.create_mnesia_table(disc_copies: [node()]) - :mnesia.wait_for_tables([:__hammer_backend_mnesia], 60_000) # Only add the Sentry backend if a dsn is provided. 
if Application.get_env(:sentry, :dsn), @@ -133,7 +132,9 @@ defmodule Lightning.Application do {Lightning.Runtime.RuntimeManager, worker_secret: Lightning.Config.worker_secret(), endpoint: LightningWeb.Endpoint}, - {Lightning.KafkaTriggers.Supervisor, type: :supervisor} + {Lightning.KafkaTriggers.Supervisor, type: :supervisor}, + # Start our rate limiter + {Lightning.RateLimiters.Mail, clean_period: :timer.minutes(10)} # Start a worker by calling: Lightning.Worker.start_link(arg) # {Lightning.Worker, arg} ] diff --git a/lib/lightning/pipeline/failure_alerter.ex b/lib/lightning/pipeline/failure_alerter.ex index 455c3fb72a..e48a705930 100644 --- a/lib/lightning/pipeline/failure_alerter.ex +++ b/lib/lightning/pipeline/failure_alerter.ex @@ -46,31 +46,14 @@ defmodule Lightning.FailureAlerter do "run_logs" => run_logs, "recipient" => recipient }) do - [time_scale: time_scale, rate_limit: rate_limit] = - Application.fetch_env!(:lightning, __MODULE__) - - run_url = - url( - LightningWeb.Endpoint, - ~p"/projects/#{project_id}/runs/#{run_id}" - ) + run_url = ~p"/projects/#{project_id}/runs/#{run_id}" work_order_url = - url( - LightningWeb.Endpoint, - ~p"/projects/#{project_id}/history?filters[workorder_id]=#{work_order_id}" - ) - - # rate limiting per workflow AND user - bucket_key = "#{workflow_id}::#{recipient.id}" + ~p"/projects/#{project_id}/history?filters[workorder_id]=#{work_order_id}" - Hammer.check_rate( - bucket_key, - time_scale, - rate_limit - ) + Lightning.RateLimiters.hit({:failure_email, workflow_id, recipient.id}) |> case do - {:allow, count} -> + {:allow, %{count: count, time_scale: time_scale, rate_limit: rate_limit}} -> Lightning.FailureEmail.deliver_failure_email(recipient.email, %{ work_order_id: work_order_id, work_order_url: work_order_url, @@ -85,28 +68,9 @@ defmodule Lightning.FailureAlerter do workflow_id: workflow_id, recipient: recipient }) - |> case do - {:ok, _metadata} -> - nil - - # :ok - - _ -> - # decrement the counter when email is not delivered - Hammer.check_rate_inc( - bucket_key, - time_scale, - rate_limit, - -1 - ) - - nil - # {:cancel, "Failure email was not sent"} or Logger - end {:deny, _} -> nil - # {:cancel, "Failure notification rate limit is reached"} or Logger end end end diff --git a/lib/lightning/rate_limiters.ex b/lib/lightning/rate_limiters.ex new file mode 100644 index 0000000000..20adfb103e --- /dev/null +++ b/lib/lightning/rate_limiters.ex @@ -0,0 +1,39 @@ +defmodule Lightning.RateLimiters do + @moduledoc false + + defmodule Mail do + @moduledoc false + + # WARNING: When changing the algorithm, you must also update the mnesia table name. + # The default is to use __MODULE__, passing `:table` to the `use Hammer` macro + # allows you to specify a custom table name. 
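+    #
+    # For orientation (a sketch; the exact generated API is an assumption based
+    # on Hammer v7's `use Hammer` macro): this module is expected to expose a
+    # `hit/3`, e.g.
+    #
+    #     Mail.hit("some-bucket-key", :timer.minutes(1), 5)
+    #     #=> {:allow, count} | {:deny, retry_after_ms}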
+ use Hammer, + backend: Hammer.Mnesia, + algorithm: :leaky_bucket, + table: :mail_limiter + + @type hit_result :: + {:allow, + %{ + count: non_neg_integer(), + time_scale: non_neg_integer(), + rate_limit: non_neg_integer() + }} + | {:deny, non_neg_integer()} + end + + @spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result() + def hit({:failure_email, workflow_id, user_id}) do + [time_scale: time_scale, rate_limit: rate_limit] = + Application.fetch_env!(:lightning, Lightning.FailureAlerter) + + Mail.hit("#{workflow_id}::#{user_id}", time_scale, rate_limit) + |> case do + {:allow, count} -> + {:allow, %{count: count, time_scale: time_scale, rate_limit: rate_limit}} + + {:deny, count} -> + {:deny, count} + end + end +end diff --git a/mix.exs b/mix.exs index 2d452416cd..f3d216e825 100644 --- a/mix.exs +++ b/mix.exs @@ -140,8 +140,8 @@ defmodule Lightning.MixProject do {:timex, "~> 3.7"}, {:replug, "~> 0.1.0"}, {:phoenix_swoosh, "~> 1.2.1"}, - {:hammer_backend_mnesia, "~> 0.6"}, - {:hammer, "~> 6.0"}, + {:hammer_backend_mnesia, "~> 0.7.0"}, + {:hammer, "~> 7.0"}, {:dotenvy, "~> 0.8.0"}, {:goth, "~> 1.3"}, {:gcs_signed_url, "~> 0.4.6"}, diff --git a/mix.lock b/mix.lock index c9f826bb8c..c45d7433d1 100644 --- a/mix.lock +++ b/mix.lock @@ -61,8 +61,8 @@ "gproc": {:hex, :gproc, "0.8.0", "cea02c578589c61e5341fce149ea36ccef236cc2ecac8691fba408e7ea77ec2f", [:rebar3], [], "hexpm", "580adafa56463b75263ef5a5df4c86af321f68694e7786cb057fd805d1e2a7de"}, "grpcbox": {:hex, :grpcbox, "0.16.0", "b83f37c62d6eeca347b77f9b1ec7e9f62231690cdfeb3a31be07cd4002ba9c82", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.13.0", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.8.0", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "294df743ae20a7e030889f00644001370a4f7ce0121f3bbdaf13cf3169c62913"}, "hackney": {:hex, :hackney, "1.23.0", "55cc09077112bcb4a69e54be46ed9bc55537763a96cd4a80a221663a7eafd767", [:rebar3], [{:certifi, "~> 2.14.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "6cd1c04cd15c81e5a493f167b226a15f0938a84fc8f0736ebe4ddcab65c0b44e"}, - "hammer": {:hex, :hammer, "6.2.1", "5ae9c33e3dceaeb42de0db46bf505bd9c35f259c8defb03390cd7556fea67ee2", [:mix], [{:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}], "hexpm", "b9476d0c13883d2dc0cc72e786bac6ac28911fba7cc2e04b70ce6a6d9c4b2bdc"}, - "hammer_backend_mnesia": {:hex, :hammer_backend_mnesia, "0.6.1", "d10d94fc29cbffbf04ecb3c3127d705ce4cc1cecfb9f3d6b18a554c3cae9af2c", [:mix], [{:hammer, "~> 6.1", [hex: :hammer, repo: "hexpm", optional: false]}], "hexpm", "85ad2ef6ebe035207dd9a03a116dc6a7ee43fbd53e8154cf32a1e33b9200fb62"}, + "hammer": {:hex, :hammer, "7.0.1", "136edcd81af44becbe6b73a958c109e2364ab0dc026d7b19892037dc2632078c", [:mix], [], "hexpm", "796edf14ab2aa80df72080210fcf944ee5e8868d8ece7a7511264d802f58cc2d"}, + "hammer_backend_mnesia": {:hex, :hammer_backend_mnesia, "0.7.0", 
"b2a8cccc1d3506bc4cf8e95750fa5bd491390f227e9a0d981ad375291d7bd1dc", [:mix], [{:hammer, "~> 7.0", [hex: :hammer, repo: "hexpm", optional: false]}], "hexpm", "f77a3d54df865aa8137926df6fadac2a81c06f1c1c22f4e98e32392a22bf9e3e"}, "heroicons": {:hex, :heroicons, "0.5.6", "95d730e7179c633df32d95c1fdaaecdf81b0da11010b89b737b843ac176a7eb5", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:phoenix_live_view, ">= 0.18.2", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}], "hexpm", "ca267f02a5fa695a4178a737b649fb6644a2e399639d4ba7964c18e8a58c2352"}, "hpack": {:hex, :hpack_erl, "0.2.3", "17670f83ff984ae6cd74b1c456edde906d27ff013740ee4d9efaa4f1bf999633", [:rebar3], [], "hexpm", "06f580167c4b8b8a6429040df36cc93bba6d571faeaec1b28816523379cbb23a"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, diff --git a/test/lightning/failure_alert_test.exs b/test/lightning/failure_alert_test.exs index a2a68bf401..98bec4d9ee 100644 --- a/test/lightning/failure_alert_test.exs +++ b/test/lightning/failure_alert_test.exs @@ -185,6 +185,8 @@ defmodule Lightning.FailureAlertTest do s3 = "\"workflow-a\" has failed 3 times in the last #{period}." assert_receive {:email, %Swoosh.Email{subject: ^s3}}, 1500 + assert Lightning.RateLimiters.Mail.get(run_1.work_order.workflow_id) == 0 + assert_receive {:email, %Swoosh.Email{subject: "\"workflow-b\" failed."}}, 1500 end @@ -230,31 +232,6 @@ defmodule Lightning.FailureAlertTest do refute_email_sent(subject: "\"workflow-a\" failed.") end - test "does not increment the rate-limiter counter when an email is not delivered.", - %{runs: [run, _, _], workorders: [workorder, _, _]} do - [time_scale: time_scale, rate_limit: rate_limit] = - Application.fetch_env!(:lightning, Lightning.FailureAlerter) - - FailureAlerter.alert_on_failure(run) - - {:ok, {0, ^rate_limit, _, _, _}} = - Hammer.inspect_bucket(workorder.workflow_id, time_scale, rate_limit) - - assert_email_sent(subject: "\"workflow-a\" failed.") - - Mimic.stub(Lightning.FailureEmail, :deliver_failure_email, fn _, _ -> - {:error} - end) - - FailureAlerter.alert_on_failure(run) - - refute_email_sent(subject: "\"workflow-a\" failed.") - - # nothing changed - {:ok, {0, ^rate_limit, _, _, _}} = - Hammer.inspect_bucket(workorder.workflow_id, time_scale, rate_limit) - end - test "failure alert is sent on run complete", %{ runs: [run, _, _] } do From 40a1b94db25707603aa8c8dbe35e2a4cc5e16be3 Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Tue, 1 Apr 2025 14:04:27 +0200 Subject: [PATCH 02/14] Add local cluster management script - Introduced a new Bash script `local_cluster` for managing local cluster instances. - The script supports starting multiple instances (1-4) and connecting to specific nodes. - Added functionality for an optional Caddy reverse proxy to manage traffic on port 4000. - Updated `bootstrap.ex` to include RTM port configuration and refactored HTTP configuration handling for production and non-production environments. 
--- bin/local_cluster | 171 ++++++++++++++++++++++++++++++ lib/lightning/config/bootstrap.ex | 117 ++++++++++++-------- 2 files changed, 244 insertions(+), 44 deletions(-) create mode 100755 bin/local_cluster diff --git a/bin/local_cluster b/bin/local_cluster new file mode 100755 index 0000000000..42c091e776 --- /dev/null +++ b/bin/local_cluster @@ -0,0 +1,171 @@ +#!/usr/bin/env bash + +# Function to show usage +show_usage() { + echo "Usage:" + echo " $0 [--proxy] --count Start local cluster (default: 2 instances)" + echo " $0 connect Connect to a specific node (1-4)" + echo "" + echo "Options:" + echo " --proxy Start a Caddy reverse proxy on port 4000 (nodes will start from 4001)" + echo " --count Number of nodes to start (1-4, default: 2)" + exit 1 +} + +# Handle connect subcommand +if [ "$1" = "connect" ]; then + if [ -z "$2" ] || ! [[ "$2" =~ ^[1-4]$ ]]; then + echo "Error: Please specify a valid node number (1-4)" + show_usage + fi + + NODE_NUM=$2 + echo "Connecting to node${NODE_NUM}@127.0.0.1..." + exec iex --name "remote_shell${NODE_NUM}@127.0.0.1" --remsh "node${NODE_NUM}@127.0.0.1" + # The exec command replaces the current process, so we don't need an explicit exit + # If we reach this point, it means the exec failed, so we'll exit with its status code + exit $? +fi + +# Parse arguments +USE_PROXY=false +INSTANCES=2 + +while [[ $# -gt 0 ]]; do + case $1 in + --proxy) + USE_PROXY=true + shift + ;; + --count) + if [ -z "$2" ] || ! [[ "$2" =~ ^[0-9]+$ ]]; then + echo "Error: --count requires a numeric argument" + show_usage + fi + INSTANCES=$2 + shift 2 + ;; + *) + echo "Unknown argument: $1" + show_usage + ;; + esac +done + +# Validate number of instances +if ! [[ "$INSTANCES" =~ ^[0-9]+$ ]]; then + echo "Error: Number of instances must be a positive integer" + show_usage +fi + +if [ "$INSTANCES" -lt 1 ] || [ "$INSTANCES" -gt 4 ]; then + echo "Error: Number of instances must be between 1 and 4" + show_usage +fi + +# Check for Caddy if proxy is requested +if [ "$USE_PROXY" = true ]; then + if ! command -v caddy &>/dev/null; then + echo "Error: Caddy is required for proxy mode but it's not installed" + echo "Please install Caddy first:" + echo " Mac: brew install caddy" + echo " Linux: sudo apt install caddy" + echo " Or visit: https://caddyserver.com/docs/install" + exit 1 + fi +fi + +# Array to store background PIDs +declare -a PIDS + +# Colors for different processes +declare -a COLORS=( + "\033[0;36m" # Cyan + "\033[0;32m" # Green + "\033[0;35m" # Purple + "\033[0;33m" # Yellow + "\033[0;37m" # Gray (for proxy) +) +RESET="\033[0m" + +# Cleanup function to kill all child processes +cleanup() { + echo "Shutting down all processes..." + for pid in "${PIDS[@]}"; do + kill $pid 2>/dev/null + done + exit 0 +} + +# Set up trap for cleanup +trap cleanup INT TERM + +# Function to run a command with colored output +run_with_color() { + local color=$1 + local prefix=$2 + shift 2 + # Run the command and color its output + "$@" 2>&1 | while read -r line; do + echo -e "${color}${prefix} | ${line}${RESET}" + done +} + +# Create Caddy configuration if proxy is enabled +if [ "$USE_PROXY" = true ]; then + BASE_PORT=4001 + CADDY_CONFIG=$(mktemp) + echo "Creating Caddy configuration..." 
+ cat >"$CADDY_CONFIG" <" + +# Wait for all background processes +wait diff --git a/lib/lightning/config/bootstrap.ex b/lib/lightning/config/bootstrap.ex index 15e2368894..f3c3d5e48d 100644 --- a/lib/lightning/config/bootstrap.ex +++ b/lib/lightning/config/bootstrap.ex @@ -114,7 +114,8 @@ defmodule Lightning.Config.Bootstrap do "RTM", &Utils.ensure_boolean/1, Utils.get_env([:lightning, Lightning.Runtime.RuntimeManager, :start]) - ) + ), + port: env!("RTM_PORT", :integer, 2222) config :lightning, :workers, private_key: @@ -399,6 +400,10 @@ defmodule Lightning.Config.Bootstrap do config :logger, :level, log_level end + if log_level == :debug do + config :libcluster, debug: true + end + database_url = env!("DATABASE_URL", :string, nil) config :lightning, Lightning.Repo, @@ -409,7 +414,6 @@ defmodule Lightning.Config.Bootstrap do queue_interval: env!("DATABASE_QUEUE_INTERVAL", :integer, 1000) host = env!("URL_HOST", :string, "example.com") - port = env!("PORT", :integer, 4000) url_port = env!("URL_PORT", :integer, 443) config :lightning, @@ -455,18 +459,6 @@ defmodule Lightning.Config.Bootstrap do You can generate one by calling: mix phx.gen.secret """ - listen_address = - env!( - "LISTEN_ADDRESS", - fn address -> - address - |> String.split(".") - |> Enum.map(&String.to_integer/1) - |> List.to_tuple() - end, - {127, 0, 0, 1} - ) - origins = env!( "ORIGINS", @@ -481,40 +473,10 @@ defmodule Lightning.Config.Bootstrap do url_scheme = env!("URL_SCHEME", :string, "https") - idle_timeout = - env!( - "IDLE_TIMEOUT", - fn str -> - case Integer.parse(str) do - :error -> 60_000 - {val, _} -> val * 1_000 - end - end, - 60_000 - ) - config :lightning, LightningWeb.Endpoint, url: [host: host, port: url_port, scheme: url_scheme], secret_key_base: secret_key_base, check_origin: origins, - http: [ - ip: listen_address, - port: port, - compress: true, - protocol_options: [ - # Note that if a request is more than 10x the max dataclip size, we cut - # the connection immediately to prevent memory issues via the - # :max_skip_body_length setting. - max_skip_body_length: - Application.get_env( - :lightning, - :max_dataclip_size_bytes, - 10_000_000 - ) * - 10, - idle_timeout: idle_timeout - ] - ], server: true end @@ -530,6 +492,8 @@ defmodule Lightning.Config.Bootstrap do assert_receive_timeout: env!("ASSERT_RECEIVE_TIMEOUT", :integer, 1000) end + config :lightning, LightningWeb.Endpoint, http: http_config(config_env()) + config :sentry, dsn: env!("SENTRY_DSN", :string, nil), filter: Lightning.SentryEventFilter, @@ -803,4 +767,69 @@ defmodule Lightning.Config.Bootstrap do value -> value end end + + defp http_config(env, opts \\ []) + # Production environment configuration + defp http_config(:prod, opts) do + port = Keyword.get(opts, :port) || env!("PORT", :integer, 4000) + + listen_address = + env!( + "LISTEN_ADDRESS", + fn address -> + address + |> String.split(".") + |> Enum.map(&String.to_integer/1) + |> List.to_tuple() + end, + {127, 0, 0, 1} + ) + + idle_timeout = + env!( + "IDLE_TIMEOUT", + fn str -> + case Integer.parse(str) do + :error -> 60_000 + {val, _} -> val * 1_000 + end + end, + 60_000 + ) + + [ + ip: listen_address, + port: port, + compress: true, + protocol_options: [ + # Note that if a request is more than 10x the max dataclip size, we cut + # the connection immediately to prevent memory issues via the + # :max_skip_body_length setting. 
+ max_skip_body_length: + Application.get_env( + :lightning, + :max_dataclip_size_bytes, + 10_000_000 + ) * 10, + idle_timeout: idle_timeout + ] + ] + end + + # Default configuration for non-production environments + defp http_config(_env, opts) do + port = + Keyword.get(opts, :port) || + env!( + "PORT", + :integer, + get_env(:lightning, [LightningWeb.Endpoint, :http, :port]) + ) + + [ + ip: {0, 0, 0, 0}, + port: port, + compress: true + ] + end end From fcf74319a5cdfd9c0e8baf2b44f1d9acb0b9e2cb Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Wed, 2 Apr 2025 08:59:22 +0200 Subject: [PATCH 03/14] Add max_dataclip_size_bytes configuration - Introduced a new function `max_dataclip_size_bytes` in the `Lightning.Config` module to retrieve the maximum dataclip size from application environment. - Updated `LightningWeb.PlugConfigs` to use the new configuration function instead of directly fetching from the environment. - Modified tests in `WebhooksControllerTest` to stub the `max_dataclip_size_bytes` function for better isolation and control during testing. --- lib/lightning/config.ex | 10 ++++++++++ lib/lightning_web/plug_configs.ex | 2 +- .../controllers/webhooks_controller_test.exs | 7 ++++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/lib/lightning/config.ex b/lib/lightning/config.ex index 3e83a7bbdf..0c9903527b 100644 --- a/lib/lightning/config.ex +++ b/lib/lightning/config.ex @@ -294,6 +294,11 @@ defmodule Lightning.Config do def gdpr_preferences do Application.get_env(:lightning, :gdpr_preferences) end + + @impl true + def max_dataclip_size_bytes do + Application.get_env(:lightning, :max_dataclip_size_bytes, 10_000_000) + end end @callback apollo(key :: atom() | nil) :: map() @@ -340,6 +345,7 @@ defmodule Lightning.Config do @callback book_demo_openfn_workflow_url() :: String.t() @callback gdpr_banner() :: map() | false @callback gdpr_preferences() :: map() | false + @callback max_dataclip_size_bytes() :: integer() @doc """ Returns the configuration for the `Lightning.AdaptorRegistry` service @@ -538,6 +544,10 @@ defmodule Lightning.Config do impl().gdpr_preferences() end + def max_dataclip_size_bytes do + impl().max_dataclip_size_bytes() + end + defp impl do Application.get_env(:lightning, __MODULE__, API) end diff --git a/lib/lightning_web/plug_configs.ex b/lib/lightning_web/plug_configs.ex index 0a175a806b..1384fbef49 100644 --- a/lib/lightning_web/plug_configs.ex +++ b/lib/lightning_web/plug_configs.ex @@ -11,7 +11,7 @@ defmodule LightningWeb.PlugConfigs do :multipart, { :json, - length: Application.fetch_env!(:lightning, :max_dataclip_size_bytes) + length: Lightning.Config.max_dataclip_size_bytes() } ], pass: ["*/*"], diff --git a/test/lightning_web/controllers/webhooks_controller_test.exs b/test/lightning_web/controllers/webhooks_controller_test.exs index 1c1977e153..8bb4745a8e 100644 --- a/test/lightning_web/controllers/webhooks_controller_test.exs +++ b/test/lightning_web/controllers/webhooks_controller_test.exs @@ -2,6 +2,7 @@ defmodule LightningWeb.WebhooksControllerTest do use LightningWeb.ConnCase, async: false import Lightning.Factories + import Mox alias Lightning.Extensions.MockRateLimiter alias Lightning.Extensions.StubRateLimiter @@ -16,6 +17,8 @@ defmodule LightningWeb.WebhooksControllerTest do @fields Record.extract(:span, from: "deps/opentelemetry/include/otel_span.hrl") Record.defrecordp(:span, @fields) + setup :set_mox_from_context + describe "a POST request to '/i'" do setup [:stub_rate_limiter_ok, :stub_usage_limiter_ok] @@ -60,7 +63,9 @@ 
defmodule LightningWeb.WebhooksControllerTest do
         |> Repo.preload(:triggers)
         |> with_snapshot()

-      Application.put_env(:lightning, :max_dataclip_size_bytes, 1_000_000)
+      Mox.stub(Lightning.MockConfig, :max_dataclip_size_bytes, fn ->
+        1_000_000
+      end)

       smaller_body = %{"data" => %{a: String.duplicate("a", 500_000)}}

From 90b1cb6f1aee05cf8cfd3a33a7cb2bcc9144994b Mon Sep 17 00:00:00 2001
From: Stuart Corbishley
Date: Wed, 2 Apr 2025 09:22:36 +0200
Subject: [PATCH 04/14] WIP

---
 lib/lightning/rate_limiters.ex                | 13 +++++++++++++
 .../controllers/webhooks_controller.ex        | 18 +++++++++++++++++-
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/lib/lightning/rate_limiters.ex b/lib/lightning/rate_limiters.ex
index 20adfb103e..2d60bffa82 100644
--- a/lib/lightning/rate_limiters.ex
+++ b/lib/lightning/rate_limiters.ex
@@ -22,6 +22,15 @@ defmodule Lightning.RateLimiters do
         | {:deny, non_neg_integer()}
   end

+  defmodule Webhook do
+    @moduledoc false
+
+    use Hammer,
+      backend: Hammer.ETS,
+      algorithm: :leaky_bucket,
+      table: :webhook_limiter
+  end
+
   @spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result()
   def hit({:failure_email, workflow_id, user_id}) do
     [time_scale: time_scale, rate_limit: rate_limit] =
@@ -36,4 +45,8 @@ defmodule Lightning.RateLimiters do
       {:deny, count}
     end
   end
+
+  def check_rate(project_id) do
+    Webhook.hit("#{project_id}", 1, 4)
+  end
 end
diff --git a/lib/lightning_web/controllers/webhooks_controller.ex b/lib/lightning_web/controllers/webhooks_controller.ex
index 8839ada8d5..9760f61071 100644
--- a/lib/lightning_web/controllers/webhooks_controller.ex
+++ b/lib/lightning_web/controllers/webhooks_controller.ex
@@ -10,6 +10,7 @@ defmodule LightningWeb.WebhooksController do
   alias Lightning.WorkOrders

   plug :reject_unfetched when action in [:create]
+  # plug :check_rate when action in [:create]

   # Reject requests with unfetched body params, as they are not supported
   # See Plug.Parsers in Endpoint for more information.
@@ -27,7 +28,22 @@ defmodule LightningWeb.WebhooksController do
     end
   end

-  @spec create(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t()
+  # defp check_rate(conn, _params) do
+  #   # TODO: this may be _after_ the body has been parsed (into body_params),
+  #   # so we may need to move this plug further upstream.
+  #   case Lightning.RateLimiters.check_rate(conn.project_id) do
+  #     {:allow, _} ->
+  #       conn
+  #     {:deny, timeout} ->
+  #       conn
+  #       |> put_status(429)
+  #       |> put_resp_header("retry-after", to_string(timeout))
+  #       |> json(%{"error" => "Too many requests"})
+  #       |> halt()
+  #   end
+  # end
+
+  @spec check(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t()
   def check(conn, _params) do
     put_status(conn, :ok)
     |> json(%{

From 43c691f652dd2043ce685ea0a49d82660dbb5e29 Mon Sep 17 00:00:00 2001
From: Stuart Corbishley
Date: Fri, 25 Apr 2025 10:37:26 +0200
Subject: [PATCH 05/14] WIP

---
 .iex.exs                                      |   5 +-
 lib/lightning.ex                              |   9 +
 lib/lightning/application.ex                  |   2 +-
 lib/lightning/rate_limiters.ex                |  48 ++-
 lib/lightning_web/channels/worker_channel.ex  |  15 +-
 lib/replicated_rate_limiter.ex                | 295 ++++++++++++++++++
 mix.exs                                       |   1 +
 mix.lock                                      |   2 +
 test/lightning/rate_limiters_test.exs         |  44 +++
 .../controllers/webhooks_controller_test.exs  |   3 +
 test/replicated_rate_limiter_test.exs         | 100 ++++++
 test/support/mock.ex                          |   4 +
 12 files changed, 518 insertions(+), 10 deletions(-)
 create mode 100644 lib/replicated_rate_limiter.ex
 create mode 100644 test/lightning/rate_limiters_test.exs
 create mode 100644 test/replicated_rate_limiter_test.exs

diff --git a/.iex.exs b/.iex.exs
index 4289c885a9..bb72483cd4 100644
--- a/.iex.exs
+++ b/.iex.exs
@@ -1,2 +1,5 @@
-import Ecto.Query
+if Code.loaded?(Ecto.Query) do
+  import Ecto.Query
+end
+
 alias Lightning.Repo
diff --git a/lib/lightning.ex b/lib/lightning.ex
index 287611d140..79a9e073f8 100644
--- a/lib/lightning.ex
+++ b/lib/lightning.ex
@@ -26,6 +26,11 @@ defmodule Lightning do
     Phoenix.PubSub.broadcast(@pubsub, topic, msg)
   end

+  @impl true
+  def broadcast_from(pid, topic, msg) do
+    Phoenix.PubSub.broadcast_from(@pubsub, pid, topic, msg)
+  end
+
   @impl true
   def local_broadcast(topic, msg) do
     Phoenix.PubSub.local_broadcast(@pubsub, topic, msg)
@@ -60,6 +65,7 @@ defmodule Lightning do
   # credo:disable-for-next-line
   @callback current_time() :: DateTime.t()
   @callback broadcast(binary(), {atom(), any()}) :: :ok | {:error, term()}
+  @callback broadcast_from(pid(), binary(), {atom(), any()}) :: :ok | {:error, term()}
   @callback local_broadcast(binary(), {atom(), any()}) :: :ok | {:error, term()}
   @callback subscribe(binary()) :: :ok | {:error, term()}
   @callback release() :: release_info()
@@ -71,6 +77,9 @@ defmodule Lightning do

   def broadcast(topic, msg), do: impl().broadcast(topic, msg)

+  def broadcast_from(pid, topic, msg),
+    do: impl().broadcast_from(pid, topic, msg)
+
   def local_broadcast(topic, msg), do: impl().local_broadcast(topic, msg)

   def subscribe(topic), do: impl().subscribe(topic)
diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex
index 09d88e4b6c..cfaf8d0c28 100644
--- a/lib/lightning/application.ex
+++ b/lib/lightning/application.ex
@@ -128,7 +128,7 @@ defmodule Lightning.Application do
        endpoint: LightningWeb.Endpoint},
       {Lightning.KafkaTriggers.Supervisor, type: :supervisor},
       # Start our rate limiter
-      {Lightning.RateLimiters.Mail, clean_period: :timer.minutes(10)}
+      {Lightning.RateLimiters, clean_period: :timer.minutes(10)},
       # Start a worker by calling: Lightning.Worker.start_link(arg)
       # {Lightning.Worker, arg}
     ]
diff --git a/lib/lightning/rate_limiters.ex b/lib/lightning/rate_limiters.ex
index 2d60bffa82..4dbd732905 100644
--- a/lib/lightning/rate_limiters.ex
+++ b/lib/lightning/rate_limiters.ex
@@ -31,6 +31,33 @@ defmodule Lightning.RateLimiters do
       table: :webhook_limiter
   end

+  defmodule Listener do
+    @moduledoc false
+    use GenServer
+
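+    # Rough intent, inferred from the code below: each node subscribes to a
+    # shared "__limiter" topic and replays hits broadcast by other nodes into
+    # its local counters, so per-node buckets approximate a cluster-wide count.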
@doc false + def start_link(opts) do + topic = Keyword.fetch!(opts, :topic) + GenServer.start_link(__MODULE__, {topic}) + end + + @impl true + def init({topic}) do + :ok = Lightning.subscribe(topic) + {:ok, []} + end + + # TODO: we need to broadcast _from_ the listener process. + # so that we don't end up getting hitting the same key on the same node. + + @impl true + def handle_info({:hit, key, scale, limit}, state) do + IO.inspect({:hit, key, scale, limit}) + _count = Webhook.hit(key, scale, limit) + {:noreply, state} + end + end + @spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result() def hit({:failure_email, workflow_id, user_id}) do [time_scale: time_scale, rate_limit: rate_limit] = @@ -46,7 +73,24 @@ defmodule Lightning.RateLimiters do end end - def check_rate(project_id) do - Webhook.hit("#{project_id}", 1, 4) + # 10 requests per second, then denied for 1 second + # Then allowing 2 request per second. + def hit({:webhook, project_id}) do + Registry.meta(Lightning.PubSub, :pubsub) |> IO.inspect() + Lightning.broadcast_from(self(), "__limiter", {:hit, project_id, 2, 10}) + Webhook.hit(project_id, 2, 10) + end + + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]}, + type: :supervisor + } + end + + def start_link(opts) do + children = [{Mail, opts}, {Webhook, opts}, {Listener, [topic: "__limiter"]}] + Supervisor.start_link(children, strategy: :one_for_one) end end diff --git a/lib/lightning_web/channels/worker_channel.ex b/lib/lightning_web/channels/worker_channel.ex index 4273315d38..71cd03cf5b 100644 --- a/lib/lightning_web/channels/worker_channel.ex +++ b/lib/lightning_web/channels/worker_channel.ex @@ -22,10 +22,17 @@ defmodule LightningWeb.WorkerChannel do @impl true def handle_in( "claim", - %{"demand" => demand, "worker_name" => worker_name}, + %{"demand" => demand} = payload, socket ) do - case Runs.claim(demand, sanitise_worker_name(worker_name)) do + worker_name = + payload["worker_name"] + |> case do + "" -> nil + worker_name -> worker_name + end + + case Runs.claim(demand, worker_name) do {:ok, runs} -> runs = runs @@ -47,10 +54,6 @@ defmodule LightningWeb.WorkerChannel do end end - defp sanitise_worker_name(""), do: nil - - defp sanitise_worker_name(worker_name), do: worker_name - defp run_options(run) do Ecto.assoc(run, :workflow) |> Lightning.Repo.one() diff --git a/lib/replicated_rate_limiter.ex b/lib/replicated_rate_limiter.ex new file mode 100644 index 0000000000..dbc3e90de9 --- /dev/null +++ b/lib/replicated_rate_limiter.ex @@ -0,0 +1,295 @@ +defmodule ReplicatedRateLimiter do + @moduledoc """ + __using__/1 will inject: + + - `ReplicatedRateLimiter.TokenBucket` - your ETS/CRDT sync server + - `start_link/1` & `child_spec/1` - to supervise both + - `allow?/4` - your public rate‑limit check + + Options: + + - `:crdt_name` - atom name for your AWLWWMap + - `:ets_table` - atom for your ETS cache + - `:default_capacity` - default bucket size + - `:default_refill` - default tokens/sec + """ + + defmacro __using__(opts) do + crdt_name = + Keyword.get(opts, :crdt_name) + + ets_table = + Keyword.get(opts, :ets_table) + + default_capacity = Keyword.get(opts, :default_capacity, 100) + default_refill = Keyword.get(opts, :default_refill, 10) + + quote do + use Supervisor + alias DeltaCrdt + require Logger + + default_name_prefix = + __MODULE__ + |> Module.split() + |> Enum.join() + |> String.replace(~r/(? 
String.downcase() + + @crdt_name unquote(crdt_name) || + (Macro.escape(default_name_prefix) <> "_crdt") + |> String.to_atom() + + @ets_table unquote(ets_table) || + (Macro.escape(default_name_prefix) <> "_ets") + |> String.to_atom() + + @cluster_name (Macro.escape(default_name_prefix) <> "_cluster") + |> String.to_atom() + + @default_capacity unquote(default_capacity) + @default_refill unquote(default_refill) + + @doc """ + Same as TokenBucket.allow?/4, but with a default capacity and refill rate. + """ + def allow?( + key, + capacity \\ @default_capacity, + refill_rate \\ @default_refill, + cost \\ 1 + ) do + ReplicatedRateLimiter.TokenBucket.allow?( + [crdt_name: @crdt_name, ets_table: @ets_table], + key, + capacity, + refill_rate, + cost + ) + end + + def inspect(bucket) do + ReplicatedRateLimiter.TokenBucket.inspect( + [crdt_name: @crdt_name, ets_table: @ets_table], + bucket + ) + end + + def config do + [crdt_name: @crdt_name, ets_table: @ets_table] + end + + @doc false + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]}, + type: :supervisor + } + end + + def start_link(opts \\ []) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(_opts) do + children = [ + {ReplicatedRateLimiter.TokenBucket, + [crdt_name: @crdt_name, ets_table: @ets_table]}, + {CrdtCluster, [crdt: @crdt_name, name: @cluster_name]} + ] + + Supervisor.init(children, strategy: :one_for_one) |> IO.inspect() + end + + # Supervisor terminate callback to track when it's being stopped + def terminate(reason, _state) do + IO.inspect(reason) + Logger.warning("#{inspect(__MODULE__)} supervisor terminating with reason: #{inspect(reason)}") + :ok + end + end + end + + defmodule TokenBucket do + use GenServer + require Logger + alias DeltaCrdt + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + def allow?(config, bucket, capacity, refill, cost) do + ets_table = Keyword.get(config, :ets_table) + crdt_name = Keyword.get(config, :crdt_name) + + now = System.system_time(:second) + + :ets.insert_new(ets_table, {bucket, {capacity, now}}) + [{^bucket, {level, updated}}] = :ets.lookup(ets_table, bucket) + + refilled = trunc((now - updated) * refill) + current = min(capacity, level + refilled) + + if current >= cost do + next_level = current - cost + :ets.insert(ets_table, {bucket, {next_level, now}}) + + DeltaCrdt.put( + crdt_name, + {bucket, "#{Node.self()}"}, + {next_level, now} + ) + + {:allow, next_level} + else + wait_ms = ceil((cost - current) / refill * 1_000) |> round() + {:deny, wait_ms} + end + end + + def inspect(config, bucket) do + ets_table = Keyword.get(config, :ets_table) + + :ets.lookup(ets_table, bucket) + |> case do + [{^bucket, {level, updated}}] -> + {level, updated} + + [] -> + :not_found + + _ -> + :error + end + end + + @impl true + def init(opts) do + ets_table = Keyword.fetch!(opts, :ets_table) + crdt_name = Keyword.fetch!(opts, :crdt_name) + + :ets.new(ets_table, [ + :named_table, + :public, + read_concurrency: true + ]) + + DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, + name: crdt_name, + sync_interval: 100, + on_diffs: {__MODULE__, :apply_diffs, [opts]} + ) + |> case do + {:ok, pid} -> + {:ok, %{crdt: pid, crdt_name: crdt_name}} + + {:error, {:already_started, pid}} -> + {:stop, {:already_started, pid}} + end + end + + # merge incoming deltas into ETS + def apply_diffs(config, diffs) when is_list(diffs) do + Logger.debug("Applying diffs: #{inspect(diffs)}") + + ets_table = Keyword.get(config, :ets_table) 
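+      # (The diffs only say which {bucket, node} keys changed; the task below
+      # re-reads the full CRDT map and caches, for each changed bucket, the
+      # entry with the newest timestamp into ETS.)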
+      crdt_name = Keyword.get(config, :crdt_name)
+
+      Task.start(fn ->
+        changed_buckets =
+          diffs
+          |> Enum.map(fn {:add, {key, _node}, _value} -> key end)
+          |> Enum.uniq()
+
+        if changed_buckets != [] do
+          crdt_map = DeltaCrdt.to_map(crdt_name)
+          # %{
+          #   {"test", "a@127.0.0.1"} => {9, 1745414954},
+          #   {"test", "b@127.0.0.1"} => {9, 1745414927}
+          # }
+
+          Enum.reduce(crdt_map, %{}, fn {{bucket, _node}, value}, acc ->
+            if bucket in changed_buckets do
+              Map.update(acc, bucket, [value], fn existing_values ->
+                [value | existing_values]
+              end)
+            else
+              acc
+            end
+          end)
+          |> Enum.each(fn {bucket, values} ->
+            :ets.insert(
+              ets_table,
+              {bucket, Enum.max_by(values, fn {_, v} -> v end)}
+            )
+          end)
+        end
+      end)
+
+      :ok
+    end
+
+    @impl true
+    def terminate(reason, state) do
+      IO.inspect(reason)
+      Logger.warning("TokenBucket terminating with reason: #{inspect(reason)}, state: #{inspect(state)}")
+      :ok
+    end
+  end
+end
+
+defmodule CrdtCluster do
+  # Standalone Cluster module that can be used by any CRDT
+  @moduledoc false
+
+  use GenServer
+  require Logger
+
+  def start_link(opts) do
+    crdt_name = Keyword.fetch!(opts, :crdt)
+    name = Keyword.get(opts, :name, __MODULE__)
+
+    GenServer.start_link(__MODULE__, {crdt_name, name}, name: name)
+  end
+
+  @impl true
+  def init({crdt_name, _name}) do
+    # watch node up/down
+    :net_kernel.monitor_nodes(true, node_type: :visible)
+    sync_neighbours(crdt_name)
+    {:ok, %{crdt_name: crdt_name}}
+  end
+
+  @impl true
+  def handle_info({:nodeup, _n, _}, state) do
+    sync_neighbours(state.crdt_name)
+    {:noreply, state}
+  end
+
+  def handle_info({:nodedown, _n, _}, state) do
+    sync_neighbours(state.crdt_name)
+    {:noreply, state}
+  end
+
+  def handle_info(_, state), do: {:noreply, state}
+
+  @impl true
+  def terminate(reason, state) do
+    Logger.warning("CrdtCluster terminating with reason: #{inspect(reason)}, state: #{inspect(state)}")
+    :ok
+  end
+
+  defp sync_neighbours(crdt_name) do
+    peers = Node.list()
+    neighbours = Enum.map(peers, &{crdt_name, &1})
+
+    Logger.debug(
+      "CRDT neighbours for #{inspect(crdt_name)}: #{inspect(neighbours)}"
+    )
+
+    DeltaCrdt.set_neighbours(crdt_name, neighbours)
+  end
+end
diff --git a/mix.exs b/mix.exs
index ded4035e5a..71554797a1 100644
--- a/mix.exs
+++ b/mix.exs
@@ -79,6 +79,7 @@ defmodule Lightning.MixProject do
       {:credo, "~> 1.7.3", only: [:test, :dev]},
       {:crontab, "~> 1.1"},
       {:dialyxir, "~> 1.4.2", only: [:test, :dev], runtime: false},
+      {:delta_crdt, "~> 0.6.5"},
       {:ecto_enum, "~> 1.4"},
       {:ecto_psql_extras, "~> 0.8.2"},
       {:ecto_sql, "~> 3.11"},
diff --git a/mix.lock b/mix.lock
index 546f197d04..38466f0982 100644
--- a/mix.lock
+++ b/mix.lock
@@ -25,6 +25,7 @@
 "db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"},
 "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"},
 "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
+ "delta_crdt": {:hex, :delta_crdt, "0.6.5", "c7bb8c2c7e60f59e46557ab4e0224f67ba22f04c02826e273738f3dcc4767adc", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry,
"~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c6ae23a525d30f96494186dd11bf19ed9ae21d9fe2c1f1b217d492a7cc7294ae"}, "dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"}, "dotenvy": {:hex, :dotenvy, "0.8.0", "777486ad485668317c56afc53a7cbcd74f43e4e34588ba8e95a73e15a360050e", [:mix], [], "hexpm", "1f535066282388cbd109743d337ac46ff0708195780d4b5778bb83491ab1b654"}, "earmark": {:hex, :earmark, "1.4.47", "7e7596b84fe4ebeb8751e14cbaeaf4d7a0237708f2ce43630cfd9065551f94ca", [:mix], [], "hexpm", "3e96bebea2c2d95f3b346a7ff22285bc68a99fbabdad9b655aa9c6be06c698f8"}, @@ -77,6 +78,7 @@ "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "makeup_html": {:hex, :makeup_html, "0.1.1", "c3d4abd39d5f7e925faca72ada6e9cc5c6f5fa7cd5bc0158315832656cf14d7f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "44f2a61bc5243645dd7fafeaa6cc28793cd22f3c76b861e066168f9a5b2c26a4"}, "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, + "merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"}, "mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"}, diff --git a/test/lightning/rate_limiters_test.exs b/test/lightning/rate_limiters_test.exs new file mode 100644 index 0000000000..784c40a54c --- /dev/null +++ b/test/lightning/rate_limiters_test.exs @@ -0,0 +1,44 @@ +defmodule Lightning.RateLimitersTest do + use ExUnit.Case, async: true + + alias Lightning.RateLimiters + + describe "Mail" do + test "returns a hit result" do + id = Ecto.UUID.generate() + assert RateLimiters.Mail.hit(id, 1, 1) == {:allow, 1} + assert RateLimiters.Mail.hit(id, 1, 1) == {:deny, 1000} + end + end + + describe "Webhook" do + setup do + Mox.stub_with(LightningMock, Lightning.API) + + :ok + end + + test "returns a hit result" do + id = Ecto.UUID.generate() + assert RateLimiters.Webhook.hit(id, 1, 1) == {:allow, 1} + assert RateLimiters.Webhook.hit(id, 1, 1) == {:deny, 1000} + end + + test "returns a hit result for a project id" do + # 10 requests per second, then denied for 1 second + id = Ecto.UUID.generate() + + for i <- 1..10 do + assert RateLimiters.hit({:webhook, id}) == {:allow, i} + Process.sleep(5) + end + + assert RateLimiters.hit({:webhook, id}) == {:deny, 1000} + + Process.sleep(1005) + + # Leaked by 2, then add 1 for the next hit. 
+ assert RateLimiters.hit({:webhook, id}) == {:allow, 9} + end + end +end diff --git a/test/lightning_web/controllers/webhooks_controller_test.exs b/test/lightning_web/controllers/webhooks_controller_test.exs index 0dd0244fdc..cf46cdec72 100644 --- a/test/lightning_web/controllers/webhooks_controller_test.exs +++ b/test/lightning_web/controllers/webhooks_controller_test.exs @@ -2,6 +2,7 @@ defmodule LightningWeb.WebhooksControllerTest do use LightningWeb.ConnCase, async: false import Lightning.Factories + import Mox alias Lightning.Extensions.MockRateLimiter alias Lightning.Extensions.StubRateLimiter @@ -12,6 +13,8 @@ defmodule LightningWeb.WebhooksControllerTest do alias Lightning.Runs alias Lightning.WorkOrders + setup :set_mox_from_context + describe "a POST request to '/i'" do setup [:stub_rate_limiter_ok, :stub_usage_limiter_ok] diff --git a/test/replicated_rate_limiter_test.exs b/test/replicated_rate_limiter_test.exs new file mode 100644 index 0000000000..e02d705fcd --- /dev/null +++ b/test/replicated_rate_limiter_test.exs @@ -0,0 +1,100 @@ +defmodule ReplicatedRateLimiterTest do + use ExUnit.Case, async: false + import Eventually + require Logger + + # Setup to log and ensure proper cleanup + # setup do + # Logger.info("Starting new test") + + # on_exit(fn -> + # Logger.info("Test completed, cleaning up") + + # # Let's explicitly clean up supervised processes + # Supervisor.which_children(ReplicatedRateLimiterTest.CrdtEts) + # |> Enum.each(fn {id, pid, _, _} -> + # Logger.info("Terminating child #{inspect(id)} with pid #{inspect(pid)}") + # Process.exit(pid, :shutdown) + # end) + + # Supervisor.which_children(ReplicatedRateLimiterTest.AnotherRateLimiter) + # |> Enum.each(fn {id, pid, _, _} -> + # Logger.info("Terminating child #{inspect(id)} with pid #{inspect(pid)}") + # Process.exit(pid, :shutdown) + # end) + + # :ok + # end) + + # :ok + # end + + defmodule CrdtEts do + use ReplicatedRateLimiter, + default_capacity: 1_000, + default_refill: 200 + end + + defmodule AnotherRateLimiter do + use ReplicatedRateLimiter + end + + test "CrdtEts allows calls directly from the main module" do + start_link_supervised!(CrdtEts) + + + config = CrdtEts.config() |> IO.inspect(label: "config") |> Map.new() + + # Test using the main module's allow? 
function + result = CrdtEts.allow?("test_key") + assert match?({:allow, 999}, result) + + before = System.system_time(:second) + + # Make several calls to hit the limit + Enum.each(1..10, fn _ -> CrdtEts.allow?("test_key_2", 5, 1) end) + + assert {0, last_updated} = + DeltaCrdt.get(config.crdt_name, {"test_key_2", "#{Node.self()}"}), + "CRDT should have 0 tokens" + + assert {0, ^last_updated} = + CrdtEts.inspect("test_key_2"), + "ETS should be the same as CRDT" + + assert before <= last_updated + + # This should be denied since we consumed all tokens + assert {:deny, 1000} = CrdtEts.allow?("test_key_2", 5, 1) + + # Another node enters the dungeon + {:ok, test_crdt} = + DeltaCrdt.start_link(DeltaCrdt.AWLWWMap) + + DeltaCrdt.set_neighbours(config.crdt_name, [test_crdt]) + DeltaCrdt.set_neighbours(test_crdt, [config.crdt_name]) + + # and updates the bucket + DeltaCrdt.put( + test_crdt, + {"test_key_2", "another_node"}, + {10, System.system_time(:second) + 1} + ) + + # Wait for the bucket to be updated + assert_eventually( + CrdtEts.inspect("test_key_2") + |> Tuple.to_list() + |> List.first() == 10 + ) + + assert {:allow, 4} = CrdtEts.allow?("test_key_2", 5, 1) + + end + + test "can start multiple rate limiters" do + start_supervised!(AnotherRateLimiter) + start_supervised!(CrdtEts) + + end +end diff --git a/test/support/mock.ex b/test/support/mock.ex index 269db5bbc5..39b240bd4b 100644 --- a/test/support/mock.ex +++ b/test/support/mock.ex @@ -8,6 +8,10 @@ defmodule Lightning.Stub do @impl true def broadcast(topic, msg), do: Lightning.API.broadcast(topic, msg) + @impl true + def broadcast_from(pid, topic, msg), + do: Lightning.API.broadcast_from(pid, topic, msg) + @impl true def local_broadcast(topic, msg), do: Lightning.API.local_broadcast(topic, msg) From 7cb616f506a258ce0d4a26b17cbe163d65557ad5 Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Fri, 25 Apr 2025 15:19:45 +0200 Subject: [PATCH 06/14] Refactor ReplicatedRateLimiter and Lightning.RateLimiters - Renamed `inspect` to `to_list` in `ReplicatedRateLimiter` for clarity. - Updated `init` function in `ReplicatedRateLimiter` to use a configuration function. - Removed the `Listener` module from `Lightning.RateLimiters` as it is no longer needed. - Adjusted tests to reflect the new `to_list` function and removed unnecessary logging setup. --- lib/lightning/rate_limiters.ex | 37 ++------------- lib/replicated_rate_limiter.ex | 66 ++++++--------------------- test/replicated_rate_limiter_test.exs | 35 ++------------ 3 files changed, 24 insertions(+), 114 deletions(-) diff --git a/lib/lightning/rate_limiters.ex b/lib/lightning/rate_limiters.ex index 4dbd732905..40b1fde41b 100644 --- a/lib/lightning/rate_limiters.ex +++ b/lib/lightning/rate_limiters.ex @@ -25,39 +25,13 @@ defmodule Lightning.RateLimiters do defmodule Webhook do @moduledoc false + # Use ReplicatedRateLimiter instead use Hammer, backend: Hammer.ETS, algorithm: :leaky_bucket, table: :webhook_limiter end - defmodule Listener do - @moduledoc false - use GenServer - - @doc false - def start_link(opts) do - topic = Keyword.fetch!(opts, :topic) - GenServer.start_link(__MODULE__, {topic}) - end - - @impl true - def init({topic}) do - :ok = Lightning.subscribe(topic) - {:ok, []} - end - - # TODO: we need to broadcast _from_ the listener process. - # so that we don't end up getting hitting the same key on the same node. 
- - @impl true - def handle_info({:hit, key, scale, limit}, state) do - IO.inspect({:hit, key, scale, limit}) - _count = Webhook.hit(key, scale, limit) - {:noreply, state} - end - end - @spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result() def hit({:failure_email, workflow_id, user_id}) do [time_scale: time_scale, rate_limit: rate_limit] = @@ -73,11 +47,10 @@ defmodule Lightning.RateLimiters do end end - # 10 requests per second, then denied for 1 second - # Then allowing 2 request per second. def hit({:webhook, project_id}) do - Registry.meta(Lightning.PubSub, :pubsub) |> IO.inspect() - Lightning.broadcast_from(self(), "__limiter", {:hit, project_id, 2, 10}) + # 10 requests for a second, then 2 requests per second + # Over a long enough period of time, this will allow 2 requests per second. + # allow?("webhook_#{project_id}", 10, 2) Webhook.hit(project_id, 2, 10) end @@ -90,7 +63,7 @@ defmodule Lightning.RateLimiters do end def start_link(opts) do - children = [{Mail, opts}, {Webhook, opts}, {Listener, [topic: "__limiter"]}] + children = [{Mail, opts}, {Webhook, opts}] Supervisor.start_link(children, strategy: :one_for_one) end end diff --git a/lib/replicated_rate_limiter.ex b/lib/replicated_rate_limiter.ex index 81c6ba9aaf..8ad2a8a2ad 100644 --- a/lib/replicated_rate_limiter.ex +++ b/lib/replicated_rate_limiter.ex @@ -68,8 +68,8 @@ defmodule ReplicatedRateLimiter do ) end - def inspect(bucket) do - ReplicatedRateLimiter.TokenBucket.inspect( + def to_list(bucket) do + ReplicatedRateLimiter.TokenBucket.to_list( [crdt_name: @crdt_name, ets_table: @ets_table], bucket ) @@ -95,23 +95,19 @@ defmodule ReplicatedRateLimiter do @impl true def init(_opts) do children = [ - {ReplicatedRateLimiter.TokenBucket, - [crdt_name: @crdt_name, ets_table: @ets_table]}, + {ReplicatedRateLimiter.TokenBucket, config()}, + {DeltaCrdt, + [ + crdt: DeltaCrdt.AWLWWMap, + name: @crdt_name, + sync_interval: 100, + on_diffs: + {ReplicatedRateLimiter.TokenBucket, :apply_diffs, [config()]} + ]}, {CrdtCluster, [crdt: @crdt_name, name: @cluster_name]} ] - Supervisor.init(children, strategy: :one_for_one) |> IO.inspect() - end - - # Supervisor terminate callback to track when it's being stopped - def terminate(reason, _state) do - IO.inspect(reason) - - Logger.warning( - "#{inspect(__MODULE__)} supervisor terminating with reason: #{inspect(reason)}" - ) - - :ok + Supervisor.init(children, strategy: :one_for_one) end end end @@ -141,6 +137,7 @@ defmodule ReplicatedRateLimiter do next_level = current - cost :ets.insert(ets_table, {bucket, {next_level, now}}) + # TODO do this in a separate unlinked process DeltaCrdt.put( crdt_name, {bucket, "#{Node.self()}"}, @@ -154,7 +151,7 @@ defmodule ReplicatedRateLimiter do end end - def inspect(config, bucket) do + def to_list(config, bucket) do ets_table = Keyword.get(config, :ets_table) :ets.lookup(ets_table, bucket) @@ -173,7 +170,6 @@ defmodule ReplicatedRateLimiter do @impl true def init(opts) do ets_table = Keyword.fetch!(opts, :ets_table) - crdt_name = Keyword.fetch!(opts, :crdt_name) :ets.new(ets_table, [ :named_table, @@ -181,21 +177,9 @@ defmodule ReplicatedRateLimiter do read_concurrency: true ]) - DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, - name: crdt_name, - sync_interval: 100, - on_diffs: {__MODULE__, :apply_diffs, [opts]} - ) - |> case do - {:ok, pid} -> - {:ok, %{crdt: pid, crdt_name: crdt_name}} - - {:error, {:already_started, pid}} -> - {:stop, {:already_started, pid}} - end + {:ok, %{}} end - # merge incoming deltas into ETS def apply_diffs(config, 
diffs) when is_list(diffs) do Logger.debug("Applying diffs: #{inspect(diffs)}") @@ -235,17 +219,6 @@ defmodule ReplicatedRateLimiter do :ok end - - @impl true - def terminate(reason, state) do - IO.inspect(reason) - - Logger.warning( - "TokenBucket terminating with reason: #{inspect(reason)}, state: #{inspect(state)}" - ) - - :ok - end end end @@ -284,15 +257,6 @@ defmodule CrdtCluster do def handle_info(_, state), do: {:noreply, state} - @impl true - def terminate(reason, state) do - Logger.warning( - "CrdtCluster terminating with reason: #{inspect(reason)}, state: #{inspect(state)}" - ) - - :ok - end - defp sync_neighbours(crdt_name) do peers = Node.list() neighbours = Enum.map(peers, &{crdt_name, &1}) diff --git a/test/replicated_rate_limiter_test.exs b/test/replicated_rate_limiter_test.exs index 4ef0317ddd..a5aab36853 100644 --- a/test/replicated_rate_limiter_test.exs +++ b/test/replicated_rate_limiter_test.exs @@ -1,33 +1,6 @@ defmodule ReplicatedRateLimiterTest do use ExUnit.Case, async: false import Eventually - require Logger - - # Setup to log and ensure proper cleanup - # setup do - # Logger.info("Starting new test") - - # on_exit(fn -> - # Logger.info("Test completed, cleaning up") - - # # Let's explicitly clean up supervised processes - # Supervisor.which_children(ReplicatedRateLimiterTest.CrdtEts) - # |> Enum.each(fn {id, pid, _, _} -> - # Logger.info("Terminating child #{inspect(id)} with pid #{inspect(pid)}") - # Process.exit(pid, :shutdown) - # end) - - # Supervisor.which_children(ReplicatedRateLimiterTest.AnotherRateLimiter) - # |> Enum.each(fn {id, pid, _, _} -> - # Logger.info("Terminating child #{inspect(id)} with pid #{inspect(pid)}") - # Process.exit(pid, :shutdown) - # end) - - # :ok - # end) - - # :ok - # end defmodule CrdtEts do use ReplicatedRateLimiter, @@ -40,9 +13,9 @@ defmodule ReplicatedRateLimiterTest do end test "CrdtEts allows calls directly from the main module" do - start_link_supervised!(CrdtEts) + start_supervised!(CrdtEts) - config = CrdtEts.config() |> IO.inspect(label: "config") |> Map.new() + config = CrdtEts.config() |> Map.new() # Test using the main module's allow? function result = CrdtEts.allow?("test_key") @@ -58,7 +31,7 @@ defmodule ReplicatedRateLimiterTest do "CRDT should have 0 tokens" assert {0, ^last_updated} = - CrdtEts.inspect("test_key_2"), + CrdtEts.to_list("test_key_2"), "ETS should be the same as CRDT" assert before <= last_updated @@ -82,7 +55,7 @@ defmodule ReplicatedRateLimiterTest do # Wait for the bucket to be updated assert_eventually( - CrdtEts.inspect("test_key_2") + CrdtEts.to_list("test_key_2") |> Tuple.to_list() |> List.first() == 10 ) From 8c5bdc29c6c3319c802cea7cb7dae43e58a2315d Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Fri, 25 Apr 2025 15:28:43 +0200 Subject: [PATCH 07/14] Linting for bin/local_cluster --- bin/local_cluster | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/local_cluster b/bin/local_cluster index 42c091e776..cabb00d5d3 100755 --- a/bin/local_cluster +++ b/bin/local_cluster @@ -92,7 +92,7 @@ RESET="\033[0m" cleanup() { echo "Shutting down all processes..." 
   for pid in "${PIDS[@]}"; do
-    kill $pid 2>/dev/null
+    kill "$pid" 2>/dev/null
   done
   exit 0
 }
@@ -127,7 +127,7 @@ if [ "$USE_PROXY" = true ]; then
 # Reverse proxy configuration
 localhost:4000 {
     reverse_proxy {
-        to $(for i in $(seq 1 $INSTANCES); do echo "localhost:$((BASE_PORT + i - 1))"; done | paste -sd " " -)
+        to $(for i in $(seq 1 "$INSTANCES"); do echo "localhost:$((BASE_PORT + i - 1))"; done | paste -sd " " -)
         lb_policy round_robin
     }
 }
@@ -152,7 +152,7 @@ else
 fi

 # Start the requested number of instances
-for i in $(seq 1 $INSTANCES); do
+for i in $(seq 1 "$INSTANCES"); do
   export RTM_PORT=$((2222 + i - 1)) PORT=$((BASE_PORT + i - 1))
   run_with_color "${COLORS[$i - 1]}" "node$i" elixir --name "node$i@127.0.0.1" -S mix phx.server &
   PIDS+=($!)

From e885516180db98035920e3d23b4091de09ac917e Mon Sep 17 00:00:00 2001
From: Rogerio Pontual
Date: Wed, 30 Apr 2025 11:54:12 +0200
Subject: [PATCH 08/14] Make sure mandatory options are present

---
 lib/replicated_rate_limiter.ex | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lib/replicated_rate_limiter.ex b/lib/replicated_rate_limiter.ex
index 8ad2a8a2ad..96d0537f85 100644
--- a/lib/replicated_rate_limiter.ex
+++ b/lib/replicated_rate_limiter.ex
@@ -122,8 +122,8 @@ defmodule ReplicatedRateLimiter do
     end

     def allow?(config, bucket, capacity, refill, cost) do
-      ets_table = Keyword.get(config, :ets_table)
-      crdt_name = Keyword.get(config, :crdt_name)
+      ets_table = Keyword.fetch!(config, :ets_table)
+      crdt_name = Keyword.fetch!(config, :crdt_name)

       now = System.system_time(:second)

@@ -183,8 +183,8 @@ defmodule ReplicatedRateLimiter do
     def apply_diffs(config, diffs) when is_list(diffs) do
       Logger.debug("Applying diffs: #{inspect(diffs)}")

-      ets_table = Keyword.get(config, :ets_table)
-      crdt_name = Keyword.get(config, :crdt_name)
+      ets_table = Keyword.fetch!(config, :ets_table)
+      crdt_name = Keyword.fetch!(config, :crdt_name)

       Task.start(fn ->
         changed_buckets =

From a766c5ac0e3b83ee979384ba8e4f7b144a1517f5 Mon Sep 17 00:00:00 2001
From: Rogerio Pontual
Date: Wed, 30 Apr 2025 12:00:41 +0200
Subject: [PATCH 09/14] Test failing on eventual consistency

4 out of 10 (40%) left to reach consistency:

    DeltaCrdt.get(config.crdt_name, {"project2", "#{Node.self()}"})
    #=> {4, 1746007259}

---
 lib/lightning/rate_limiters.ex        | 12 +++----
 test/replicated_rate_limiter_test.exs | 48 +++++++++++++++------------
 2 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/lib/lightning/rate_limiters.ex b/lib/lightning/rate_limiters.ex
index 40b1fde41b..a064375521 100644
--- a/lib/lightning/rate_limiters.ex
+++ b/lib/lightning/rate_limiters.ex
@@ -25,11 +25,9 @@ defmodule Lightning.RateLimiters do
   defmodule Webhook do
     @moduledoc false

-    # Use ReplicatedRateLimiter instead
-    use Hammer,
-      backend: Hammer.ETS,
-      algorithm: :leaky_bucket,
-      table: :webhook_limiter
+    use ReplicatedRateLimiter,
+      default_capacity: 10,
+      default_refill: 2
   end

   @spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result()
@@ -51,7 +49,9 @@ defmodule Lightning.RateLimiters do
   # 10 requests for a second, then 2 requests per second
   # Over a long enough period of time, this will allow 2 requests per second.
     # allow?("webhook_#{project_id}", 10, 2)
-    Webhook.hit(project_id, 2, 10)
+    # capacity and refill are module attributes by design
+    # TODO: passing them here might eliminate the need for the macro, for easier maintenance
+    Webhook.allow?("webhook_#{project_id}")
   end
 
   def child_spec(opts) do
diff --git a/test/replicated_rate_limiter_test.exs b/test/replicated_rate_limiter_test.exs
index a5aab36853..f7e6e63661 100644
--- a/test/replicated_rate_limiter_test.exs
+++ b/test/replicated_rate_limiter_test.exs
@@ -4,8 +4,8 @@ defmodule ReplicatedRateLimiterTest do
 
   defmodule CrdtEts do
     use ReplicatedRateLimiter,
-      default_capacity: 1_000,
-      default_refill: 200
+      default_capacity: 10,
+      default_refill: 2
   end
 
   defmodule AnotherRateLimiter do
@@ -17,50 +17,56 @@ defmodule ReplicatedRateLimiterTest do
 
     config = CrdtEts.config() |> Map.new()
 
-    # Test using the main module's allow? function
-    result = CrdtEts.allow?("test_key")
-    assert match?({:allow, 999}, result)
+    # Check one time (default cost is 1)
+    assert match?({:allow, 9}, CrdtEts.allow?("project1"))
 
     before = System.system_time(:second)
 
-    # Make several calls to hit the limit
-    Enum.each(1..10, fn _ -> CrdtEts.allow?("test_key_2", 5, 1) end)
+    # Make several calls to hit the limit (default capacity is 10)
+    Enum.each(1..9, fn _ -> CrdtEts.allow?("project2") end)
 
+    assert_eventually(
+      DeltaCrdt.get(config.crdt_name, {"project2", "#{Node.self()}"}) |> dbg |> elem(0) == 0,
+      1_000
+    )
 
-    assert {0, last_updated} =
-             DeltaCrdt.get(config.crdt_name, {"test_key_2", "#{Node.self()}"}),
-           "CRDT should have 0 tokens"
+    assert {0, last_updated} =
+             CrdtEts.to_list("project2"),
+           "ETS should be the same as CRDT"
 
-    assert {0, ^last_updated} =
-             CrdtEts.to_list("test_key_2"),
-           "ETS should be the same as CRDT"
 
     assert before <= last_updated
 
     # This should be denied since we consumed all tokens
-    assert {:deny, 1000} = CrdtEts.allow?("test_key_2", 5, 1)
+    assert {:deny, 1000} = CrdtEts.allow?("project2", 10, 2)
 
-    # Another node enters the dungeon
+    # Node2 enters the dungeon
     {:ok, test_crdt} =
       DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
 
     DeltaCrdt.set_neighbours(config.crdt_name, [test_crdt])
     DeltaCrdt.set_neighbours(test_crdt, [config.crdt_name])
 
-    # and updates the bucket
+    # a time has passed and the bucket is refilled by Node2
     DeltaCrdt.put(
       test_crdt,
-      {"test_key_2", "another_node"},
-      {10, System.system_time(:second) + 1}
+      {"project2", "another_node"},
+      {10, System.system_time(:second)}
     )
 
+    # Node2 consumes all the credits except one
+    Enum.each(1..9, fn i ->
+      DeltaCrdt.put(
+        test_crdt,
+        {"project2", "another_node"},
+        {10-i, System.system_time(:second)})
+    end)
+
     # Wait for the bucket to be updated
     assert_eventually(
-      CrdtEts.to_list("test_key_2")
-      |> Tuple.to_list()
-      |> List.first() == 10
+      CrdtEts.to_list("project2") |> elem(0) == 1
     )
 
-    assert {:allow, 4} = CrdtEts.allow?("test_key_2", 5, 1)
+    assert {:allow, 4} = CrdtEts.allow?("project2", 10, 2)
   end
 
   test "can start multiple rate limiters" do

From 5ec9781d7d39148d589a53ce22f0b4896132a0c7 Mon Sep 17 00:00:00 2001
From: Rogerio Pontual
Date: Wed, 30 Apr 2025 15:25:46 +0200
Subject: [PATCH 10/14] Using CRDTs for the processes only with Horde

---
 lib/lightning/application.ex                  |  15 +++
 lib/lightning/extensions/rate_limiting.ex     |  11 +-
 lib/lightning/rate_limiters.ex                |  19 +---
 lib/lightning/services/rate_limiter.ex        |   2 +-
 lib/lightning/webhook_rate_limiter.ex         | 100 +++++++++++++++++
 .../controllers/webhooks_controller.ex        | 104 +++++++++---------
 mix.exs                                       |   3 +-
 mix.lock                                      |   2 +
 test/lightning/webhook_rate_limiter_test.exs  | 100 +++++++++++++++++
test/replicated_rate_limiter_test.exs | 11 +- test/test_helper.exs | 7 ++ 11 files changed, 287 insertions(+), 87 deletions(-) create mode 100644 lib/lightning/webhook_rate_limiter.ex create mode 100644 test/lightning/webhook_rate_limiter_test.exs diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index 40d158e1e5..23f9a0bbf3 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -105,6 +105,12 @@ defmodule Lightning.Application do [ Lightning.PromEx, {Cluster.Supervisor, [topologies, [name: Lightning.ClusterSupervisor]]}, + {Horde.Registry, + name: Lightning.HordeRegistry, keys: :unique, members: :auto}, + {Horde.DynamicSupervisor, + name: Lightning.DistributedSupervisor, + strategy: :one_for_one, + members: :auto}, {Lightning.Vault, Application.get_env(:lightning, Lightning.Vault, [])}, # Start the Ecto repository Lightning.Repo, @@ -174,6 +180,15 @@ defmodule Lightning.Application do :ok end + def start_phase(:init_rate_limiter, :normal, _args) do + Horde.DynamicSupervisor.start_child( + Lightning.DistributedSupervisor, + Lightning.WebhookRateLimiter + ) + + :ok + end + def oban_opts do opts = Application.get_env(:lightning, Oban) diff --git a/lib/lightning/extensions/rate_limiting.ex b/lib/lightning/extensions/rate_limiting.ex index 9371b79198..aae3a0232d 100644 --- a/lib/lightning/extensions/rate_limiting.ex +++ b/lib/lightning/extensions/rate_limiting.ex @@ -8,16 +8,9 @@ defmodule Lightning.Extensions.RateLimiting do @type message :: Lightning.Extensions.Message.t() defmodule Context do - @moduledoc """ - Which user is making the request for a certain project. - """ + @type t :: %Context{project_id: Ecto.UUID.t()} - @type t :: %Context{ - project_id: Ecto.UUID.t(), - user_id: Ecto.UUID.t() | nil - } - - defstruct [:project_id, :user_id] + defstruct [:project_id] end @callback limit_request( diff --git a/lib/lightning/rate_limiters.ex b/lib/lightning/rate_limiters.ex index a064375521..4db0763cbf 100644 --- a/lib/lightning/rate_limiters.ex +++ b/lib/lightning/rate_limiters.ex @@ -22,14 +22,6 @@ defmodule Lightning.RateLimiters do | {:deny, non_neg_integer()} end - defmodule Webhook do - @moduledoc false - - use ReplicatedRateLimiter, - default_capacity: 10, - default_refill: 2 - end - @spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result() def hit({:failure_email, workflow_id, user_id}) do [time_scale: time_scale, rate_limit: rate_limit] = @@ -45,15 +37,6 @@ defmodule Lightning.RateLimiters do end end - def hit({:webhook, project_id}) do - # 10 requests for a second, then 2 requests per second - # Over a long enough period of time, this will allow 2 requests per second. 
-    # allow?("webhook_#{project_id}", 10, 2)
-    # capacity and refill are module attributes by design
-    # TODO: passing them here might eliminate the need for the macro, for easier maintenance
-    Webhook.allow?("webhook_#{project_id}")
-  end
-
   def child_spec(opts) do
     %{
       id: __MODULE__,
@@ -63,7 +46,7 @@ defmodule Lightning.RateLimiters do
   end
 
   def start_link(opts) do
-    children = [{Mail, opts}, {Webhook, opts}]
+    children = [{Mail, opts}]
     Supervisor.start_link(children, strategy: :one_for_one)
   end
 end
diff --git a/lib/lightning/services/rate_limiter.ex b/lib/lightning/services/rate_limiter.ex
index f069314aee..c11f952e5e 100644
--- a/lib/lightning/services/rate_limiter.ex
+++ b/lib/lightning/services/rate_limiter.ex
@@ -7,7 +7,7 @@ defmodule Lightning.Services.RateLimiter do
   import Lightning.Services.AdapterHelper
 
   @impl true
-  def limit_request(conn, context, opts) do
+  def limit_request(conn, context, opts \\ []) do
     adapter().limit_request(conn, context, opts)
   end
 
diff --git a/lib/lightning/webhook_rate_limiter.ex b/lib/lightning/webhook_rate_limiter.ex
new file mode 100644
index 0000000000..294ee3942d
--- /dev/null
+++ b/lib/lightning/webhook_rate_limiter.ex
@@ -0,0 +1,100 @@
+defmodule Lightning.WebhookRateLimiter do
+  @moduledoc false
+  use GenServer
+
+  @capacity 10
+  @refill_per_sec 2
+
+  require Logger
+
+  def child_spec(opts) do
+    {id, name} =
+      if name = Keyword.get(opts, :name) do
+        {"#{__MODULE__}_#{name}", name}
+      else
+        {__MODULE__, __MODULE__}
+      end
+
+    %{
+      id: id,
+      start: {__MODULE__, :start_link, [name]},
+      shutdown: 10_000,
+      restart: :transient
+    }
+  end
+
+  def start_link(name) do
+    with {:error, {:already_started, pid}} <-
+           GenServer.start_link(__MODULE__, [], name: via_tuple(name)) do
+      Logger.info("already started at #{inspect(pid)}, returning :ignore")
+      :ignore
+    end
+  end
+
+  @impl true
+  def init([]) do
+    Process.flag(:trap_exit, true)
+
+    {:ok, %{table: :ets.new(:table, [:set])}}
+  end
+
+  def check_rate(bucket, cost \\ 1, name \\ __MODULE__) do
+    name
+    |> via_tuple()
+    |> GenServer.call({:check_rate, bucket, cost})
+  end
+
+  def inspect_table(name \\ __MODULE__) do
+    name
+    |> via_tuple()
+    |> GenServer.call(:inspect_table)
+  end
+
+  @impl true
+  def handle_call({:check_rate, bucket, cost}, _from, %{table: table} = state) do
+    {:reply, do_check_rate(table, bucket, cost), state}
+  end
+
+  @impl true
+  def handle_call(:inspect_table, _from, %{table: table} = state) do
+    {:reply, :ets.info(table), state}
+  end
+
+  @impl true
+  def handle_info(
+        {:EXIT, _from, {:name_conflict, {_key, _value}, registry, pid}},
+        state
+      ) do
+    Logger.info(
+      "Stopping #{inspect({registry, pid})} as it has already started on another node."
+ ) + + {:stop, :normal, state} + end + + def do_check_rate(table, bucket, cost) do + now = System.monotonic_time(:millisecond) + + :ets.insert_new(table, {bucket, {@capacity, now}}) + [{^bucket, {level, updated}}] = :ets.lookup(table, bucket) + + refilled = div(now - updated, 1_000) * @refill_per_sec + current = min(@capacity, level + refilled) + + if current >= cost do + level = current - cost + :ets.insert(table, {bucket, {level, now}}) + + {:allow, level} + else + # can retry after 1 second + {:deny, 1} + end + end + + def capacity, do: @capacity + def refill_per_second, do: @refill_per_sec + + def via_tuple(name), + do: {:via, Horde.Registry, {Lightning.HordeRegistry, name}} +end diff --git a/lib/lightning_web/controllers/webhooks_controller.ex b/lib/lightning_web/controllers/webhooks_controller.ex index 9760f61071..c5967610d2 100644 --- a/lib/lightning_web/controllers/webhooks_controller.ex +++ b/lib/lightning_web/controllers/webhooks_controller.ex @@ -1,7 +1,6 @@ defmodule LightningWeb.WebhooksController do use LightningWeb, :controller - alias Lightning.Extensions.RateLimiting alias Lightning.Extensions.UsageLimiting.Action alias Lightning.Extensions.UsageLimiting.Context alias Lightning.Services.RateLimiter @@ -10,7 +9,7 @@ defmodule LightningWeb.WebhooksController do alias Lightning.WorkOrders plug :reject_unfetched when action in [:create] - # plug :check_rate when action in [:create] + plug :check_rate when action in [:create] # Reject requests with unfetched body params, as they are not supported # See Plug.Parsers in Endpoint for more information. @@ -28,20 +27,36 @@ defmodule LightningWeb.WebhooksController do end end - # defp check_rate(conn, _params) do - # # TODO: this may be _after_ the body has been parsed (into body_params), so we may need to - # # may need to move this plug further upstream. - # case Lightning.RateLimiters.check_rate(conn.project_id) do - # {:allow, _} -> - # conn - # {:deny, timeout} -> - # conn - # |> put_status(429) - # |> put_resp_header("retry-after", to_string(timeout)) - # |> json(%{"error" => "Too many requests"}) - # |> halt() - # end - # end + # Note: Plug.Parsers is called before the Router. + def check_rate(conn, _opts) do + with %Workflows.Trigger{enabled: true, workflow: %{project_id: project_id}} <- + conn.assigns.trigger, + :ok <- RateLimiter.limit_request(conn, %Context{project_id: project_id}) do + conn + else + %Workflows.Trigger{enabled: false} -> + conn + |> put_status(:forbidden) + |> json(%{ + message: + "Unable to process request, trigger is disabled. Enable it on OpenFn to allow requests to this endpoint." 
+ }) + |> halt() + + {:error, :too_many_requests, %{text: message}} -> + conn + |> put_status(:too_many_requests) + |> put_resp_header("retry-after", "1") + |> json(%{"error" => message}) + |> halt() + + _no_trigger_or_workflow -> + conn + |> put_status(:not_found) + |> json(%{"error" => "Webhook not found"}) + |> halt() + end + end @spec check(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t() def check(conn, _params) do @@ -53,30 +68,25 @@ defmodule LightningWeb.WebhooksController do end @spec create(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t() - def create(conn, _params) do - with %Workflows.Trigger{enabled: true, workflow: %{project_id: project_id}} = - trigger <- conn.assigns.trigger, - {:ok, without_run?} <- check_skip_run_creation(project_id), - :ok <- - RateLimiter.limit_request( - conn, - %RateLimiting.Context{project_id: project_id}, - [] - ) do - {:ok, work_order} = - WorkOrders.create_for(trigger, - workflow: trigger.workflow, - dataclip: %{ - body: conn.body_params, - request: build_request(conn), - type: :http_request, - project_id: project_id - }, - without_run: without_run? - ) - - conn |> json(%{work_order_id: work_order.id}) - else + def create(%{assigns: %{trigger: trigger}} = conn, _params) do + %Workflows.Trigger{workflow: workflow} = trigger + + case check_skip_run_creation(workflow.project_id) do + {:ok, without_run?} -> + {:ok, work_order} = + WorkOrders.create_for(trigger, + workflow: workflow, + dataclip: %{ + body: conn.body_params, + request: build_request(conn), + type: :http_request, + project_id: workflow.project_id + }, + without_run: without_run? + ) + + json(conn, %{work_order_id: work_order.id}) + {:error, reason, %{text: message}} -> status = if reason == :too_many_requests, @@ -85,19 +95,7 @@ defmodule LightningWeb.WebhooksController do conn |> put_status(status) - |> json(%{"error" => message}) - - nil -> - conn - |> put_status(:not_found) - |> json(%{"error" => "Webhook not found"}) - - _disabled -> - put_status(conn, :forbidden) - |> json(%{ - message: - "Unable to process request, trigger is disabled. Enable it on OpenFn to allow requests to this endpoint." 
- }) + |> json(%{error: message}) end end diff --git a/mix.exs b/mix.exs index 74498ed887..573664231e 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule Lightning.MixProject do [ mod: {Lightning.Application, [:timex]}, extra_applications: [:logger, :runtime_tools, :os_mon, :scrivener], - start_phases: [seed_prom_ex_telemetry: []] + start_phases: [seed_prom_ex_telemetry: [], init_rate_limiter: []] ] end @@ -69,6 +69,7 @@ defmodule Lightning.MixProject do defp deps do [ # {:rexbug, ">= 1.0.0", only: :test}, + {:horde, "~> 0.9.0"}, {:bcrypt_elixir, "~> 3.2"}, {:bodyguard, "~> 2.2"}, {:broadway_kafka, "~> 0.4.2"}, diff --git a/mix.lock b/mix.lock index 7b57fdcbe8..01de534be4 100644 --- a/mix.lock +++ b/mix.lock @@ -61,6 +61,7 @@ "hammer": {:hex, :hammer, "7.0.1", "136edcd81af44becbe6b73a958c109e2364ab0dc026d7b19892037dc2632078c", [:mix], [], "hexpm", "796edf14ab2aa80df72080210fcf944ee5e8868d8ece7a7511264d802f58cc2d"}, "hammer_backend_mnesia": {:hex, :hammer_backend_mnesia, "0.7.0", "b2a8cccc1d3506bc4cf8e95750fa5bd491390f227e9a0d981ad375291d7bd1dc", [:mix], [{:hammer, "~> 7.0", [hex: :hammer, repo: "hexpm", optional: false]}], "hexpm", "f77a3d54df865aa8137926df6fadac2a81c06f1c1c22f4e98e32392a22bf9e3e"}, "heroicons": {:hex, :heroicons, "0.5.6", "95d730e7179c633df32d95c1fdaaecdf81b0da11010b89b737b843ac176a7eb5", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:phoenix_live_view, ">= 0.18.2", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}], "hexpm", "ca267f02a5fa695a4178a737b649fb6644a2e399639d4ba7964c18e8a58c2352"}, + "horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"}, "hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"}, "httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, @@ -72,6 +73,7 @@ "junit_formatter": {:hex, :junit_formatter, "3.4.0", "d0e8db6c34dab6d3c4154c3b46b21540db1109ae709d6cf99ba7e7a2ce4b1ac2", [:mix], [], "hexpm", "bb36e2ae83f1ced6ab931c4ce51dd3dbef1ef61bb4932412e173b0cfa259dacd"}, "kafka_protocol": {:hex, :kafka_protocol, "4.1.9", "7c10d9adaba84c6f176f152e6ba8029c46dfb7cb12432587009128836cf9a44a", [:rebar3], [{:crc32cer, "0.1.11", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "14f89eed8329ff4c7b5448e318ee20a98bf5c1e5dc41b74b8af459dfb7590cef"}, "libcluster": {:hex, :libcluster, "3.4.1", "271d2da892763bbef53c2872036c936fe8b80111eb1feefb2d30a3bb15c9b4f6", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", 
optional: false]}, {:telemetry, "~> 1.3", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1d568157f069c6afa70ec0d736704cf799734bdbb6343f0322af4a980301c853"}, + "libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, "makeup_eex": {:hex, :makeup_eex, "0.1.2", "93a5ef3d28ed753215dba2d59cb40408b37cccb4a8205e53ef9b5319a992b700", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.16 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_html, "~> 0.1.0 or ~> 1.0", [hex: :makeup_html, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "6140eafb28215ad7182282fd21d9aa6dcffbfbe0eb876283bc6b768a6c57b0c3"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, diff --git a/test/lightning/webhook_rate_limiter_test.exs b/test/lightning/webhook_rate_limiter_test.exs new file mode 100644 index 0000000000..bc38c82054 --- /dev/null +++ b/test/lightning/webhook_rate_limiter_test.exs @@ -0,0 +1,100 @@ +defmodule Lightning.WebhookRateLimiterTest do + @moduledoc false + use ExUnit.Case + + alias Lightning.WebhookRateLimiter + + @default_capacity 10 + + describe "check_rate/2" do + test "allows up to the capacity and refills on multiple buckets" do + initial_capacity = @default_capacity + bucket1 = "project#{System.unique_integer()}" + bucket2 = "project#{System.unique_integer()}" + + Enum.each(1..initial_capacity, fn i -> + level = initial_capacity - i + assert match?({:allow, ^level}, WebhookRateLimiter.check_rate(bucket1)) + assert match?({:allow, ^level}, WebhookRateLimiter.check_rate(bucket2)) + end) + end + + test "denies after consuming the bucket" do + initial_capacity = @default_capacity + bucket1 = "project#{System.unique_integer()}" + bucket2 = "project#{System.unique_integer()}" + + Enum.each(1..initial_capacity, fn i -> + assert {:allow, level} = WebhookRateLimiter.check_rate(bucket1) + assert level == initial_capacity - i + end) + + assert {:allow, level} = WebhookRateLimiter.check_rate(bucket2) + assert level == initial_capacity - 1 + + assert {:deny, wait_ms} = WebhookRateLimiter.check_rate(bucket1) + assert 0 < wait_ms and wait_ms < 1_000 + end + + # Synthetic cluster not working. + # For testing use manual procedure: + # 0. Disable Endpoint server + # 1. Run node1 on one terminal: iex --sname node1@localhost --cookie hordecookie -S mix phx.server + # 2. Run node2 on another terminal: iex --sname node2@localhost --cookie hordecookie -S mix phx.server + # 3. Call Lightning.WebhookRateLimiter.inspect_table() on both iex and they show the same ets table process and node. 
+ @tag skip: true + test "consumes the bucket remotely" do + {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost") + + :rpc.call(node2, Application, :ensure_all_started, [:mix]) + :rpc.call(node2, Application, :ensure_all_started, [:lightning]) + + # Copy current code paths to the peer node + :rpc.call(node2, :code, :add_paths, [:code.get_path()]) + + assert [ + {Lightning.DistributedSupervisor, :node1@localhost}, + {Lightning.DistributedSupervisor, :node2@localhost} + ] = Horde.Cluster.members(Lightning.DistributedSupervisor) + + # initial_capacity = @default_capacity + bucket = "project#{System.unique_integer()}" + + dbg(WebhookRateLimiter.check_rate(bucket)) + + # dbg :rpc.block_call(node1, WebhookRateLimiter, :inspect, [WebhookRateLimiter]) + # dbg :rpc.block_call(node2, WebhookRateLimiter, :inspect, [WebhookRateLimiter]) + + # Enum.each(1..initial_capacity-1, fn i -> + # assert {:allow, level} = :rpc.call(node2, WebhookRateLimiter, :check_rate, [bucket, 1]) + # assert level == initial_capacity - i - 1 + # end) + + # assert {:deny, wait_ms} = WebhookRateLimiter.check_rate(bucket) + # assert 0 < wait_ms and wait_ms < 1_000 + + :peer.stop(peer) + end + end + + defp start_nodes(node1, node2, host) do + # Start the main node + node1_sname = :"#{node1}@#{host}" + {:ok, _pid} = Node.start(node1_sname, :shortnames) + true = Node.set_cookie(:delicious_cookie) + cookie = Node.get_cookie() |> to_charlist() + + # Start the peer node + {:ok, peer, node2_sname} = + :peer.start(%{ + name: node2, + host: host, + cookie: cookie, + args: [~c"-setcookie", cookie] + }) + + assert node2_sname in Node.list() + + {:ok, peer, node1_sname, node2_sname} + end +end diff --git a/test/replicated_rate_limiter_test.exs b/test/replicated_rate_limiter_test.exs index f7e6e63661..7cc30bfb86 100644 --- a/test/replicated_rate_limiter_test.exs +++ b/test/replicated_rate_limiter_test.exs @@ -26,7 +26,9 @@ defmodule ReplicatedRateLimiterTest do Enum.each(1..9, fn _ -> CrdtEts.allow?("project2") end) assert_eventually( - DeltaCrdt.get(config.crdt_name, {"project2", "#{Node.self()}"}) |> dbg |> elem(0) == 0, + DeltaCrdt.get(config.crdt_name, {"project2", "#{Node.self()}"}) + |> dbg + |> elem(0) == 0, 1_000 ) @@ -58,13 +60,12 @@ defmodule ReplicatedRateLimiterTest do DeltaCrdt.put( test_crdt, {"project2", "another_node"}, - {10-i, System.system_time(:second)}) + {10 - i, System.system_time(:second)} + ) end) # Wait for the bucket to be updated - assert_eventually( - CrdtEts.to_list("project2") |> elem(0) == 1 - ) + assert_eventually(CrdtEts.to_list("project2") |> elem(0) == 1) assert {:allow, 4} = CrdtEts.allow?("project2", 10, 2) end diff --git a/test/test_helper.exs b/test/test_helper.exs index cc0209588f..9a710135a2 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -54,5 +54,12 @@ Application.put_env(:lightning, Lightning.Extensions, external_metrics: Lightning.Extensions.ExternalMetrics ) +epmd_path = System.find_executable("epmd") +port = Port.open({:spawn_executable, epmd_path}, []) +os_pid = Keyword.get(Port.info(port), :os_pid) + +# Configuring a "shutdown hook" to stop epmd after everything is done. 
+System.at_exit(fn _ -> System.shell("kill -TERM #{os_pid}") end) + ExUnit.start() Ecto.Adapters.SQL.Sandbox.mode(Lightning.Repo, :manual) From ba91ae5c0a08fb11c4e292fe5db01064ae799c63 Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Thu, 1 May 2025 22:29:09 +0200 Subject: [PATCH 11/14] Move capacity and refill to config --- .iex.exs | 2 +- config/config.exs | 5 ++++ config/test.exs | 2 ++ lib/lightning/application.ex | 15 ++++++++--- lib/lightning/webhook_rate_limiter.ex | 37 +++++++++++++++------------ 5 files changed, 39 insertions(+), 22 deletions(-) diff --git a/.iex.exs b/.iex.exs index bb72483cd4..a26dcdf6f4 100644 --- a/.iex.exs +++ b/.iex.exs @@ -1,5 +1,5 @@ if Code.loaded?(Ecto.Query) do - Kernel.SpecialForms.import(Ecto.Query) + import Ecto.Query end alias Lightning.Repo diff --git a/config/config.exs b/config/config.exs index e7899043ca..ef23f0f319 100644 --- a/config/config.exs +++ b/config/config.exs @@ -25,6 +25,11 @@ config :lightning, LightningWeb.Endpoint, pubsub_server: Lightning.PubSub, live_view: [signing_salt: "EfrmuOUr"] +config :lightning, Lightning.WebhookRateLimiter, + start: false, + capacity: 10, + refill_per_second: 2 + config :lightning, Lightning.Extensions, rate_limiter: Lightning.Extensions.RateLimiter, usage_limiter: Lightning.Extensions.UsageLimiter, diff --git a/config/test.exs b/config/test.exs index b19890a461..9347844ed1 100644 --- a/config/test.exs +++ b/config/test.exs @@ -53,6 +53,8 @@ config :lightning, LightningWeb.Endpoint, "/8zedVJLxvmGGFoRExE3e870g7CGZZQ1Vq11A5MbQGPKOpK57MahVsPW6Wkkv61n", server: true +config :lightning, Lightning.WebhookRateLimiter, start: true + config :lightning, Lightning.Runtime.RuntimeManager, ws_url: "ws://localhost:4002/worker" diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index 23f9a0bbf3..b9e6407101 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -7,6 +7,11 @@ defmodule Lightning.Application do require Logger + @rate_limiter_opts Application.compile_env!( + :lightning, + Lightning.WebhookRateLimiter + ) + @impl true def start(_type, _args) do # Initialize ETS table for adapter lookup @@ -181,10 +186,12 @@ defmodule Lightning.Application do end def start_phase(:init_rate_limiter, :normal, _args) do - Horde.DynamicSupervisor.start_child( - Lightning.DistributedSupervisor, - Lightning.WebhookRateLimiter - ) + if @rate_limiter_opts[:start] do + Horde.DynamicSupervisor.start_child( + Lightning.DistributedSupervisor, + {Lightning.WebhookRateLimiter, @rate_limiter_opts} + ) + end :ok end diff --git a/lib/lightning/webhook_rate_limiter.ex b/lib/lightning/webhook_rate_limiter.ex index 294ee3942d..9bcff97bc7 100644 --- a/lib/lightning/webhook_rate_limiter.ex +++ b/lib/lightning/webhook_rate_limiter.ex @@ -2,9 +2,6 @@ defmodule Lightning.WebhookRateLimiter do @moduledoc false use GenServer - @capacity 10 - @refill_per_sec 2 - require Logger def child_spec(opts) do @@ -17,25 +14,30 @@ defmodule Lightning.WebhookRateLimiter do %{ id: id, - start: {__MODULE__, :start_link, [name]}, + start: {__MODULE__, :start_link, [Keyword.put(opts, :name, name)]}, shutdown: 10_000, restart: :transient } end - def start_link(name) do + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + with {:error, {:already_started, pid}} <- - GenServer.start_link(__MODULE__, [], name: via_tuple(name)) do + GenServer.start_link(__MODULE__, opts, name: via_tuple(name)) do Logger.info("already started at #{inspect(pid)}, returning :ignore") :ignore end end @impl true - def init([]) do + 
def init(opts) do Process.flag(:trap_exit, true) - {:ok, %{table: :ets.new(:table, [:set])}} + capacity = Keyword.fetch!(opts, :capacity) + refill = Keyword.fetch!(opts, :refill_per_second) + + {:ok, %{table: :ets.new(:table, [:set]), capacity: capacity, refill: refill}} end def check_rate(bucket, cost \\ 1, name \\ __MODULE__) do @@ -51,8 +53,8 @@ defmodule Lightning.WebhookRateLimiter do end @impl true - def handle_call({:check_rate, bucket, cost}, _from, %{table: table} = state) do - {:reply, do_check_rate(table, bucket, cost), state} + def handle_call({:check_rate, bucket, cost}, _from, state) do + {:reply, do_check_rate(state, bucket, cost), state} end @impl true @@ -72,14 +74,18 @@ defmodule Lightning.WebhookRateLimiter do {:stop, :normal, state} end - def do_check_rate(table, bucket, cost) do + def do_check_rate( + %{table: table, capacity: capacity, refill: refill_per_sec}, + bucket, + cost + ) do now = System.monotonic_time(:millisecond) - :ets.insert_new(table, {bucket, {@capacity, now}}) + :ets.insert_new(table, {bucket, {capacity, now}}) [{^bucket, {level, updated}}] = :ets.lookup(table, bucket) - refilled = div(now - updated, 1_000) * @refill_per_sec - current = min(@capacity, level + refilled) + refilled = div(now - updated, 1_000) * refill_per_sec + current = min(capacity, level + refilled) if current >= cost do level = current - cost @@ -92,9 +98,6 @@ defmodule Lightning.WebhookRateLimiter do end end - def capacity, do: @capacity - def refill_per_second, do: @refill_per_sec - def via_tuple(name), do: {:via, Horde.Registry, {Lightning.HordeRegistry, name}} end From d072ec4dbd1bcf3ea6e32318a27d58bcccbec96a Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Thu, 1 May 2025 22:29:09 +0200 Subject: [PATCH 12/14] Move capacity and refill to config --- .iex.exs | 2 +- config/config.exs | 5 +++ config/test.exs | 2 ++ lib/lightning/application.ex | 15 ++++++--- lib/lightning/webhook_rate_limiter.ex | 48 +++++++++++++++------------ 5 files changed, 46 insertions(+), 26 deletions(-) diff --git a/.iex.exs b/.iex.exs index bb72483cd4..a26dcdf6f4 100644 --- a/.iex.exs +++ b/.iex.exs @@ -1,5 +1,5 @@ if Code.loaded?(Ecto.Query) do - Kernel.SpecialForms.import(Ecto.Query) + import Ecto.Query end alias Lightning.Repo diff --git a/config/config.exs b/config/config.exs index e7899043ca..ef23f0f319 100644 --- a/config/config.exs +++ b/config/config.exs @@ -25,6 +25,11 @@ config :lightning, LightningWeb.Endpoint, pubsub_server: Lightning.PubSub, live_view: [signing_salt: "EfrmuOUr"] +config :lightning, Lightning.WebhookRateLimiter, + start: false, + capacity: 10, + refill_per_second: 2 + config :lightning, Lightning.Extensions, rate_limiter: Lightning.Extensions.RateLimiter, usage_limiter: Lightning.Extensions.UsageLimiter, diff --git a/config/test.exs b/config/test.exs index b19890a461..9347844ed1 100644 --- a/config/test.exs +++ b/config/test.exs @@ -53,6 +53,8 @@ config :lightning, LightningWeb.Endpoint, "/8zedVJLxvmGGFoRExE3e870g7CGZZQ1Vq11A5MbQGPKOpK57MahVsPW6Wkkv61n", server: true +config :lightning, Lightning.WebhookRateLimiter, start: true + config :lightning, Lightning.Runtime.RuntimeManager, ws_url: "ws://localhost:4002/worker" diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index 23f9a0bbf3..b9e6407101 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -7,6 +7,11 @@ defmodule Lightning.Application do require Logger + @rate_limiter_opts Application.compile_env!( + :lightning, + Lightning.WebhookRateLimiter + ) + @impl true def 
start(_type, _args) do # Initialize ETS table for adapter lookup @@ -181,10 +186,12 @@ defmodule Lightning.Application do end def start_phase(:init_rate_limiter, :normal, _args) do - Horde.DynamicSupervisor.start_child( - Lightning.DistributedSupervisor, - Lightning.WebhookRateLimiter - ) + if @rate_limiter_opts[:start] do + Horde.DynamicSupervisor.start_child( + Lightning.DistributedSupervisor, + {Lightning.WebhookRateLimiter, @rate_limiter_opts} + ) + end :ok end diff --git a/lib/lightning/webhook_rate_limiter.ex b/lib/lightning/webhook_rate_limiter.ex index 294ee3942d..ca7fe6e377 100644 --- a/lib/lightning/webhook_rate_limiter.ex +++ b/lib/lightning/webhook_rate_limiter.ex @@ -2,9 +2,6 @@ defmodule Lightning.WebhookRateLimiter do @moduledoc false use GenServer - @capacity 10 - @refill_per_sec 2 - require Logger def child_spec(opts) do @@ -17,31 +14,36 @@ defmodule Lightning.WebhookRateLimiter do %{ id: id, - start: {__MODULE__, :start_link, [name]}, + start: {__MODULE__, :start_link, [Keyword.put(opts, :name, name)]}, shutdown: 10_000, restart: :transient } end - def start_link(name) do + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + with {:error, {:already_started, pid}} <- - GenServer.start_link(__MODULE__, [], name: via_tuple(name)) do + GenServer.start_link(__MODULE__, opts, name: via_tuple(name)) do Logger.info("already started at #{inspect(pid)}, returning :ignore") :ignore end end @impl true - def init([]) do + def init(opts) do Process.flag(:trap_exit, true) - {:ok, %{table: :ets.new(:table, [:set])}} + capacity = Keyword.fetch!(opts, :capacity) + refill = Keyword.fetch!(opts, :refill_per_second) + + {:ok, %{table: :ets.new(:table, [:set]), capacity: capacity, refill_per_second: refill}} end - def check_rate(bucket, cost \\ 1, name \\ __MODULE__) do + def check_rate(bucket, capacity \\ nil, refill \\ nil, name \\ __MODULE__) do name |> via_tuple() - |> GenServer.call({:check_rate, bucket, cost}) + |> GenServer.call({:check_rate, bucket, capacity, refill}) end def inspect_table(name \\ __MODULE__) do @@ -51,8 +53,8 @@ defmodule Lightning.WebhookRateLimiter do end @impl true - def handle_call({:check_rate, bucket, cost}, _from, %{table: table} = state) do - {:reply, do_check_rate(table, bucket, cost), state} + def handle_call({:check_rate, bucket, capacity, refill}, _from, state) do + {:reply, do_check_rate(state, bucket, capacity, refill), state} end @impl true @@ -72,17 +74,24 @@ defmodule Lightning.WebhookRateLimiter do {:stop, :normal, state} end - def do_check_rate(table, bucket, cost) do + def do_check_rate( + %{table: table} = config, + bucket, + capacity, + refill_per_sec + ) do now = System.monotonic_time(:millisecond) + capacity = capacity || config[:capacity] + refill_per_sec = refill_per_sec || config[:refill_per_second] - :ets.insert_new(table, {bucket, {@capacity, now}}) + :ets.insert_new(table, {bucket, {capacity, now}}) [{^bucket, {level, updated}}] = :ets.lookup(table, bucket) - refilled = div(now - updated, 1_000) * @refill_per_sec - current = min(@capacity, level + refilled) + refilled = div(now - updated, 1_000) * refill_per_sec + current = min(capacity, level + refilled) - if current >= cost do - level = current - cost + if current >= 1 do + level = current - 1 :ets.insert(table, {bucket, {level, now}}) {:allow, level} @@ -92,9 +101,6 @@ defmodule Lightning.WebhookRateLimiter do end end - def capacity, do: @capacity - def refill_per_second, do: @refill_per_sec - def via_tuple(name), do: {:via, Horde.Registry, {Lightning.HordeRegistry, name}} 
 end

From e6191a947dd1ca49c311e2bfb9a26aaad7789a48 Mon Sep 17 00:00:00 2001
From: Rogerio Pontual
Date: Fri, 2 May 2025 09:47:53 +0200
Subject: [PATCH 13/14] Remove the replicated rate limiter and rename the
 webhook limiter to a generic term

---
 config/config.exs                             |   2 +-
 config/test.exs                               |   2 +-
 lib/lightning/application.ex                  |   4 +-
 ...limiter.ex => distributed_rate_limiter.ex} |   6 +-
 lib/lightning/extensions/rate_limiting.ex     |   3 +
 lib/replicated_rate_limiter.ex                | 270 ------------------
 ....exs => distributed_rate_limiter_test.exs} |  36 ++-
 .../extensions/rate_limiter_test.exs          |   5 +-
 test/lightning/rate_limiters_test.exs         |  31 --
 test/replicated_rate_limiter_test.exs         |  77 -----
 10 files changed, 33 insertions(+), 403 deletions(-)
 rename lib/lightning/{webhook_rate_limiter.ex => distributed_rate_limiter.ex} (95%)
 delete mode 100644 lib/replicated_rate_limiter.ex
 rename test/lightning/{webhook_rate_limiter_test.exs => distributed_rate_limiter_test.exs} (68%)
 delete mode 100644 test/replicated_rate_limiter_test.exs

diff --git a/config/config.exs b/config/config.exs
index ef23f0f319..e1e15e1813 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -25,7 +25,7 @@ config :lightning, LightningWeb.Endpoint,
   pubsub_server: Lightning.PubSub,
   live_view: [signing_salt: "EfrmuOUr"]
 
-config :lightning, Lightning.WebhookRateLimiter,
+config :lightning, Lightning.DistributedRateLimiter,
   start: false,
   capacity: 10,
   refill_per_second: 2
diff --git a/config/test.exs b/config/test.exs
index 9347844ed1..477aa9b70c 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -53,7 +53,7 @@ config :lightning, LightningWeb.Endpoint,
     "/8zedVJLxvmGGFoRExE3e870g7CGZZQ1Vq11A5MbQGPKOpK57MahVsPW6Wkkv61n",
   server: true
 
-config :lightning, Lightning.WebhookRateLimiter, start: true
+config :lightning, Lightning.DistributedRateLimiter, start: true
 
 config :lightning, Lightning.Runtime.RuntimeManager,
   ws_url: "ws://localhost:4002/worker"
diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex
index b9e6407101..9a546d157c 100644
--- a/lib/lightning/application.ex
+++ b/lib/lightning/application.ex
@@ -9,7 +9,7 @@ defmodule Lightning.Application do
 
   @rate_limiter_opts Application.compile_env!(
                        :lightning,
-                       Lightning.WebhookRateLimiter
+                       Lightning.DistributedRateLimiter
                      )
 
   @impl true
@@ -189,7 +189,7 @@ defmodule Lightning.Application do
     if @rate_limiter_opts[:start] do
       Horde.DynamicSupervisor.start_child(
         Lightning.DistributedSupervisor,
-        {Lightning.WebhookRateLimiter, @rate_limiter_opts}
+        {Lightning.DistributedRateLimiter, @rate_limiter_opts}
       )
     end
 
diff --git a/lib/lightning/webhook_rate_limiter.ex b/lib/lightning/distributed_rate_limiter.ex
similarity index 95%
rename from lib/lightning/webhook_rate_limiter.ex
rename to lib/lightning/distributed_rate_limiter.ex
index 9a506bb629..c981872123 100644
--- a/lib/lightning/webhook_rate_limiter.ex
+++ b/lib/lightning/distributed_rate_limiter.ex
@@ -1,4 +1,4 @@
-defmodule Lightning.WebhookRateLimiter do
+defmodule Lightning.DistributedRateLimiter do
   @moduledoc false
   use GenServer
 
@@ -98,8 +98,8 @@ defmodule Lightning.DistributedRateLimiter do
 
       {:allow, level}
     else
-      # can retry after 1 second
-      {:deny, 1}
+      wait_ms = 1_000 - (now - updated)
+      {:deny, wait_ms}
     end
   end
 
diff --git a/lib/lightning/extensions/rate_limiting.ex b/lib/lightning/extensions/rate_limiting.ex
index aae3a0232d..4f8c891d1e 100644
--- a/lib/lightning/extensions/rate_limiting.ex
+++ b/lib/lightning/extensions/rate_limiting.ex
@@ -8,6 +8,9 @@ defmodule Lightning.Extensions.RateLimiting do
   @type message ::
          Lightning.Extensions.Message.t()
 
   defmodule Context do
+    @moduledoc """
+    Context for the object (bucket) under rate limiting.
+    """
     @type t :: %Context{project_id: Ecto.UUID.t()}
 
     defstruct [:project_id]
diff --git a/lib/replicated_rate_limiter.ex b/lib/replicated_rate_limiter.ex
deleted file mode 100644
index 96d0537f85..0000000000
--- a/lib/replicated_rate_limiter.ex
+++ /dev/null
@@ -1,270 +0,0 @@
-defmodule ReplicatedRateLimiter do
-  @moduledoc """
-  __using__/1 will inject:
-
-  - `ReplicatedRateLimiter.TokenBucket` - your ETS/CRDT sync server
-  - `start_link/1` & `child_spec/1` - to supervise both
-  - `allow?/4` - your public rate‑limit check
-
-  Options:
-
-  - `:crdt_name` - atom name for your AWLWWMap
-  - `:ets_table` - atom for your ETS cache
-  - `:default_capacity` - default bucket size
-  - `:default_refill` - default tokens/sec
-  """
-
-  defmacro __using__(opts) do
-    crdt_name =
-      Keyword.get(opts, :crdt_name)
-
-    ets_table =
-      Keyword.get(opts, :ets_table)
-
-    default_capacity = Keyword.get(opts, :default_capacity, 100)
-    default_refill = Keyword.get(opts, :default_refill, 10)
-
-    quote do
-      use Supervisor
-      alias DeltaCrdt
-      require Logger
-
-      default_name_prefix =
-        __MODULE__
-        |> Module.split()
-        |> Enum.join()
-        |> String.replace(~r/(?<!^)(?=[A-Z])/, "_")
-        |> String.downcase()
-
-      @crdt_name unquote(crdt_name) ||
-                   (Macro.escape(default_name_prefix) <> "_crdt")
-                   |> String.to_atom()
-
-      @ets_table unquote(ets_table) ||
-                   (Macro.escape(default_name_prefix) <> "_ets")
-                   |> String.to_atom()
-
-      @cluster_name (Macro.escape(default_name_prefix) <> "_cluster")
-                    |> String.to_atom()
-
-      @default_capacity unquote(default_capacity)
-      @default_refill unquote(default_refill)
-
-      @doc """
-      Same as TokenBucket.allow?/4, but with a default capacity and refill rate.
-      """
-      def allow?(
-            key,
-            capacity \\ @default_capacity,
-            refill_rate \\ @default_refill,
-            cost \\ 1
-          ) do
-        ReplicatedRateLimiter.TokenBucket.allow?(
-          [crdt_name: @crdt_name, ets_table: @ets_table],
-          key,
-          capacity,
-          refill_rate,
-          cost
-        )
-      end
-
-      def to_list(bucket) do
-        ReplicatedRateLimiter.TokenBucket.to_list(
-          [crdt_name: @crdt_name, ets_table: @ets_table],
-          bucket
-        )
-      end
-
-      def config do
-        [crdt_name: @crdt_name, ets_table: @ets_table]
-      end
-
-      @doc false
-      def child_spec(opts) do
-        %{
-          id: __MODULE__,
-          start: {__MODULE__, :start_link, [opts]},
-          type: :supervisor
-        }
-      end
-
-      def start_link(opts \\ []) do
-        Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
-      end
-
-      @impl true
-      def init(_opts) do
-        children = [
-          {ReplicatedRateLimiter.TokenBucket, config()},
-          {DeltaCrdt,
-           [
-             crdt: DeltaCrdt.AWLWWMap,
-             name: @crdt_name,
-             sync_interval: 100,
-             on_diffs:
-               {ReplicatedRateLimiter.TokenBucket, :apply_diffs, [config()]}
-           ]},
-          {CrdtCluster, [crdt: @crdt_name, name: @cluster_name]}
-        ]
-
-        Supervisor.init(children, strategy: :one_for_one)
-      end
-    end
-  end
-
-  defmodule TokenBucket do
-    use GenServer
-    require Logger
-    alias DeltaCrdt
-
-    def start_link(opts) do
-      GenServer.start_link(__MODULE__, opts)
-    end
-
-    def allow?(config, bucket, capacity, refill, cost) do
-      ets_table = Keyword.fetch!(config, :ets_table)
-      crdt_name = Keyword.fetch!(config, :crdt_name)
-
-      now = System.system_time(:second)
-
-      :ets.insert_new(ets_table, {bucket, {capacity, now}})
-      [{^bucket, {level, updated}}] = :ets.lookup(ets_table, bucket)
-
-      refilled = trunc((now - updated) * refill)
-      current = min(capacity, level + refilled)
-
-      if current >= cost do
-        next_level = current - cost
-        :ets.insert(ets_table, {bucket, {next_level,
now}}) - - # TODO do this in a separate unlinked process - DeltaCrdt.put( - crdt_name, - {bucket, "#{Node.self()}"}, - {next_level, now} - ) - - {:allow, next_level} - else - wait_ms = ceil((cost - current) / refill * 1_000) |> round() - {:deny, wait_ms} - end - end - - def to_list(config, bucket) do - ets_table = Keyword.get(config, :ets_table) - - :ets.lookup(ets_table, bucket) - |> case do - [{^bucket, {level, updated}}] -> - {level, updated} - - [] -> - :not_found - - _ -> - :error - end - end - - @impl true - def init(opts) do - ets_table = Keyword.fetch!(opts, :ets_table) - - :ets.new(ets_table, [ - :named_table, - :public, - read_concurrency: true - ]) - - {:ok, %{}} - end - - def apply_diffs(config, diffs) when is_list(diffs) do - Logger.debug("Applying diffs: #{inspect(diffs)}") - - ets_table = Keyword.fetch!(config, :ets_table) - crdt_name = Keyword.fetch!(config, :crdt_name) - - Task.start(fn -> - changed_buckets = - diffs - |> Enum.map(fn {:add, {key, _node}, _value} -> key end) - |> Enum.uniq() - - if changed_buckets != [] do - crdt_map = DeltaCrdt.to_map(crdt_name) - # %{ - # {"test", "a@127.0.0.1"} => {9, 1745414954}, - # {"test", "b@127.0.0.1"} => {9, 1745414927} - # } - - Enum.reduce(crdt_map, %{}, fn {{bucket, _node}, value}, acc -> - if bucket in changed_buckets do - Map.update(acc, bucket, [value], fn existing_values -> - [value | existing_values] - end) - else - acc - end - end) - |> Enum.each(fn {bucket, values} -> - :ets.insert( - ets_table, - {bucket, Enum.max_by(values, fn {_, v} -> v end)} - ) - end) - end - end) - - :ok - end - end -end - -defmodule CrdtCluster do - # Standalone Cluster module that can be used by any CRDT - @moduledoc false - - use GenServer - require Logger - - def start_link(opts) do - crdt_name = Keyword.fetch!(opts, :crdt) - name = Keyword.get(opts, :name, __MODULE__) - - GenServer.start_link(__MODULE__, {crdt_name, name}, name: name) - end - - @impl true - def init(crdt_name) do - # watch node up/down - :net_kernel.monitor_nodes(true, node_type: :visible) - sync_neighbours(crdt_name) - {:ok, %{crdt_name: crdt_name}} - end - - @impl true - def handle_info({:nodeup, _n, _}, state) do - sync_neighbours(state.crdt_name) - {:noreply, state} - end - - def handle_info({:nodedown, _n, _}, state) do - sync_neighbours(state.crdt_name) - {:noreply, state} - end - - def handle_info(_, state), do: {:noreply, state} - - defp sync_neighbours(crdt_name) do - peers = Node.list() - neighbours = Enum.map(peers, &{crdt_name, &1}) - - Logger.debug( - "CRDT neighbours for #{inspect(crdt_name)}: #{inspect(neighbours)}" - ) - - DeltaCrdt.set_neighbours(crdt_name, neighbours) - end -end diff --git a/test/lightning/webhook_rate_limiter_test.exs b/test/lightning/distributed_rate_limiter_test.exs similarity index 68% rename from test/lightning/webhook_rate_limiter_test.exs rename to test/lightning/distributed_rate_limiter_test.exs index bc38c82054..78a6a1800a 100644 --- a/test/lightning/webhook_rate_limiter_test.exs +++ b/test/lightning/distributed_rate_limiter_test.exs @@ -1,8 +1,8 @@ -defmodule Lightning.WebhookRateLimiterTest do +defmodule Lightning.DistributedRateLimiterTest do @moduledoc false use ExUnit.Case - alias Lightning.WebhookRateLimiter + alias Lightning.DistributedRateLimiter @default_capacity 10 @@ -14,8 +14,16 @@ defmodule Lightning.WebhookRateLimiterTest do Enum.each(1..initial_capacity, fn i -> level = initial_capacity - i - assert match?({:allow, ^level}, WebhookRateLimiter.check_rate(bucket1)) - assert match?({:allow, ^level}, 
WebhookRateLimiter.check_rate(bucket2)) + + assert match?( + {:allow, ^level}, + DistributedRateLimiter.check_rate(bucket1) + ) + + assert match?( + {:allow, ^level}, + DistributedRateLimiter.check_rate(bucket2) + ) end) end @@ -25,15 +33,15 @@ defmodule Lightning.WebhookRateLimiterTest do bucket2 = "project#{System.unique_integer()}" Enum.each(1..initial_capacity, fn i -> - assert {:allow, level} = WebhookRateLimiter.check_rate(bucket1) + assert {:allow, level} = DistributedRateLimiter.check_rate(bucket1) assert level == initial_capacity - i end) - assert {:allow, level} = WebhookRateLimiter.check_rate(bucket2) + assert {:allow, level} = DistributedRateLimiter.check_rate(bucket2) assert level == initial_capacity - 1 - assert {:deny, wait_ms} = WebhookRateLimiter.check_rate(bucket1) - assert 0 < wait_ms and wait_ms < 1_000 + assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket1) |> dbg + assert 500 < wait_ms and wait_ms <= 1_000 end # Synthetic cluster not working. @@ -41,7 +49,7 @@ defmodule Lightning.WebhookRateLimiterTest do # 0. Disable Endpoint server # 1. Run node1 on one terminal: iex --sname node1@localhost --cookie hordecookie -S mix phx.server # 2. Run node2 on another terminal: iex --sname node2@localhost --cookie hordecookie -S mix phx.server - # 3. Call Lightning.WebhookRateLimiter.inspect_table() on both iex and they show the same ets table process and node. + # 3. Call Lightning.DistributedRateLimiter.inspect_table() on both iex and they show the same ets table process and node. @tag skip: true test "consumes the bucket remotely" do {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost") @@ -60,17 +68,17 @@ defmodule Lightning.WebhookRateLimiterTest do # initial_capacity = @default_capacity bucket = "project#{System.unique_integer()}" - dbg(WebhookRateLimiter.check_rate(bucket)) + dbg(DistributedRateLimiter.check_rate(bucket)) - # dbg :rpc.block_call(node1, WebhookRateLimiter, :inspect, [WebhookRateLimiter]) - # dbg :rpc.block_call(node2, WebhookRateLimiter, :inspect, [WebhookRateLimiter]) + # dbg :rpc.block_call(node1, DistributedRateLimiter, :inspect, [DistributedRateLimiter]) + # dbg :rpc.block_call(node2, DistributedRateLimiter, :inspect, [DistributedRateLimiter]) # Enum.each(1..initial_capacity-1, fn i -> - # assert {:allow, level} = :rpc.call(node2, WebhookRateLimiter, :check_rate, [bucket, 1]) + # assert {:allow, level} = :rpc.call(node2, DistributedRateLimiter, :check_rate, [bucket, 1]) # assert level == initial_capacity - i - 1 # end) - # assert {:deny, wait_ms} = WebhookRateLimiter.check_rate(bucket) + # assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket) # assert 0 < wait_ms and wait_ms < 1_000 :peer.stop(peer) diff --git a/test/lightning/extensions/rate_limiter_test.exs b/test/lightning/extensions/rate_limiter_test.exs index e32a9f9ac1..af71e33c76 100644 --- a/test/lightning/extensions/rate_limiter_test.exs +++ b/test/lightning/extensions/rate_limiter_test.exs @@ -8,10 +8,7 @@ defmodule Lightning.Extensions.RateLimiterTest do Enum.each(1..100, fn _i -> assert RateLimiter.limit_request( conn, - %Context{ - project_id: Ecto.UUID.generate(), - user_id: Ecto.UUID.generate() - }, + %Context{project_id: Ecto.UUID.generate()}, [] ) == :ok end) diff --git a/test/lightning/rate_limiters_test.exs b/test/lightning/rate_limiters_test.exs index 784c40a54c..47998f50a8 100644 --- a/test/lightning/rate_limiters_test.exs +++ b/test/lightning/rate_limiters_test.exs @@ -10,35 +10,4 @@ defmodule Lightning.RateLimitersTest do assert 
RateLimiters.Mail.hit(id, 1, 1) == {:deny, 1000} end end - - describe "Webhook" do - setup do - Mox.stub_with(LightningMock, Lightning.API) - - :ok - end - - test "returns a hit result" do - id = Ecto.UUID.generate() - assert RateLimiters.Webhook.hit(id, 1, 1) == {:allow, 1} - assert RateLimiters.Webhook.hit(id, 1, 1) == {:deny, 1000} - end - - test "returns a hit result for a project id" do - # 10 requests per second, then denied for 1 second - id = Ecto.UUID.generate() - - for i <- 1..10 do - assert RateLimiters.hit({:webhook, id}) == {:allow, i} - Process.sleep(5) - end - - assert RateLimiters.hit({:webhook, id}) == {:deny, 1000} - - Process.sleep(1005) - - # Leaked by 2, then add 1 for the next hit. - assert RateLimiters.hit({:webhook, id}) == {:allow, 9} - end - end end diff --git a/test/replicated_rate_limiter_test.exs b/test/replicated_rate_limiter_test.exs deleted file mode 100644 index 7cc30bfb86..0000000000 --- a/test/replicated_rate_limiter_test.exs +++ /dev/null @@ -1,77 +0,0 @@ -defmodule ReplicatedRateLimiterTest do - use ExUnit.Case, async: false - import Eventually - - defmodule CrdtEts do - use ReplicatedRateLimiter, - default_capacity: 10, - default_refill: 2 - end - - defmodule AnotherRateLimiter do - use ReplicatedRateLimiter - end - - test "CrdtEts allows calls directly from the main module" do - start_supervised!(CrdtEts) - - config = CrdtEts.config() |> Map.new() - - # Check one time (default cost is 1) - assert match?({:allow, 9}, CrdtEts.allow?("project1")) - - before = System.system_time(:second) - - # Make several calls to hit the limit (default capacity is 10) - Enum.each(1..9, fn _ -> CrdtEts.allow?("project2") end) - - assert_eventually( - DeltaCrdt.get(config.crdt_name, {"project2", "#{Node.self()}"}) - |> dbg - |> elem(0) == 0, - 1_000 - ) - - assert {0, last_updated} = - CrdtEts.to_list("project2"), - "ETS should be the same as CRDT" - - assert before <= last_updated - - # This should be denied since we consumed all tokens - assert {:deny, 1000} = CrdtEts.allow?("project2", 10, 2) - - # Node2 enters the dungeon - {:ok, test_crdt} = - DeltaCrdt.start_link(DeltaCrdt.AWLWWMap) - - DeltaCrdt.set_neighbours(config.crdt_name, [test_crdt]) - DeltaCrdt.set_neighbours(test_crdt, [config.crdt_name]) - - # a time has passed and the bucket is refilled by Node2 - DeltaCrdt.put( - test_crdt, - {"project2", "another_node"}, - {10, System.system_time(:second)} - ) - - # Node2 consumes all the credits except one - Enum.each(1..9, fn i -> - DeltaCrdt.put( - test_crdt, - {"project2", "another_node"}, - {10 - i, System.system_time(:second)} - ) - end) - - # Wait for the bucket to be updated - assert_eventually(CrdtEts.to_list("project2") |> elem(0) == 1) - - assert {:allow, 4} = CrdtEts.allow?("project2", 10, 2) - end - - test "can start multiple rate limiters" do - start_supervised!(AnotherRateLimiter) - start_supervised!(CrdtEts) - end -end From 2f58f95682eea8641b9a3b82798806b0eabddf2f Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Fri, 2 May 2025 11:51:16 +0200 Subject: [PATCH 14/14] Fix test case and check for a single process/worker --- .../distributed_rate_limiter_test.exs | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/test/lightning/distributed_rate_limiter_test.exs b/test/lightning/distributed_rate_limiter_test.exs index 78a6a1800a..664edeb29f 100644 --- a/test/lightning/distributed_rate_limiter_test.exs +++ b/test/lightning/distributed_rate_limiter_test.exs @@ -40,18 +40,16 @@ defmodule Lightning.DistributedRateLimiterTest 
do assert {:allow, level} = DistributedRateLimiter.check_rate(bucket2) assert level == initial_capacity - 1 - assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket1) |> dbg + assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket1) assert 500 < wait_ms and wait_ms <= 1_000 end - # Synthetic cluster not working. - # For testing use manual procedure: + # For testing the replication use manual procedure: # 0. Disable Endpoint server # 1. Run node1 on one terminal: iex --sname node1@localhost --cookie hordecookie -S mix phx.server # 2. Run node2 on another terminal: iex --sname node2@localhost --cookie hordecookie -S mix phx.server # 3. Call Lightning.DistributedRateLimiter.inspect_table() on both iex and they show the same ets table process and node. - @tag skip: true - test "consumes the bucket remotely" do + test "works on top of a single worker of a distributed dynamic supervisor" do {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost") :rpc.call(node2, Application, :ensure_all_started, [:mix]) @@ -65,21 +63,10 @@ defmodule Lightning.DistributedRateLimiterTest do {Lightning.DistributedSupervisor, :node2@localhost} ] = Horde.Cluster.members(Lightning.DistributedSupervisor) - # initial_capacity = @default_capacity - bucket = "project#{System.unique_integer()}" - - dbg(DistributedRateLimiter.check_rate(bucket)) - - # dbg :rpc.block_call(node1, DistributedRateLimiter, :inspect, [DistributedRateLimiter]) - # dbg :rpc.block_call(node2, DistributedRateLimiter, :inspect, [DistributedRateLimiter]) - - # Enum.each(1..initial_capacity-1, fn i -> - # assert {:allow, level} = :rpc.call(node2, DistributedRateLimiter, :check_rate, [bucket, 1]) - # assert level == initial_capacity - i - 1 - # end) - - # assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket) - # assert 0 < wait_ms and wait_ms < 1_000 + assert [{:undefined, _pid, :worker, [Lightning.DistributedRateLimiter]}] = + Horde.DynamicSupervisor.which_children( + Lightning.DistributedSupervisor + ) :peer.stop(peer) end
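
For reference, the token-bucket arithmetic that survives into Lightning.DistributedRateLimiter.do_check_rate/4 (capacity 10, refill 2 tokens per second, per config/config.exs) is small enough to exercise in isolation. A minimal sketch, assuming the same integer-second refill and the same deny/wait formula as the patched code; the BucketSketch module and its bare {level, updated_ms} tuple state are illustrative only, not part of the series:

defmodule BucketSketch do
  @capacity 10
  @refill_per_sec 2

  # state is {tokens_left, last_updated_ms}; returns {verdict, new_state}
  def hit({level, updated}, now_ms) do
    # only whole elapsed seconds refill the bucket, capped at capacity
    refilled = div(now_ms - updated, 1_000) * @refill_per_sec
    current = min(@capacity, level + refilled)

    if current >= 1 do
      {{:allow, current - 1}, {current - 1, now_ms}}
    else
      # milliseconds left until the current second elapses, as in the patch
      {{:deny, 1_000 - (now_ms - updated)}, {level, updated}}
    end
  end
end

# Draining a full bucket at t=0 yields ten allows and then a deny:
state = {10, 0}

{_allows, state} =
  Enum.map_reduce(1..10, state, fn _, s -> BucketSketch.hit(s, 0) end)

{:deny, 1_000} = elem(BucketSketch.hit(state, 0), 0)

# 1.5s later, div(1_500, 1_000) * 2 = 2 tokens have refilled, so one hit
# passes and one token remains:
{:allow, 1} = elem(BucketSketch.hit(state, 1_500), 0)

This is also why the test asserts 500 < wait_ms and wait_ms <= 1_000: a hit that lands within half a second of draining the bucket is told to retry once the current second has elapsed.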