Skip to content

Commit e0e703e

Browse files
Add 12 minute timeout to overall kafka process
1 parent ebde423 commit e0e703e

File tree

1 file changed

+33
-3
lines changed

1 file changed

+33
-3
lines changed

src/launchpad/kafka.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,13 @@ def maybe_create_pool(self) -> None:
5656

5757

5858
class LaunchpadRunTaskWithMultiprocessing(RunTaskWithMultiprocessing[TStrategyPayload, Any]):
59-
"""Tolerates child process exits from maxtasksperchild=1 by ignoring SIGCHLD."""
59+
"""
60+
Tolerates child process exits from maxtasksperchild=1 by ignoring SIGCHLD.
61+
62+
This class extends RunTaskWithMultiprocessing with:
63+
- maxtasksperchild=1 to ensure clean worker state (via LaunchpadMultiProcessingPool)
64+
- Custom SIGCHLD handler that doesn't raise errors on expected worker exits
65+
"""
6066

6167
def __init__(
6268
self,
@@ -76,15 +82,39 @@ def __init__(
7682
)
7783

7884

85+
def timeout_handler(signum: int, frame: Any) -> None:
86+
"""Signal handler for SIGALRM to enforce task timeout and exit the process immediately."""
87+
os._exit(1)
88+
89+
7990
def process_kafka_message_with_service(msg: Message[KafkaPayload]) -> Any:
80-
"""Process a Kafka message using the actual service logic in a worker process."""
91+
"""
92+
Process a Kafka message using the actual service logic in a worker process.
93+
94+
This function implements a 12-minute timeout mechanism using SIGALRM (Unix only).
95+
If a task exceeds 12 minutes, it will be terminated via TaskTimeoutError.
96+
The worker process will then exit (due to maxtasksperchild=1), and a new
97+
process will be spawned for the next task.
98+
"""
99+
start_time = time.time()
100+
101+
if hasattr(signal, "SIGALRM"):
102+
signal.signal(signal.SIGALRM, timeout_handler)
103+
signal.alarm(720) # 12 minute total process timeout
104+
81105
try:
82106
decoded = PREPROD_ARTIFACT_SCHEMA.decode(msg.payload.value)
107+
logger.info(f"Starting to process artifact: {decoded.get('artifact_id', 'unknown')}")
83108
ArtifactProcessor.process_message(decoded)
84109
return decoded # type: ignore[no-any-return]
85110
except Exception as e:
86-
logger.error(f"Failed to process message in worker: {e}", exc_info=True)
111+
elapsed_time = time.time() - start_time
112+
logger.error(f"Failed to process message in worker after {elapsed_time:.2f} seconds: {e}", exc_info=True)
87113
raise
114+
finally:
115+
# Cancel the timeout alarm as it's an expected exit
116+
if hasattr(signal, "SIGALRM"):
117+
signal.alarm(0)
88118

89119

90120
def create_kafka_consumer() -> LaunchpadKafkaConsumer:

0 commit comments

Comments
 (0)