Skip to content

Commit 31b770c

Browse files
authored
Merge pull request #1 from hiett/sh/transaction-support
Transaction support + release candidate 0.0.2
2 parents 4ef9714 + abcde98 commit 31b770c

File tree

8 files changed

+182
-31
lines changed

8 files changed

+182
-31
lines changed

config/config.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import Config
22

33
config :srh,
4-
mode: "file",
5-
file_path: "srh-config/tokens.json",
6-
port: 8080
4+
mode: "file",
5+
file_path: "srh-config/tokens.json",
6+
port: 8080
77

88
import_config "#{config_env()}.exs"

config/prod.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
import Config
22

33
config :srh,
4-
port: 80
4+
port: 80

config/runtime.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import Config
22

33
config :srh,
4-
mode: System.get_env("TOKEN_RESOLUTION_MODE") || "file",
5-
file_path: System.get_env("TOKEN_RESOLUTION_FILE_PATH") || "srh-config/tokens.json",
6-
port: Integer.parse(System.get_env("PORT") || "8080")
4+
mode: System.get_env("TOKEN_RESOLUTION_MODE") || "file",
5+
file_path: System.get_env("TOKEN_RESOLUTION_FILE_PATH") || "srh-config/tokens.json",
6+
port: Integer.parse(System.get_env("PORT") || "8080")

lib/srh.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ defmodule Srh do
55

66
def start(_type, _args) do
77
IO.puts("Using port #{@port}")
8-
8+
99
children = [
1010
Srh.Auth.TokenResolver,
1111
{GenRegistry, worker_module: Srh.Redis.Client},

lib/srh/http/base_router.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ defmodule Srh.Http.BaseRouter do
2323
|> handle_response(conn)
2424
end
2525

26+
post "/multi-exec" do
27+
conn
28+
|> handle_extract_auth(&CommandHandler.handle_command_transaction_array(conn, &1))
29+
|> handle_response(conn)
30+
end
31+
2632
match _ do
2733
send_resp(conn, 404, "Endpoint not found")
2834
end
@@ -51,6 +57,9 @@ defmodule Srh.Http.BaseRouter do
5157
{:malformed_data, message} ->
5258
%{code: 400, message: message, json: false}
5359

60+
{:redis_error, data} ->
61+
%{code: 400, message: Jason.encode!(data), json: true}
62+
5463
{:not_authorized, message} ->
5564
%{code: 401, message: message, json: false}
5665

lib/srh/http/command_handler.ex

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,17 @@ defmodule Srh.Http.CommandHandler do
2424
end
2525
end
2626

27+
def handle_command_transaction_array(conn, token) do
28+
# Transactions use the same body format as pipelines, so we can use the same validator
29+
case RequestValidator.validate_pipeline_redis_body(conn.body_params) do
30+
{:ok, array_of_command_arrays} ->
31+
do_handle_command_transaction_array(array_of_command_arrays, token)
32+
33+
{:error, error_message} ->
34+
{:malformed_data, error_message}
35+
end
36+
end
37+
2738
defp do_handle_command(command_array, token) do
2839
case TokenResolver.resolve(token) do
2940
{:ok, connection_info} ->
@@ -44,6 +55,16 @@ defmodule Srh.Http.CommandHandler do
4455
end
4556
end
4657

58+
defp do_handle_command_transaction_array(array_of_command_arrays, token) do
59+
case TokenResolver.resolve(token) do
60+
{:ok, connection_info} ->
61+
dispatch_command_transaction_array(array_of_command_arrays, connection_info)
62+
63+
{:error, msg} ->
64+
{:not_authorized, msg}
65+
end
66+
end
67+
4768
defp dispatch_command_array(_arr, _connection_info, responses \\ [])
4869

4970
defp dispatch_command_array([current | rest], connection_info, responses) do
@@ -52,9 +73,8 @@ defmodule Srh.Http.CommandHandler do
5273
{:ok, result_map} ->
5374
[result_map | responses]
5475

55-
{:malformed_data, result_json} ->
56-
# TODO: change up the chain to json this at the last moment, so this isn't here
57-
[Jason.decode!(result_json) | responses]
76+
{:redis_error, result} ->
77+
[result | responses]
5878
end
5979

6080
dispatch_command_array(rest, connection_info, updated_responses)
@@ -65,6 +85,61 @@ defmodule Srh.Http.CommandHandler do
6585
{:ok, Enum.reverse(responses)}
6686
end
6787

88+
defp dispatch_command_transaction_array(
89+
command_array,
90+
%{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info,
91+
responses \\ []
92+
) do
93+
case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do
94+
{:ok, client_pid} ->
95+
# Borrow a client, then run all of the commands (wrapped in MULTI and EXEC)
96+
worker_pid = Client.borrow_worker(client_pid)
97+
98+
wrapped_command_array = [["MULTI"] | command_array]
99+
do_dispatch_command_transaction_array(wrapped_command_array, worker_pid, responses)
100+
101+
# Now manually run the EXEC - this is what contains the information to form the response, not the above
102+
result = case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
103+
{:ok, res} ->
104+
{
105+
:ok,
106+
res
107+
|> Enum.map(&(%{result: &1}))
108+
}
109+
# TODO: Can there be any inline errors here? Wouldn't they fail the whole tx?
110+
111+
{:error, error} ->
112+
{:redis_error, %{error: error.message}}
113+
end
114+
115+
Client.return_worker(client_pid, worker_pid)
116+
117+
result
118+
{:error, msg} ->
119+
{:server_error, msg}
120+
end
121+
end
122+
123+
defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses) when is_pid(worker_pid) do
124+
updated_responses = case ClientWorker.redis_command(worker_pid, current) do
125+
{:ok, res} ->
126+
[%{result: res} | responses]
127+
128+
{:error, error} ->
129+
[
130+
%{
131+
error: error.message
132+
} | responses
133+
]
134+
end
135+
136+
do_dispatch_command_transaction_array(rest, worker_pid, updated_responses)
137+
end
138+
139+
defp do_dispatch_command_transaction_array([], worker_pid, responses) when is_pid(worker_pid) do
140+
{:ok, Enum.reverse(responses)}
141+
end
142+
68143
defp dispatch_command(
69144
command_array,
70145
%{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info
@@ -80,10 +155,10 @@ defmodule Srh.Http.CommandHandler do
80155

81156
{:error, error} ->
82157
{
83-
:malformed_data,
84-
Jason.encode!(%{
158+
:redis_error,
159+
%{
85160
error: error.message
86-
})
161+
}
87162
}
88163
end
89164

lib/srh/redis/client.ex

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,38 @@ defmodule Srh.Redis.Client do
2727
GenServer.call(client, {:find_worker})
2828
end
2929

30+
def borrow_worker(client) do
31+
GenServer.call(client, {:borrow_worker})
32+
end
33+
34+
def return_worker(client, pid) do
35+
GenServer.cast(client, {:return_worker, pid})
36+
end
37+
3038
def handle_call({:find_worker}, _from, %{registry_pid: registry_pid} = state)
3139
when is_pid(registry_pid) do
3240
{:ok, worker} = ClientRegistry.find_worker(registry_pid)
3341
Process.send(self(), :reset_idle_death, [])
3442
{:reply, worker, state}
3543
end
3644

45+
def handle_call({:borrow_worker}, _from, %{registry_pid: registry_pid} = state)
46+
when is_pid(registry_pid) do
47+
{:ok, worker} = ClientRegistry.borrow_worker(registry_pid)
48+
Process.send(self(), :reset_idle_death, [])
49+
{:reply, worker, state}
50+
end
51+
3752
def handle_call(_msg, _from, state) do
3853
{:reply, :ok, state}
3954
end
4055

56+
def handle_cast({:return_worker, pid}, %{registry_pid: registry_pid} = state)
57+
when is_pid(pid) and is_pid(registry_pid) do
58+
ClientRegistry.return_worker(registry_pid, pid)
59+
{:noreply, state}
60+
end
61+
4162
def handle_cast(_msg, state) do
4263
{:noreply, state}
4364
end

lib/srh/redis/client_registry.ex

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ defmodule Srh.Redis.ClientRegistry do
1010
:ok,
1111
%{
1212
worker_pids: [],
13-
last_worker_index: 0
13+
last_worker_index: 0,
14+
currently_borrowed_pids: []
1415
}
1516
}
1617
end
@@ -19,6 +20,14 @@ defmodule Srh.Redis.ClientRegistry do
1920
GenServer.call(registry, {:find_worker})
2021
end
2122

23+
def borrow_worker(registry) do
24+
GenServer.call(registry, {:borrow_worker})
25+
end
26+
27+
def return_worker(registry, pid) do
28+
GenServer.cast(registry, {:return_worker, pid})
29+
end
30+
2231
def add_worker(registry, pid) do
2332
GenServer.cast(registry, {:add_worker, pid})
2433
end
@@ -27,25 +36,31 @@ defmodule Srh.Redis.ClientRegistry do
2736
GenServer.cast(registry, {:destroy_workers})
2837
end
2938

30-
def handle_call({:find_worker}, _from, state) do
31-
case length(state.worker_pids) do
32-
0 ->
33-
{:reply, {:error, :none_available}, state}
34-
35-
len ->
36-
target = state.last_worker_index + 1
37-
38-
corrected_target =
39-
case target >= len do
40-
true -> 0
41-
false -> target
42-
end
43-
44-
{:reply, {:ok, Enum.at(state.worker_pids, corrected_target)},
45-
%{state | last_worker_index: corrected_target}}
39+
def handle_call({:borrow_worker}, _from, state) do
40+
case do_find_worker(state) do
41+
{{:error, msg}, state_update} ->
42+
{:reply, {:error, msg}, state_update}
43+
44+
{{:ok, pid}, state_update} ->
45+
# We want to put this pid into the borrowed pids state list
46+
{
47+
:reply,
48+
{:ok, pid},
49+
%{
50+
state_update
51+
| currently_borrowed_pids:
52+
[pid | state_update.currently_borrowed_pids]
53+
|> Enum.uniq()
54+
}
55+
}
4656
end
4757
end
4858

59+
def handle_call({:find_worker}, _from, state) do
60+
{res, state_update} = do_find_worker(state)
61+
{:reply, res, state_update}
62+
end
63+
4964
def handle_call(_msg, _from, state) do
5065
{:reply, :ok, state}
5166
end
@@ -72,6 +87,12 @@ defmodule Srh.Redis.ClientRegistry do
7287
{:noreply, %{state | worker_pids: [], last_worker_index: 0}}
7388
end
7489

90+
def handle_cast({:return_worker, pid}, state) do
91+
# Remove it from the borrowed array
92+
{:noreply,
93+
%{state | currently_borrowed_pids: List.delete(state.currently_borrowed_pids, pid)}}
94+
end
95+
7596
def handle_cast(_msg, state) do
7697
{:noreply, state}
7798
end
@@ -83,4 +104,29 @@ defmodule Srh.Redis.ClientRegistry do
83104
def handle_info(_msg, state) do
84105
{:noreply, state}
85106
end
107+
108+
defp do_find_worker(state) do
109+
filtered_pids =
110+
state.worker_pids
111+
|> Enum.filter(&(!Enum.member?(state.currently_borrowed_pids, &1)))
112+
113+
case length(filtered_pids) do
114+
0 ->
115+
{{:error, :none_available}, state}
116+
117+
len ->
118+
target = state.last_worker_index + 1
119+
120+
corrected_target =
121+
case target >= len do
122+
true -> 0
123+
false -> target
124+
end
125+
126+
{
127+
{:ok, Enum.at(state.worker_pids, corrected_target)},
128+
%{state | last_worker_index: corrected_target}
129+
}
130+
end
131+
end
86132
end

0 commit comments

Comments
 (0)