diff --git a/.gitignore b/.gitignore index ef34099..575b581 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ spec/rails/tmp *.pid Gemfile.lock .rvmrc +*.swp +tags diff --git a/lib/loops/autoload.rb b/lib/loops/autoload.rb index 9f7f263..71b4e75 100644 --- a/lib/loops/autoload.rb +++ b/lib/loops/autoload.rb @@ -12,6 +12,7 @@ def self.__p(*path) File.join(Loops::LIB_ROOT, 'loops', *path) end autoload :Logger, __p('logger') autoload :ProcessManager, __p('process_manager') autoload :Queue, __p('queue') + autoload :BunnyQueue, __p('bunny_queue') autoload :Worker, __p('worker') autoload :WorkerPool, __p('worker_pool') diff --git a/lib/loops/bunny_queue.rb b/lib/loops/bunny_queue.rb new file mode 100644 index 0000000..e8b41b8 --- /dev/null +++ b/lib/loops/bunny_queue.rb @@ -0,0 +1,69 @@ +begin + require 'bunny' +rescue LoadError + puts "Can't load bunny gem - all queue loops will be disabled!" +end + +require 'timeout' + +module Loops + class BunnyQueue < Base + def self.check_dependencies + raise "No bunny gem installed!" unless defined?(Bunny) + end + + def run + create_client + + config['queue_name'] ||= "/queue/loops/#{name}" + config['ack'] ||= false + debug "Client created" if @client + debug "Subscribing for the queue #{config['queue_name']}..." + + @total_served = 0 + @channel ||= @client.create_channel + debug "Channel created" if @channel + @queue ||= @channel.queue(config['queue_name']) + debug "Queue created" if @queue + + @queue.subscribe(:ack => config['ack'], :block => true) do |delivery_info, properties, payload| + begin + process_message(delivery_info, properties, payload) + + @channel.ack(delivery_info.delivery_tag, false) if config['ack'] + @total_served += 1 + if config['max_requests'] && @total_served >= config['max_requests'].to_i + disconnect_client_and_exit + end + rescue => e + error "Exception from process message! We won't be ACKing the message." + error "Details: #{e} at #{e.backtrace.first}" + disconnect_client_and_exit + end + end + rescue => e + error "Closing queue connection because of exception: #{e} at #{e.backtrace.first}" + disconnect_client_and_exit + end + + def process_message(delivery_info, properties, payload) + raise "This method process_message(msg) should be overriden in the loop class!" + end + + private + + def create_client + debug "Create Bunny client..." + config['uri'] ||= 'amqp://127.0.0.1:5672' + + @client = Bunny.new(config['uri']) + @client.start + end + + def disconnect_client_and_exit + debug "Unsubscribing..." + @client.close + exit(0) + end + end +end diff --git a/lib/loops/logger.rb b/lib/loops/logger.rb index b124d90..7c36dc9 100644 --- a/lib/loops/logger.rb +++ b/lib/loops/logger.rb @@ -68,7 +68,7 @@ def logfile=(logfile) when 'stderr' then $stderr when IO, StringIO then logfile else - if Loops.root + if Loops.root && logfile logfile =~ /^\// ? logfile : Loops.root.join(logfile).to_s else logfile