Skip to content

Commit a3edc3a

Browse files
committed
draft of single message rmq consumer
1 parent 68939f1 commit a3edc3a

File tree

1 file changed

+175
-1
lines changed

1 file changed

+175
-1
lines changed

integrations/rmq.py

Lines changed: 175 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
import pika
44
import config
55
import traceback
6-
from typing import List
6+
import sys
7+
import os
8+
import signal
9+
from typing import List, Optional
710
from concurrent.futures import ThreadPoolExecutor
811
from loguru import logger
912
from common.config_parser import parse_app_properties
@@ -154,6 +157,177 @@ def __del__(self):
154157
self.close()
155158

156159

160+
class SingleMessageConsumer:
161+
"""
162+
One-shot worker aligned with RMQConsumer style:
163+
- Connects using pika.BlockingConnection
164+
- Pulls exactly ONE message via basic_get(auto_ack=False)
165+
- Applies message_converter.convert(...)
166+
- Runs each handler.handle(...)
167+
- Optionally publishes to reply_to
168+
- ACKs on success; REJECTs on failure (requeue=False to match RMQConsumer)
169+
- Closes and exits
170+
171+
Intended for KEDA ScaledJob: one job = one message.
172+
"""
173+
174+
def __init__(self,
175+
host: str = RMQ_SERVER,
176+
port: int = RMQ_PORT,
177+
vhost: str = RMQ_VHOST,
178+
queue: str = RMQ_QUEUE,
179+
username: str = RMQ_USERNAME,
180+
password: str = RMQ_PASSWORD,
181+
reply_to: Optional[str] = None,
182+
message_handlers: Optional[List[object]] = None,
183+
message_converter: Optional[object] = None,
184+
heartbeat: int = 0, # safer for long jobs
185+
socket_timeout: Optional[float] = None,
186+
blocked_connection_timeout: float = 600.0,
187+
connection_attempts: int = 5,
188+
retry_delay: int = 3,
189+
log_body: bool = False,
190+
):
191+
self._host = host
192+
self._port = int(port)
193+
self._vhost = vhost
194+
self._queue = queue
195+
self._username = username
196+
self._password = password
197+
198+
self.reply_to = reply_to
199+
self.message_handlers = message_handlers or []
200+
self.message_converter = message_converter
201+
self.log_body = log_body
202+
203+
self._heartbeat = heartbeat
204+
self._socket_timeout = socket_timeout
205+
self._blocked_connection_timeout = blocked_connection_timeout
206+
self._connection_attempts = connection_attempts
207+
self._retry_delay = retry_delay
208+
209+
self._connection = None
210+
self._channel = None
211+
self._in_shutdown = False
212+
213+
# Graceful shutdown: finish current message, then exit
214+
signal.signal(signal.SIGTERM, self._on_term_signal)
215+
signal.signal(signal.SIGINT, self._on_term_signal)
216+
217+
def connect(self):
218+
params = pika.ConnectionParameters(
219+
host=self._host,
220+
port=self._port,
221+
virtual_host=self._vhost,
222+
credentials=pika.PlainCredentials(self._username, self._password),
223+
heartbeat=self._heartbeat,
224+
blocked_connection_timeout=self._blocked_connection_timeout,
225+
connection_attempts=self._connection_attempts,
226+
retry_delay=self._retry_delay,
227+
socket_timeout=self._socket_timeout,
228+
)
229+
logger.info(f"Connecting to RabbitMQ at {self._host}:{self._port} vhost='{self._vhost}'")
230+
self._connection = pika.BlockingConnection(params)
231+
self._channel = self._connection.channel()
232+
logger.info("Connection established and channel opened")
233+
234+
def close(self):
235+
try:
236+
if self._channel and self._channel.is_open:
237+
self._channel.close()
238+
except Exception as e:
239+
logger.warning(f"Error closing channel: {e}")
240+
try:
241+
if self._connection and self._connection.is_open:
242+
self._connection.close()
243+
except Exception as e:
244+
logger.warning(f"Error closing connection: {e}")
245+
246+
def _on_term_signal(self, signum, _frame):
247+
# Do not abort mid-processing; just mark and finish
248+
self._in_shutdown = True
249+
logger.warning(f"Received signal {signum}; will exit after current message finishes.")
250+
251+
def _basic_get(self):
252+
assert self._channel is not None
253+
return self._channel.basic_get(self._queue, auto_ack=False)
254+
255+
def _ack(self, delivery_tag: int):
256+
assert self._channel is not None
257+
self._channel.basic_ack(delivery_tag)
258+
259+
def _reject_drop(self, delivery_tag: int):
260+
# Mirror RMQConsumer failure behavior: reject with requeue=False
261+
assert self._channel is not None
262+
self._channel.basic_reject(delivery_tag, requeue=False)
263+
264+
def run_once(self) -> int:
265+
"""
266+
Exit codes:
267+
0 -> processed OK or queue empty
268+
2 -> conversion/handler failed (rejected)
269+
3 -> connection/setup error
270+
"""
271+
try:
272+
self.connect()
273+
except Exception as e:
274+
logger.error(f"Failed to connect to RabbitMQ: {e}")
275+
return 3
276+
277+
try:
278+
method, properties, body = self._basic_get()
279+
280+
if not method:
281+
logger.info(f"No message available in queue '{self._queue}', exiting")
282+
return 0
283+
284+
delivery_tag = method.delivery_tag
285+
app_id = getattr(properties, "app_id", None)
286+
headers = getattr(properties, "headers", None)
287+
288+
logger.info(f"Received message #{delivery_tag} from {app_id} meta: {headers}")
289+
if self.log_body:
290+
logger.debug(f"Message body: {body!r}")
291+
292+
if self.message_converter:
293+
try:
294+
body, content_type = self.message_converter.convert(body)
295+
if properties is None:
296+
properties = pika.BasicProperties(content_type=content_type)
297+
else:
298+
properties.content_type = content_type
299+
logger.info("Message converted")
300+
except Exception as error:
301+
logger.error(f"Message conversion failed: {error}\n{traceback.format_exc()}")
302+
self._reject_drop(delivery_tag)
303+
return 2
304+
305+
for handler in self.message_handlers:
306+
try:
307+
logger.info(f"Handling message with handler: {handler.__class__.__name__}")
308+
body, properties = handler.handle(body, properties=properties, channel=self._channel)
309+
except Exception as error:
310+
logger.error(f"Message handling failed: {error}\n{traceback.format_exc()}")
311+
logger.exception("Message handling failed, see traceback in document")
312+
self._reject_drop(delivery_tag)
313+
return 2
314+
315+
if self.reply_to:
316+
logger.info(f"Publishing message to exchange/queue: {self.reply_to}")
317+
self._channel.basic_publish(exchange="", routing_key=self.reply_to, body=body, properties=properties)
318+
319+
# Success
320+
self._ack(delivery_tag)
321+
logger.info(f"ACKed message #{delivery_tag}")
322+
323+
if self._in_shutdown:
324+
logger.info("Shutdown requested; exiting cleanly after finishing message")
325+
return 0
326+
327+
finally:
328+
self.close()
329+
330+
157331
class RMQConsumer:
158332
"""This is an example consumer that will handle unexpected interactions
159333
with RabbitMQ such as channel and connection closures.

0 commit comments

Comments
 (0)