Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions scheduler/helpers/timeouts.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,11 @@ def cancel_death_penalty(self):
return
self._timer.cancel()
self._timer = None


def get_default_death_penalty_class() -> type[BaseDeathPenalty]:
"""Returns the default death penalty class based on the platform."""
if hasattr(signal, "SIGALRM"):
return UnixSignalDeathPenalty
else:
return TimerDeathPenalty
4 changes: 2 additions & 2 deletions scheduler/types/settings_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from enum import Enum
from typing import Callable, Dict, Optional, List, Tuple, Any, Type, ClassVar, Set

from scheduler.helpers.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty
from scheduler.helpers.timeouts import BaseDeathPenalty, get_default_death_penalty_class

if sys.version_info >= (3, 11):
from typing import Self
Expand Down Expand Up @@ -40,7 +40,7 @@ class SchedulerConfiguration:
DEFAULT_MAINTENANCE_TASK_INTERVAL: int = 10 * 60 # The interval to run maintenance tasks in seconds. 10 minutes.
DEFAULT_JOB_MONITORING_INTERVAL: int = 30 # The interval to monitor jobs in seconds.
SCHEDULER_FALLBACK_PERIOD_SECS: int = 120 # Period (secs) to wait before requiring to reacquire locks
DEATH_PENALTY_CLASS: Type[BaseDeathPenalty] = UnixSignalDeathPenalty
DEATH_PENALTY_CLASS: Type[BaseDeathPenalty] = get_default_death_penalty_class()


@dataclass(slots=True, frozen=True, kw_only=True)
Expand Down
4 changes: 2 additions & 2 deletions scheduler/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ def execute_job(self, job: JobModel, queue: Queue) -> None:
The worker will wait for the job execution process and make sure it executes within the given timeout bounds, or
will end the job execution process with SIGALRM.
"""
if self.fork_job_execution:
if hasattr(os, "fork") and self.fork_job_execution:
self._model.set_field("state", WorkerStatus.BUSY, connection=self.connection)
self.fork_job_execution_process(job, queue)
self.monitor_job_execution_process(job, queue)
Expand Down Expand Up @@ -839,7 +839,7 @@ def _ensure_list(obj: Any) -> List[Any]:


def _calc_worker_name(existing_worker_names: Collection[str]) -> str:
hostname = os.uname()[1]
hostname = socket.gethostname()
c = 1
worker_name = f"{hostname}-worker.{c}"
while worker_name in existing_worker_names:
Expand Down