refactor(registry): split register into create-only and replace paths#836
refactor(registry): split register into create-only and replace paths#836CatherineSue wants to merge 1 commit intomainfrom
Conversation
Replace the upsert-based register() with three distinct methods: - register(): create-only, returns None if URL already exists - replace(): overwrite-then-diff for updating existing workers - register_or_replace(): idempotent upsert for internal callers The replace() method updates the worker object in-place and diffs the model index (remove stale models, add new ones) instead of the remove-then-add pattern from PR #756. This eliminates the transient gap where a worker is missing from all indexes and removes the need for per-URL registration locks. - Remove url_registration_locks field and Mutex import - Update RegisterWorkersStep to use register_or_replace() - Update UpdateWorkerPropertiesStep to use register_or_replace() - Update all test call sites - Add tests: register rejects duplicate URL, replace updates model index, register_or_replace upsert semantics Signed-off-by: Chang Su <chang.s.su@oracle.com>
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly improves the robustness and clarity of worker management within the registry. By separating worker creation from updates and providing an idempotent upsert mechanism, it addresses previous issues related to transient worker unavailability and simplifies concurrency. This foundational change paves the way for more reliable and predictable worker lifecycle management, with future API changes planned to leverage these new internal methods. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
📝 WalkthroughWalkthroughThe PR refactors worker registration semantics in WorkerRegistry, changing Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Code Review
The pull request refactors the WorkerRegistry by splitting the register method into three distinct functions: a register (create-only) method that rejects duplicate URLs, a replace method for updating existing workers by ID, and a new register_or_replace upsert method. This change removes the need for url_registration_locks and its associated Mutex for serializing registrations. All internal and test usages of worker registration have been updated to use the new register_or_replace or to handle the Option<WorkerId> return type of the new register function, with new tests added to validate the updated behavior.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ace68e95d7
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| if let Some(existing_id) = self.url_to_id.get(worker.url()).map(|e| e.clone()) { | ||
| self.replace(&existing_id, worker); | ||
| existing_id |
There was a problem hiding this comment.
Create the worker when a reserved URL has no entry
WorkerService::create_worker() pre-reserves an ID with reserve_id_for_url() before the async AddWorker workflow starts (model_gateway/src/core/worker_service.rs:235-247), and RegisterWorkersStep now routes that workflow through register_or_replace() (model_gateway/src/core/steps/worker/shared/register.rs:43-47). In that case url_to_id already contains the URL but workers does not, so this branch calls replace() on a missing entry; replace() returns false (model_gateway/src/core/worker_registry.rs:383-387), but the result is ignored and the job appears to succeed without ever inserting the worker into workers, model_index, or routing state. The observable effect is that POST /workers can return a Location that never becomes routable and may stay pending/404 after the job completes.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/core/worker_registry.rs`:
- Around line 395-400: The replacement logic in replace() currently updates
url_to_id when old_worker.url() != new_worker.url() but does not remove the old
URL from model_index or hash_rings and can clobber an existing mapping; either
forbid URL changes in replace() or perform an atomic rename: detect URL mismatch
in replace(), check for a conflicting entry for new_worker.url() in url_to_id
and reject the replace if present, and if allowed, remove old_worker.url() from
url_to_id, model_index and hash_rings before inserting new_worker.url(),
ensuring all indexes are updated consistently (references: replace(), url_to_id,
model_index, hash_rings, old_worker.url(), new_worker.url()).
- Around line 384-393: The replace logic takes a snapshot of old_worker and then
updates multiple indexes and self.workers non-atomically, allowing concurrent
replace() calls for the same worker_id to race; fix by serializing replacements
for the same worker_id so the snapshot-to-index updates are performed under a
single lock/guard: acquire a per-worker (keyed by worker_id) mutex or otherwise
obtain an exclusive guard for the given worker_id at the start of replace(),
then compute old_models via Self::worker_model_ids(&old_worker), compute
new_models, update all related indexes (the model/type/connection maps) and
finally write self.workers.insert(worker_id.clone(), new_worker.clone()) while
still holding the guard, and release the guard at the end so concurrent
replace() calls for the same worker_id cannot interleave and leak stale entries.
- Around line 325-335: The current register() uses contains_key() then insert()
which is a TOCTOU race; change it to perform an atomic insert-check using the
map's entry API (e.g., url_to_id.entry(...)), so you only create/insert a new
WorkerId when the Entry is Vacant and return None immediately if
Entry::Occupied; use the Entry::Vacant(e).insert(worker_id.clone()) path to
store the mapping and avoid the contains_key()+insert() race between concurrent
register() calls.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: c1ebd7f6-7acb-41b1-b913-72709b6f3fda
📒 Files selected for processing (5)
model_gateway/src/core/steps/worker/local/update_worker_properties.rsmodel_gateway/src/core/steps/worker/shared/register.rsmodel_gateway/src/core/worker_registry.rsmodel_gateway/src/routers/http/pd_router.rsmodel_gateway/src/routers/http/router.rs
| pub fn register(&self, worker: Arc<dyn Worker>) -> Option<WorkerId> { | ||
| // Reject if URL already exists | ||
| if self.url_to_id.contains_key(worker.url()) { | ||
| return None; | ||
| } | ||
|
|
||
| if let Some(mut type_workers) = self.type_workers.get_mut(old_worker.worker_type()) { | ||
| type_workers.retain(|id| id != &worker_id); | ||
| } | ||
| let worker_id = WorkerId::new(); | ||
|
|
||
| if let Some(mut conn_workers) = self | ||
| .connection_workers | ||
| .get_mut(old_worker.connection_mode()) | ||
| { | ||
| conn_workers.retain(|id| id != &worker_id); | ||
| } | ||
| } | ||
| // Store URL → ID mapping | ||
| self.url_to_id | ||
| .insert(worker.url().to_string(), worker_id.clone()); |
There was a problem hiding this comment.
Make duplicate-URL registration atomic.
contains_key() followed by insert() is a TOCTOU race. Two concurrent register() calls can both pass Line 327, mint different WorkerIds, and leave duplicate workers/index entries behind even though the URL is supposed to be unique.
Possible fix
- // Reject if URL already exists
- if self.url_to_id.contains_key(worker.url()) {
- return None;
- }
-
let worker_id = WorkerId::new();
-
- // Store URL → ID mapping
- self.url_to_id
- .insert(worker.url().to_string(), worker_id.clone());
+ match self.url_to_id.entry(worker.url().to_string()) {
+ dashmap::mapref::entry::Entry::Occupied(_) => return None,
+ dashmap::mapref::entry::Entry::Vacant(entry) => {
+ entry.insert(worker_id.clone());
+ }
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub fn register(&self, worker: Arc<dyn Worker>) -> Option<WorkerId> { | |
| // Reject if URL already exists | |
| if self.url_to_id.contains_key(worker.url()) { | |
| return None; | |
| } | |
| if let Some(mut type_workers) = self.type_workers.get_mut(old_worker.worker_type()) { | |
| type_workers.retain(|id| id != &worker_id); | |
| } | |
| let worker_id = WorkerId::new(); | |
| if let Some(mut conn_workers) = self | |
| .connection_workers | |
| .get_mut(old_worker.connection_mode()) | |
| { | |
| conn_workers.retain(|id| id != &worker_id); | |
| } | |
| } | |
| // Store URL → ID mapping | |
| self.url_to_id | |
| .insert(worker.url().to_string(), worker_id.clone()); | |
| pub fn register(&self, worker: Arc<dyn Worker>) -> Option<WorkerId> { | |
| let worker_id = WorkerId::new(); | |
| match self.url_to_id.entry(worker.url().to_string()) { | |
| dashmap::mapref::entry::Entry::Occupied(_) => return None, | |
| dashmap::mapref::entry::Entry::Vacant(entry) => { | |
| entry.insert(worker_id.clone()); | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@model_gateway/src/core/worker_registry.rs` around lines 325 - 335, The
current register() uses contains_key() then insert() which is a TOCTOU race;
change it to perform an atomic insert-check using the map's entry API (e.g.,
url_to_id.entry(...)), so you only create/insert a new WorkerId when the Entry
is Vacant and return None immediately if Entry::Occupied; use the
Entry::Vacant(e).insert(worker_id.clone()) path to store the mapping and avoid
the contains_key()+insert() race between concurrent register() calls.
| let old_worker = match self.workers.get(worker_id) { | ||
| Some(entry) => entry.clone(), | ||
| None => return false, | ||
| }; | ||
|
|
||
| let old_models: HashSet<String> = Self::worker_model_ids(&old_worker).into_iter().collect(); | ||
| let new_models: HashSet<String> = Self::worker_model_ids(&new_worker).into_iter().collect(); | ||
|
|
||
| // Overwrite worker object atomically | ||
| self.workers.insert(worker_id.clone(), new_worker.clone()); |
There was a problem hiding this comment.
Serialize same-worker replacements across the full diff.
replace() snapshots old_worker before Line 393 and then mutates three separate indexes from that snapshot. Without any same-URL/ID serialization, two concurrent replaces can both diff from the same old state and leak stale model/type/connection entries. A simple M1 -> M2 race with M1 -> M3 leaves both M2 and M3 indexed while workers only contains the last write.
Also applies to: 402-439
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@model_gateway/src/core/worker_registry.rs` around lines 384 - 393, The
replace logic takes a snapshot of old_worker and then updates multiple indexes
and self.workers non-atomically, allowing concurrent replace() calls for the
same worker_id to race; fix by serializing replacements for the same worker_id
so the snapshot-to-index updates are performed under a single lock/guard:
acquire a per-worker (keyed by worker_id) mutex or otherwise obtain an exclusive
guard for the given worker_id at the start of replace(), then compute old_models
via Self::worker_model_ids(&old_worker), compute new_models, update all related
indexes (the model/type/connection maps) and finally write
self.workers.insert(worker_id.clone(), new_worker.clone()) while still holding
the guard, and release the guard at the end so concurrent replace() calls for
the same worker_id cannot interleave and leak stale entries.
| // Update URL mapping if URL changed (unlikely but defensive) | ||
| if old_worker.url() != new_worker.url() { | ||
| self.url_to_id.remove(old_worker.url()); | ||
| self.url_to_id | ||
| .insert(new_worker.url().to_string(), worker_id.clone()); | ||
| } |
There was a problem hiding this comment.
Reject URL changes here until rename is handled atomically.
When old_worker.url() != new_worker.url(), the kept-model branch only de-dupes on new_worker.url(), so the old URL stays in model_index/hash_rings. Line 398 can also overwrite another worker’s url_to_id entry if the new URL is already taken. Either forbid URL changes in replace() for now or fully remove old_worker.url() from every old index before inserting the new URL.
Also applies to: 410-413
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@model_gateway/src/core/worker_registry.rs` around lines 395 - 400, The
replacement logic in replace() currently updates url_to_id when old_worker.url()
!= new_worker.url() but does not remove the old URL from model_index or
hash_rings and can clobber an existing mapping; either forbid URL changes in
replace() or perform an atomic rename: detect URL mismatch in replace(), check
for a conflicting entry for new_worker.url() in url_to_id and reject the replace
if present, and if allowed, remove old_worker.url() from url_to_id, model_index
and hash_rings before inserting new_worker.url(), ensuring all indexes are
updated consistently (references: replace(), url_to_id, model_index, hash_rings,
old_worker.url(), new_worker.url()).
| pub fn register_or_replace(&self, worker: Arc<dyn Worker>) -> WorkerId { | ||
| if let Some(existing_id) = self.url_to_id.get(worker.url()).map(|e| e.clone()) { | ||
| self.replace(&existing_id, worker); | ||
| existing_id | ||
| } else { | ||
| match self.register(worker.clone()) { | ||
| Some(id) => id, | ||
| None => { | ||
| // Race: URL was registered between our check and register(). | ||
| if let Some(existing_id) = self.url_to_id.get(worker.url()).map(|e| e.clone()) { | ||
| self.replace(&existing_id, worker); | ||
| existing_id | ||
| } else { | ||
| // Should never happen — register returned None means URL exists | ||
| tracing::error!( | ||
| "register_or_replace: unexpected state for URL {}", | ||
| worker.url() | ||
| ); | ||
| WorkerId::new() | ||
| } |
There was a problem hiding this comment.
Don’t return a WorkerId unless the upsert actually succeeded.
Both branches ignore the bool from replace(), so an in-flight registration or concurrent removal can make register_or_replace() return an ID while leaving the registry unchanged. The Line 481 fallback is worse: it fabricates a brand-new WorkerId that is not stored anywhere.
Description
Problem
WorkerRegistry::register()silently upserts when the same URL is registered twice (PR #756). This uses a remove-then-add pattern that:Solution
Split
register()into three distinct methods with clear semantics:register()Noneif URL exists.POST /workers(next PR)replace()PUT /workers/{id}(next PR)register_or_replace()The
replace()method eliminates the transient gap by overwriting the worker object first, then diffing old vs new model lists to update the model index. No registration lock needed.This PR refactors the registry internals only — REST API changes (409 on duplicate POST, PUT full replace, PATCH partial update) will follow in a subsequent PR. See plan at
.claude/docs/plans/2026-03-20-worker-api-rest-fix.md.Changes
replace()method with overwrite-then-diff logic toWorkerRegistryregister()create-only (returnsOption<WorkerId>)register_or_replace()for internal idempotent upserturl_registration_locksfield andMuteximportRegisterWorkersStepto useregister_or_replace()UpdateWorkerPropertiesStepto useregister_or_replace()Test Plan
cargo test -p smg --lib core::worker_registry— all 8 tests pass (including 3 new)cargo test -p smg --lib— all 437 tests pass, 0 failuresChecklist
cargo +nightly fmtpassescargo clippy --all-targets --all-features -- -D warningspassesSummary by CodeRabbit
Summary by CodeRabbit