Skip to content

Commit 63905b9

Browse files
committed
add not acknowledge to worker
1 parent ea02240 commit 63905b9

File tree

6 files changed

+53
-28
lines changed

6 files changed

+53
-28
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ build
88
dist
99
docs/_build
1010
.pypirc
11+
message_queue.egg-info

examples/publisher.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,20 @@
44

55
if __name__ == '__main__':
66
# Instantiate the AMQP adapter with the host configuration
7-
adapter = message_queue.AMQPAdapter(host='192.168.99.100')
7+
adapter = message_queue.AMQPAdapter(host='127.0.0.1')
88

99
# Configurate queue
1010
adapter.configurate_queue(queue='python.publish.test')
1111

1212
# Instantiate publisher
1313
publisher = message_queue.Publisher(adapter)
1414

15-
# Create a new message
16-
message = message_queue.Message({
17-
'id': 12345,
18-
'message': 'test publish'
19-
})
20-
2115
# Publish message
2216
for i in xrange(10000):
17+
message = message_queue.Message({
18+
'id': i,
19+
'message': 'test publish'
20+
})
21+
2322
publisher.publish(message)
2423

examples/subscriber.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,20 @@
44

55

66
def my_worker(channel, method, properties, body):
7-
print json.loads(body)
7+
try:
8+
data = json.loads(body)
9+
print data
10+
11+
except:
12+
print 'Error parsing body: %r' % body
13+
# return False to not acknowledge the message
14+
return False
15+
816

917
if __name__ == '__main__':
1018
# Instantiate the AMQP adapter with the host configuration
11-
adapter = message_queue.AMQPAdapter(host='192.168.99.100')
19+
adapter = message_queue.AMQPAdapter(host='127.0.0.1')
20+
1221
# Configurate queue
1322
adapter.configurate_queue(queue='python.publish.test')
1423

message_queue/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = '0.1.1'
1+
__version__ = '0.1.6'
22

33
from message_queue.logger import *
44

message_queue/adapters/amqp_adapter.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def configurate_queue(self, **kwargs):
4646
if not self.queue:
4747
self.queue = kwargs.get('queue', '')
4848
self.basic_ack = kwargs.get('basic_ack', True)
49-
self.prefetch_count = kwargs.get('prefetch_count', 0)
49+
self.prefetch_count = kwargs.get('prefetch_count', 1)
5050

5151
self.channel.queue_declare(
5252
queue = self.queue,
@@ -64,7 +64,7 @@ def configurate_queue(self, **kwargs):
6464
self.queue, self.basic_ack, self.prefetch_count)
6565

6666
def connect(self):
67-
"""Connect usgin BlockingConnection.
67+
"""Connect to AMQP server usgin BlockingConnection.
6868
6969
"""
7070
try:
@@ -108,7 +108,7 @@ def format_message(self, message):
108108
_message = {}
109109
_message['body'] = message['body']
110110
_message['routing_key'] = self.queue
111-
_message['exchange'] = exchange
111+
_message['exchange'] = exchange
112112
_message['properties'] = pika.BasicProperties(
113113
content_type='application/json',
114114
delivery_mode=delivery_mode,
@@ -124,8 +124,8 @@ def consume(self, worker):
124124
:param function worker: Method that consume the message
125125
126126
"""
127-
self._consume_worker = worker
128-
self.channel.basic_consume(self.consume_callback, self.queue)
127+
callback = self.consume_callback(worker)
128+
self.channel.basic_consume(callback, self.queue)
129129

130130
try:
131131
self.channel.start_consuming()
@@ -134,24 +134,40 @@ def consume(self, worker):
134134
self.channel.stop_consuming()
135135
self.close()
136136

137-
def consume_callback(self, channel, method, properties, body):
138-
"""Message consume callback
137+
def consume_callback(self, worker):
138+
"""Decorate worker to exectue on consume callback.
139139
140-
:param pika.channel.Channel channel: The channel object
141-
:param pika.Spec.Basic.Deliver method: basic_deliver method
142-
:param pika.Spec.BasicProperties properties: properties
143-
:param str|unicode body: The message body
140+
:param function worker: Worker to execture in the consume callback
144141
145142
"""
146-
self._consume_worker(channel, method, properties, body)
147-
self.consume_acknowledge(channel, method.delivery_tag)
143+
def callback(channel, method, properties, body):
144+
"""Message consume callback.
148145
149-
def consume_acknowledge(self, channel, tag):
150-
"""Message acknowledge
146+
:param pika.channel.Channel channel: The channel object
147+
:param pika.Spec.Basic.Deliver method: basic_deliver method
148+
:param pika.Spec.BasicProperties properties: properties
149+
:param str|unicode body: The message body
150+
151+
"""
152+
# Execute the worker
153+
acknowledge = worker(channel, method, properties, body)
154+
155+
# Acknowledge the message or not
156+
self._consume_acknowledge(channel, method.delivery_tag, acknowledge)
157+
158+
return callback
159+
160+
def _consume_acknowledge(self, channel, tag, acknowledge=True):
161+
"""Message acknowledge.
151162
152163
:param pika.channel.Channel channel: Channel to acknowledge the message
153164
:param int tag: Message tag to acknowledge
165+
:param bool acknowledge: If should acknowledge the message or not
154166
155167
"""
168+
if acknowledge is False:
169+
channel.basic_nack(delivery_tag=tag)
170+
return
171+
156172
channel.basic_ack(delivery_tag=tag)
157173

setup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22

33
setuptools.setup(
44
name='message-queue',
5-
version='0.1.1',
5+
version='0.1.6',
66
description='Message Queue',
77
long_description='Message Queue python library to publish and subscribe to queues with diferent types of adapters.',
88
url='https://github.com/ingresse/message-queue-python',
99
author='Ingresse',
1010
author_email='vitor.leal@ingresse.com',
1111
license='BSD',
1212
packages=setuptools.find_packages(),
13-
install_requires=['pika'],
14-
download_url = 'https://github.com/ingresse/message-queue-python/tarball/0.1.1',
13+
install_requires=['pika>=0.10.0'],
14+
download_url = 'https://github.com/ingresse/message-queue-python/tarball/0.1.6',
1515
)
1616

0 commit comments

Comments
 (0)