Skip to content

Commit d61b0fe

Browse files
authored
feat(pb): actor v1 to v2 migration (#4548)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. ## Checklist: - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes
1 parent 2f3c056 commit d61b0fe

16 files changed

Lines changed: 264 additions & 148 deletions

File tree

engine/artifacts/config-schema.json

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/artifacts/openapi.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-peer/src/actors/delete.rs

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ use rivet_util::Id;
1919
#[tracing::instrument(skip_all)]
2020
pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result<DeleteResponse> {
2121
// Subscribe before fetching actor data
22-
let mut destroy_sub = ctx
23-
.subscribe::<pegboard::workflows::actor::DestroyComplete>(("actor_id", path.actor_id))
24-
.await?;
22+
let (mut destroy_sub, mut destroy_sub2) = tokio::try_join!(
23+
ctx.subscribe::<pegboard::workflows::actor::DestroyComplete>(("actor_id", path.actor_id)),
24+
ctx.subscribe::<pegboard::workflows::actor2::DestroyComplete>(("actor_id", path.actor_id)),
25+
)?;
2526

2627
let (actors_res, namespace_res) = tokio::try_join!(
2728
// Get the actor to verify it exists
@@ -52,21 +53,33 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
5253
return Err(pegboard::errors::Actor::NotFound.build());
5354
}
5455

55-
// TODO: Actor v2
56+
// Try actor2 first, then fall back to actor
5657
let res = ctx
57-
.signal(pegboard::workflows::actor::Destroy {})
58-
.to_workflow::<pegboard::workflows::actor::Workflow>()
58+
.signal(pegboard::workflows::actor2::Destroy {})
59+
.to_workflow::<pegboard::workflows::actor2::Workflow>()
5960
.tag("actor_id", path.actor_id)
6061
.graceful_not_found()
6162
.send()
6263
.await?;
6364
if res.is_none() {
64-
tracing::warn!(
65-
actor_id=?path.actor_id,
66-
"actor workflow not found, likely already stopped"
67-
);
65+
let res = ctx
66+
.signal(pegboard::workflows::actor::Destroy {})
67+
.to_workflow::<pegboard::workflows::actor::Workflow>()
68+
.tag("actor_id", path.actor_id)
69+
.graceful_not_found()
70+
.send()
71+
.await?;
72+
73+
if res.is_none() {
74+
tracing::warn!(
75+
actor_id=?path.actor_id,
76+
"actor workflow not found, likely already stopped"
77+
);
78+
} else {
79+
destroy_sub.next().await?;
80+
}
6881
} else {
69-
destroy_sub.next().await?;
82+
destroy_sub2.next().await?;
7083
}
7184

7285
Ok(DeleteResponse {})

engine/packages/config/src/config/pegboard.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ pub struct Pegboard {
2020
///
2121
/// Unit is in milliseconds.
2222
pub actor_stop_threshold: Option<i64>,
23+
/// How long to wait after starting to attempt to reallocate before setting the actor to sleep.
24+
///
25+
/// Unit is in milliseconds.
26+
pub actor_retry_duration_threshold: Option<i64>,
2327
/// How long an actor goes without retries before its retry count is reset to 0, effectively resetting its
2428
/// backoff to 0.
2529
///
@@ -163,6 +167,10 @@ impl Pegboard {
163167
self.actor_stop_threshold.unwrap_or(30_000)
164168
}
165169

170+
pub fn actor_retry_duration_threshold(&self) -> i64 {
171+
self.actor_retry_duration_threshold.unwrap_or(300_000)
172+
}
173+
166174
pub fn retry_reset_duration(&self) -> i64 {
167175
self.retry_reset_duration.unwrap_or(10 * 60 * 1000)
168176
}

engine/packages/guard/src/routing/pegboard_gateway.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ async fn route_request_inner(
189189
stopped_sub,
190190
fail_sub,
191191
destroy_sub,
192+
migrate_sub,
192193
ready_sub2,
193194
stopped_sub2,
194195
fail_sub2,
@@ -198,6 +199,7 @@ async fn route_request_inner(
198199
ctx.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", actor_id)),
199200
ctx.subscribe::<pegboard::workflows::actor::Failed>(("actor_id", actor_id)),
200201
ctx.subscribe::<pegboard::workflows::actor::DestroyStarted>(("actor_id", actor_id)),
202+
ctx.subscribe::<pegboard::workflows::actor::MigratedToV2>(("actor_id", actor_id)),
201203
ctx.subscribe::<pegboard::workflows::actor2::Ready>(("actor_id", actor_id)),
202204
ctx.subscribe::<pegboard::workflows::actor2::Stopped>(("actor_id", actor_id)),
203205
ctx.subscribe::<pegboard::workflows::actor2::Failed>(("actor_id", actor_id)),
@@ -218,6 +220,12 @@ async fn route_request_inner(
218220

219221
match actor.version {
220222
2 => {
223+
drop(ready_sub);
224+
drop(stopped_sub);
225+
drop(fail_sub);
226+
drop(destroy_sub);
227+
drop(migrate_sub);
228+
221229
handle_actor_v2(
222230
ctx,
223231
shared_state,
@@ -242,6 +250,11 @@ async fn route_request_inner(
242250
stopped_sub,
243251
fail_sub,
244252
destroy_sub,
253+
migrate_sub,
254+
ready_sub2,
255+
stopped_sub2,
256+
fail_sub2,
257+
destroy_sub2,
245258
)
246259
.await
247260
}
@@ -359,6 +372,11 @@ async fn handle_actor_v1(
359372
mut stopped_sub: SubscriptionHandle<pegboard::workflows::actor::Stopped>,
360373
mut fail_sub: SubscriptionHandle<pegboard::workflows::actor::Failed>,
361374
mut destroy_sub: SubscriptionHandle<pegboard::workflows::actor::DestroyStarted>,
375+
mut migrate_sub: SubscriptionHandle<pegboard::workflows::actor::MigratedToV2>,
376+
ready_sub2: SubscriptionHandle<pegboard::workflows::actor2::Ready>,
377+
stopped_sub2: SubscriptionHandle<pegboard::workflows::actor2::Stopped>,
378+
fail_sub2: SubscriptionHandle<pegboard::workflows::actor2::Failed>,
379+
destroy_sub2: SubscriptionHandle<pegboard::workflows::actor2::DestroyStarted>,
362380
) -> Result<RoutingOutput> {
363381
// Wake actor if sleeping
364382
if actor.sleeping {
@@ -382,11 +400,9 @@ async fn handle_actor_v1(
382400
let mut wake_retries = 0;
383401

384402
// Create pool error check future
385-
let pool_error_check_fut = check_runner_pool_error_loop(
386-
ctx,
387-
actor.namespace_id,
388-
actor.runner_name_selector.as_deref(),
389-
);
403+
let runner_name_selector = actor.runner_name_selector.clone();
404+
let pool_error_check_fut =
405+
check_runner_pool_error_loop(ctx, actor.namespace_id, runner_name_selector.as_deref());
390406
tokio::pin!(pool_error_check_fut);
391407

392408
// Wait for ready, fail, destroy, or migration to v2
@@ -430,6 +446,20 @@ async fn handle_actor_v1(
430446
res?;
431447
return Err(pegboard::errors::Actor::DestroyedWhileWaitingForReady.build());
432448
}
449+
res = migrate_sub.next() => {
450+
res?;
451+
return handle_actor_v2(
452+
ctx,
453+
shared_state,
454+
actor_id,
455+
actor,
456+
stripped_path,
457+
ready_sub2,
458+
stopped_sub2,
459+
fail_sub2,
460+
destroy_sub2,
461+
).await;
462+
}
433463
res = &mut pool_error_check_fut => {
434464
if res? {
435465
return Err(errors::ActorRunnerFailed { actor_id }.build());

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
235235
input: input.input.clone(),
236236
from_v1: true,
237237
})
238+
.tag("actor_id", input.actor_id)
238239
.dispatch()
239240
.await?;
240241
return Ok(());
@@ -845,15 +846,36 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
845846
.send()
846847
.await?;
847848

848-
ctx.workflow(destroy::Input {
849-
namespace_id: input.namespace_id,
850-
actor_id: input.actor_id,
851-
name: input.name.clone(),
852-
key: input.key.clone(),
853-
generation: lifecycle_res.generation,
854-
})
855-
.output()
856-
.await?;
849+
if lifecycle_res.migrate_to_v2 {
850+
ctx.workflow(crate::workflows::actor2::Input {
851+
actor_id: input.actor_id,
852+
name: input.name.clone(),
853+
pool_name: input.runner_name_selector.clone(),
854+
key: input.key.clone(),
855+
namespace_id: input.namespace_id,
856+
crash_policy: input.crash_policy,
857+
input: input.input.clone(),
858+
from_v1: true,
859+
})
860+
.tag("actor_id", input.actor_id)
861+
.dispatch()
862+
.await?;
863+
864+
ctx.msg(MigratedToV2 {})
865+
.topic(("actor_id", input.actor_id))
866+
.send()
867+
.await?;
868+
} else {
869+
ctx.workflow(destroy::Input {
870+
namespace_id: input.namespace_id,
871+
actor_id: input.actor_id,
872+
name: input.name.clone(),
873+
key: input.key.clone(),
874+
generation: lifecycle_res.generation,
875+
})
876+
.output()
877+
.await?;
878+
}
857879

858880
Ok(())
859881
}
@@ -1382,6 +1404,9 @@ pub struct GoingAway {
13821404
pub reset_rescheduling: bool,
13831405
}
13841406

1407+
#[message("pegboard_actor_migrated_to_v2")]
1408+
pub struct MigratedToV2 {}
1409+
13851410
#[signal("pegboard_actor_destroy")]
13861411
pub struct Destroy {}
13871412

0 commit comments

Comments
 (0)