Skip to content

Commit db0d3c9

Browse files
authored
Do not terminate fleet instances on idle_duration at nodes.min (#3235)
* Do not terminate fleet instances on idle_duration at nodes.min * Update idle_duration reference * Skip nodes.min check for autocreated fleets
1 parent 8951afb commit db0d3c9

File tree

3 files changed

+43
-4
lines changed

3 files changed

+43
-4
lines changed

src/dstack/_internal/core/models/fleets.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,12 @@ class InstanceGroupParams(CoreModel):
309309
idle_duration: Annotated[
310310
Optional[int],
311311
Field(
312-
description="Time to wait before terminating idle instances. Defaults to `5m` for runs and `3d` for fleets. Use `off` for unlimited duration"
312+
description=(
313+
"Time to wait before terminating idle instances."
314+
" Instances are not terminated if the fleet is already at `nodes.min`."
315+
" Defaults to `5m` for runs and `3d` for fleets."
316+
" Use `off` for unlimited duration"
317+
)
313318
),
314319
] = None
315320

src/dstack/_internal/core/models/profiles.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,9 @@ class ProfileParams(CoreModel):
341341
Field(
342342
description=(
343343
"Time to wait before terminating idle instances."
344-
" Defaults to `5m` for runs and `3d` for fleets. Use `off` for unlimited duration"
344+
" Instances are not terminated if the fleet is already at `nodes.min`."
345+
" Defaults to `5m` for runs and `3d` for fleets."
346+
" Use `off` for unlimited duration"
345347
)
346348
),
347349
] = None

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,12 @@ async def _process_next_instance():
196196

197197

198198
async def _process_instance(session: AsyncSession, instance: InstanceModel):
199+
# Refetch to load related attributes.
200+
# Load related attributes only for statuses that always need them.
199201
if instance.status in (
200202
InstanceStatus.PENDING,
201203
InstanceStatus.TERMINATING,
202204
):
203-
# Refetch to load related attributes.
204-
# Load related attributes only for statuses that always need them.
205205
res = await session.execute(
206206
select(InstanceModel)
207207
.where(InstanceModel.id == instance.id)
@@ -211,6 +211,16 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel):
211211
.execution_options(populate_existing=True)
212212
)
213213
instance = res.unique().scalar_one()
214+
elif instance.status == InstanceStatus.IDLE:
215+
res = await session.execute(
216+
select(InstanceModel)
217+
.where(InstanceModel.id == instance.id)
218+
.options(joinedload(InstanceModel.project))
219+
.options(joinedload(InstanceModel.jobs).load_only(JobModel.id, JobModel.status))
220+
.options(joinedload(InstanceModel.fleet).joinedload(FleetModel.instances))
221+
.execution_options(populate_existing=True)
222+
)
223+
instance = res.unique().scalar_one()
214224

215225
if instance.status == InstanceStatus.PENDING:
216226
if instance.remote_connection_info is not None:
@@ -242,6 +252,14 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
242252
and not instance.jobs
243253
):
244254
return False
255+
if instance.fleet is not None and not _can_terminate_fleet_instances_on_idle_duration(
256+
instance.fleet
257+
):
258+
logger.debug(
259+
"Skipping instance %s termination on idle duration. Fleet is already at `nodes.min`.",
260+
instance.name,
261+
)
262+
return False
245263
idle_duration = _get_instance_idle_duration(instance)
246264
idle_seconds = instance.termination_idle_time
247265
delta = datetime.timedelta(seconds=idle_seconds)
@@ -261,6 +279,20 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
261279
return False
262280

263281

282+
def _can_terminate_fleet_instances_on_idle_duration(fleet_model: FleetModel) -> bool:
    """
    Tell whether instances of the given fleet may be terminated on idle duration.

    Returns `True` when the fleet spec has no `nodes` configured or the fleet
    is autocreated. Otherwise returns `True` only while the number of active
    instances is strictly greater than `nodes.min`.
    """
    # Terminating an idle instance when the fleet is already at `nodes.min`
    # would only lead to a terminate-create loop, so it is skipped.
    # The fleet lock is not taken here, so race conditions are possible.
    # That's acceptable: in the worst case the fleet goes below `nodes.min`,
    # and the fleet consolidation logic will provision new nodes.
    spec = fleet_model_to_fleet(fleet_model).spec
    nodes = spec.configuration.nodes
    if nodes is None:
        return True
    if spec.autocreated:
        return True
    active_count = sum(1 for inst in fleet_model.instances if inst.status.is_active())
    return active_count > nodes.min
294+
295+
264296
async def _add_remote(instance: InstanceModel) -> None:
265297
logger.info("Adding ssh instance %s...", instance.name)
266298
if instance.status == InstanceStatus.PENDING:

0 commit comments

Comments
 (0)