From 1689c13c4b273a36d079c3db407e502c0c65e6f9 Mon Sep 17 00:00:00 2001 From: Jose Alberto Date: Tue, 22 Apr 2014 16:57:01 +0100 Subject: [PATCH 1/4] add bunny_queue class --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index ef34099..d971331 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ spec/rails/tmp *.pid Gemfile.lock .rvmrc +*.swp From e0f8baf369026b04255f6fe0d65944f2c9cc627d Mon Sep 17 00:00:00 2001 From: Jose Alberto Date: Tue, 22 Apr 2014 18:13:09 +0100 Subject: [PATCH 2/4] fix autoload of new class --- .gitignore | 1 + lib/loops/autoload.rb | 1 + 2 files changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index d971331..575b581 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ spec/rails/tmp Gemfile.lock .rvmrc *.swp +tags diff --git a/lib/loops/autoload.rb b/lib/loops/autoload.rb index 9f7f263..71b4e75 100644 --- a/lib/loops/autoload.rb +++ b/lib/loops/autoload.rb @@ -12,6 +12,7 @@ def self.__p(*path) File.join(Loops::LIB_ROOT, 'loops', *path) end autoload :Logger, __p('logger') autoload :ProcessManager, __p('process_manager') autoload :Queue, __p('queue') + autoload :BunnyQueue, __p('bunny_queue') autoload :Worker, __p('worker') autoload :WorkerPool, __p('worker_pool') From f95801db285630440305301d91d1e22df34d46b8 Mon Sep 17 00:00:00 2001 From: Jose Alberto Date: Tue, 22 Apr 2014 18:14:52 +0100 Subject: [PATCH 3/4] add bunny_queue class --- lib/loops/bunny_queue.rb | 69 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 lib/loops/bunny_queue.rb diff --git a/lib/loops/bunny_queue.rb b/lib/loops/bunny_queue.rb new file mode 100644 index 0000000..e8b41b8 --- /dev/null +++ b/lib/loops/bunny_queue.rb @@ -0,0 +1,69 @@ +begin + require 'bunny' +rescue LoadError + puts "Can't load bunny gem - all queue loops will be disabled!" +end + +require 'timeout' + +module Loops + class BunnyQueue < Base + def self.check_dependencies + raise "No bunny gem installed!" unless defined?(Bunny) + end + + def run + create_client + + config['queue_name'] ||= "/queue/loops/#{name}" + config['ack'] ||= false + debug "Client created" if @client + debug "Subscribing for the queue #{config['queue_name']}..." + + @total_served = 0 + @channel ||= @client.create_channel + debug "Channel created" if @channel + @queue ||= @channel.queue(config['queue_name']) + debug "Queue created" if @queue + + @queue.subscribe(:ack => config['ack'], :block => true) do |delivery_info, properties, payload| + begin + process_message(delivery_info, properties, payload) + + @channel.ack(delivery_info.delivery_tag, false) if config['ack'] + @total_served += 1 + if config['max_requests'] && @total_served >= config['max_requests'].to_i + disconnect_client_and_exit + end + rescue => e + error "Exception from process message! We won't be ACKing the message." + error "Details: #{e} at #{e.backtrace.first}" + disconnect_client_and_exit + end + end + rescue => e + error "Closing queue connection because of exception: #{e} at #{e.backtrace.first}" + disconnect_client_and_exit + end + + def process_message(delivery_info, properties, payload) + raise "This method process_message(msg) should be overriden in the loop class!" + end + + private + + def create_client + debug "Create Bunny client..." + config['uri'] ||= 'amqp://127.0.0.1:5672' + + @client = Bunny.new(config['uri']) + @client.start + end + + def disconnect_client_and_exit + debug "Unsubscribing..." + @client.close + exit(0) + end + end +end From ec0f19ee57f72be0579e775f350d85f1dba52310 Mon Sep 17 00:00:00 2001 From: Jose Alberto Date: Wed, 23 Apr 2014 10:58:04 +0100 Subject: [PATCH 4/4] fix logger bug --- lib/loops/logger.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/loops/logger.rb b/lib/loops/logger.rb index b124d90..7c36dc9 100644 --- a/lib/loops/logger.rb +++ b/lib/loops/logger.rb @@ -68,7 +68,7 @@ def logfile=(logfile) when 'stderr' then $stderr when IO, StringIO then logfile else - if Loops.root + if Loops.root && logfile logfile =~ /^\// ? logfile : Loops.root.join(logfile).to_s else logfile