Skip to content

Commit 0ce7750

Browse files
committed
Implements async member sync with global progress
1 parent 06ca5de commit 0ce7750

File tree

6 files changed

+307
-49
lines changed

6 files changed

+307
-49
lines changed

lib/valkyrie/application.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ defmodule Valkyrie.Application do
1414
repos: Application.fetch_env!(:valkyrie, :ecto_repos), skip: skip_migrations?()},
1515
{DNSCluster, query: Application.get_env(:valkyrie, :dns_cluster_query) || :ignore},
1616
{Phoenix.PubSub, name: Valkyrie.PubSub},
17+
Valkyrie.Members.SyncState,
1718
# Start a worker by calling: Valkyrie.Worker.start_link(arg)
1819
# {Valkyrie.Worker, arg},
1920
# Start to serve requests, typically the last entry

lib/valkyrie/authentik.ex

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ defmodule Valkyrie.Authentik do
1111
]
1212
end
1313

14+
@page_size 100
15+
1416
defp bearer_token do
1517
Application.fetch_env!(:valkyrie, :authentik_token)
1618
end
@@ -19,25 +21,44 @@ defmodule Valkyrie.Authentik do
1921
Application.fetch_env!(:valkyrie, :authentik_url)
2022
end
2123

22-
defp do_get_all_users(page \\ 1, acc \\ []) do
24+
defp do_get_all_users(page, acc, progress_callback) do
2325
Logger.debug("Fetching users from page #{page}")
2426

2527
url = "#{base_url()}/api/v3/core/users/"
2628

2729
case Req.get(url,
2830
auth: {:bearer, bearer_token()},
29-
params: [page: page, page_size: 20, is_active: true],
31+
params: [page: page, page_size: @page_size, is_active: true],
3032
receive_timeout: 20_000
3133
) do
3234
{:ok, %Req.Response{status: 200, body: %{"results" => results, "pagination" => pagination}}} ->
33-
Logger.debug("Found #{length(results)} users on page #{page}")
34-
3535
new_acc = Enum.concat(acc, results)
3636

37+
# Calculate total pages if available in pagination
38+
total_pages =
39+
case pagination do
40+
%{"count" => count} when is_integer(count) ->
41+
# Calculate total pages from count and page_size
42+
ceil(count / @page_size)
43+
44+
_ ->
45+
nil
46+
end
47+
48+
# Call progress callback if provided
49+
if not is_nil(progress_callback) do
50+
progress_callback.(%{
51+
page: page,
52+
total_pages: total_pages,
53+
users_fetched: length(new_acc),
54+
status: :in_progress
55+
})
56+
end
57+
3758
case pagination["next"] do
3859
next_page when next_page > 0 and is_integer(next_page) ->
3960
# Fetch next page recursively
40-
do_get_all_users(next_page, new_acc)
61+
do_get_all_users(next_page, new_acc, progress_callback)
4162

4263
_ ->
4364
# No more pages
@@ -52,8 +73,8 @@ defmodule Valkyrie.Authentik do
5273
end
5374
end
5475

55-
def get_all_users do
56-
case do_get_all_users() do
76+
def get_all_users(progress_callback \\ nil) do
77+
case do_get_all_users(1, [], progress_callback) do
5778
{:ok, users} ->
5879
{:ok,
5980
users

lib/valkyrie/members.ex

Lines changed: 113 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ defmodule Valkyrie.Members do
22
use Ash.Domain, otp_app: :valkyrie, extensions: [AshAdmin.Domain, AshPaperTrail.Domain]
33

44
require Logger
5+
require Ash.Query
56

67
alias Valkyrie.Authentik
78
alias Valkyrie.Members.Member
9+
alias Valkyrie.Members.SyncState
810

911
@member_group_uuid "005b0c1c-c1bf-4a57-99b2-66cf04f94cda"
12+
@sync_progress_topic "sync_members:progress"
1013

1114
admin do
1215
show? true
@@ -40,32 +43,128 @@ defmodule Valkyrie.Members do
4043
end
4144

4245
def update_members_from_xhain_account_system do
43-
case Authentik.get_all_users() do
44-
{:ok, users} ->
45-
{:ok,
46-
users
47-
|> Enum.filter(&has_required_attributes?/1)
48-
|> Enum.map(&xhain_account_to_member_info/1)
49-
|> Enum.each(fn member ->
50-
Ash.create!(Member, member, action: :create)
51-
end)}
52-
53-
{:error, reason} ->
54-
{:error, reason}
46+
perform_sync()
47+
end
48+
49+
def update_members_from_xhain_account_system_async() do
50+
case SyncState.start_sync() do
51+
:ok ->
52+
task = Task.async(fn -> perform_sync() end)
53+
{:ok, task}
54+
55+
{:error, :already_syncing} ->
56+
{:error, :already_syncing}
5557
end
5658
end
5759

60+
defp perform_sync() do
61+
try do
62+
progress_callback = create_progress_callback()
63+
64+
case Authentik.get_all_users(progress_callback) do
65+
{:ok, users} ->
66+
valid_users = process_valid_users(users)
67+
broadcast_progress(:processing, length(valid_users))
68+
remove_obsolete_members(valid_users)
69+
create_members(valid_users)
70+
broadcast_progress(:completed, length(valid_users))
71+
72+
{:error, reason} ->
73+
handle_sync_error(reason)
74+
end
75+
rescue
76+
e ->
77+
handle_sync_error(Exception.message(e))
78+
raise e
79+
after
80+
SyncState.finish_sync()
81+
end
82+
end
83+
84+
defp create_progress_callback() do
85+
fn progress ->
86+
Phoenix.PubSub.broadcast(
87+
Valkyrie.PubSub,
88+
@sync_progress_topic,
89+
{:sync_progress, progress}
90+
)
91+
end
92+
end
93+
94+
defp process_valid_users(users) do
95+
users
96+
|> Enum.filter(&has_required_attributes?/1)
97+
|> Enum.map(&xhain_account_to_member_info/1)
98+
end
99+
100+
defp remove_obsolete_members(valid_users) do
101+
Member
102+
|> Ash.Query.filter(is_manual_entry: false)
103+
|> Ash.read!()
104+
|> Enum.filter(fn member -> not member_exists_in_list?(member, valid_users) end)
105+
|> Enum.each(fn member ->
106+
Logger.info("Removing member #{inspect(member)}")
107+
Ash.destroy!(member, action: :destroy)
108+
end)
109+
end
110+
111+
defp create_members(valid_users) do
112+
Enum.each(valid_users, fn member ->
113+
Ash.create!(Member, member, action: :create)
114+
end)
115+
end
116+
117+
defp broadcast_progress(status, users_count) do
118+
progress_data = %{
119+
page: nil,
120+
total_pages: nil,
121+
users_fetched: users_count,
122+
status: status
123+
}
124+
125+
Phoenix.PubSub.broadcast(
126+
Valkyrie.PubSub,
127+
@sync_progress_topic,
128+
{:sync_progress, progress_data}
129+
)
130+
end
131+
132+
defp handle_sync_error(reason) do
133+
progress_data = %{
134+
page: nil,
135+
total_pages: nil,
136+
users_fetched: 0,
137+
status: :error,
138+
error: reason
139+
}
140+
141+
Phoenix.PubSub.broadcast(
142+
Valkyrie.PubSub,
143+
@sync_progress_topic,
144+
{:sync_progress, progress_data}
145+
)
146+
end
147+
148+
## Helper functions ##
149+
58150
defp xhain_account_to_member_info(%Authentik.XHainAccount{} = xhain_account) do
59151
%{
60152
username: xhain_account.username,
61153
xhain_account_id: xhain_account.xhain_account_id,
62154
ssh_public_key: xhain_account.ssh_public_key,
63155
tree_name: xhain_account.tree_name,
64-
is_active: is_member_in_group(xhain_account.groups, @member_group_uuid),
65-
has_key: get_key_status(xhain_account)
156+
is_active: is_member_in_group(xhain_account.groups, @member_group_uuid)
66157
}
67158
end
68159

160+
defp member_exists_in_list?(member, valid_users) do
161+
Enum.any?(valid_users, fn valid_user -> is_same_user?(member, valid_user) end)
162+
end
163+
164+
defp is_same_user?(user1, user2) do
165+
user1.username == user2.username
166+
end
167+
69168
defp has_required_attributes?(xhain_account) do
70169
has_username = not is_nil(xhain_account.username) and xhain_account.username != ""
71170
has_xhain_account_id = not is_nil(xhain_account.xhain_account_id)
@@ -85,14 +184,4 @@ defmodule Valkyrie.Members do
85184
defp is_member_in_group(groups, group_uuid) do
86185
Enum.any?(groups, fn group -> group == group_uuid end)
87186
end
88-
89-
defp get_key_status(member) do
90-
case get_member_by_username(member.username) do
91-
{:ok, member} ->
92-
member.has_key
93-
94-
_ ->
95-
false
96-
end
97-
end
98187
end

lib/valkyrie/members/sync_state.ex

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
defmodule Valkyrie.Members.SyncState do
2+
@moduledoc """
3+
Agent to track global sync state and prevent concurrent sync operations.
4+
"""
5+
use Agent
6+
7+
@name __MODULE__
8+
9+
def start_link(_opts) do
10+
Agent.start_link(fn -> nil end, name: @name)
11+
end
12+
13+
@doc """
14+
Returns true if a sync is currently running, false otherwise.
15+
"""
16+
def is_syncing? do
17+
Agent.get(@name, fn state -> not is_nil(state) end)
18+
end
19+
20+
@doc """
21+
Attempts to start a sync.
22+
Returns `:ok` if the lock was acquired, or `{:error, :already_syncing}` if a sync is already running.
23+
"""
24+
def start_sync do
25+
Agent.get_and_update(@name, fn
26+
nil ->
27+
state = %{started_at: DateTime.utc_now()}
28+
{:ok, state}
29+
30+
existing_state ->
31+
{{:error, :already_syncing}, existing_state}
32+
end)
33+
end
34+
35+
@doc """
36+
Finishes the current sync, clearing the state.
37+
"""
38+
def finish_sync do
39+
Agent.update(@name, fn _state -> nil end)
40+
end
41+
42+
@doc """
43+
Gets the current sync state (for debugging/monitoring).
44+
"""
45+
def get_state do
46+
Agent.get(@name, fn state -> state end)
47+
end
48+
end

0 commit comments

Comments
 (0)