kombu-worker

An interface to AMQP-based message queues (e.g., some RabbitMQ systems) using kombu. This repo mostly combines code from seuron and python-task-queue. It is intended to help consolidate connectomics processing steps into seuron to create a more streamlined system.

Installation

pip install git+https://github.com/ZettaAI/kombu-worker

Usage

There are two intended interfaces for using these tools: taskqueueworker and queuetools.

taskqueueworker

The taskqueueworker interface abstracts most details of working with the queue, and replicates python-task-queue functions.

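from functools import partial

from kombuworker import taskqueueworker as tqw
from taskqueue import queueable

# Placeholder values for illustration; substitute your own.
queueurl = "amqp://localhost:5672"  # URL of the AMQP queue host
queuename = "example-queue"  # name of the queue on that host
args = (1, 2)
kwargs = {"scale": 3}


@queueable
def queueable_task(*args, **kwargs):
    pass


# Bind the arguments to the task function
tasks = [partial(queueable_task, *args, **kwargs)]

# Generation/Manager: insert the tasks into the queue
tqw.insert_tasks(queueurl, queuename, tasks)

# Worker node: poll the queue for tasks
tqw.poll(queueurl, queuename)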

Note that, unlike systems such as SQS, AMQP-based queues are specified by two fields: a URL for the queue host (or "exchange"), and a name that identifies a specific queue within that exchange. Also note that the polling function has a few parameters that control what happens when the queue is currently empty. Set max_num_retries to None if you'd like the workers to persist indefinitely.
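The effect of a retry limit on an empty queue can be illustrated with a small standalone sketch. This is not the kombuworker implementation: it uses Python's standard-library queue as a stand-in for an AMQP queue, and the function poll_with_retries, its wait_seconds parameter, and the choice to count consecutive empty checks (resetting after each successful fetch) are all assumptions made for illustration. Only the name max_num_retries and the meaning of None come from the description above.

```python
import queue
import time


def poll_with_retries(q, handle, max_num_retries=3, wait_seconds=0.01):
    """Process items from `q`, retrying when the queue is empty.

    Hypothetical helper: gives up after `max_num_retries` consecutive
    empty checks. With `max_num_retries=None` it never gives up on its own.
    Returns the number of items handled.
    """
    handled = 0
    empty_checks = 0
    while max_num_retries is None or empty_checks < max_num_retries:
        try:
            item = q.get_nowait()
        except queue.Empty:
            # Nothing available right now: count the miss and wait a little.
            empty_checks += 1
            time.sleep(wait_seconds)
            continue
        empty_checks = 0  # assumption: a successful fetch resets the counter
        handle(item)
        handled += 1
    return handled


# Stand-in queue with three "tasks".
q = queue.Queue()
for i in range(3):
    q.put(i)

seen = []
count = poll_with_retries(q, seen.append, max_num_retries=2)
```

With max_num_retries=2 the loop handles the three items, finds the queue empty twice in a row, and returns. Passing None instead would keep the loop waiting for new items until the process is stopped externally.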

queuetools

The queuetools interface lets you work more directly with the raw messages in the AMQP queue. The taskqueueworker functions wrap the queuetools functions, and serve as easy guides for how to use them. For example, see taskqueueworker.fetch_tasks for a nice way to use the queuetools.fetch_msgs generator.

When using fetch_msgs, set max_num_retries to None if you'd like the workers to persist indefinitely, but make sure to set up a way to stop the process; otherwise, the generator will never return control to you. taskqueueworker.poll and other interfaces handle this for you.
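One way to keep control over a generator that never ends on its own is to give it a stop signal. The sketch below shows that pattern with the standard library only. It does not use queuetools.fetch_msgs; the generator fetch_items, its stop_event argument, and wait_seconds are hypothetical names, and a standard-library queue stands in for the AMQP queue.

```python
import queue
import threading


def fetch_items(q, stop_event, wait_seconds=0.01):
    """Yield items from `q` indefinitely, until `stop_event` is set.

    Hypothetical stand-in for a message generator that persists on an
    empty queue instead of giving up after a number of retries.
    """
    while not stop_event.is_set():
        try:
            yield q.get(timeout=wait_seconds)
        except queue.Empty:
            # Queue is empty for now: loop again and re-check the stop flag.
            continue


# Stand-in queue with three "messages".
q = queue.Queue()
for name in ["a", "b", "c"]:
    q.put(name)

stop = threading.Event()
received = []
for item in fetch_items(q, stop):
    received.append(item)
    if item == "c":
        # In a real worker this might be set from a signal handler instead.
        stop.set()
```

The same event could be set from a handler registered with signal.signal for SIGINT or SIGTERM, so that a long-running worker shuts down cleanly between messages rather than being killed mid-task.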
