From 20e399d522f5a4fbcdd21327036d9c77dd891e60 Mon Sep 17 00:00:00 2001 From: Mokevnin Kirill Date: Mon, 17 Jan 2011 19:37:50 +0300 Subject: [PATCH 1/2] add amqp worker with 'tmm1-amqp' gem --- lib/loops/amqp.rb | 95 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 lib/loops/amqp.rb diff --git a/lib/loops/amqp.rb b/lib/loops/amqp.rb new file mode 100644 index 0000000..10c93fe --- /dev/null +++ b/lib/loops/amqp.rb @@ -0,0 +1,95 @@ +module Ext + module Loops + class Amqp < Base + + def self.check_dependencies + gem 'tmm1-amqp' + end + + def subscribes + raise "This method 'subscribes' should be overriden in the loop class!" + end + + def run + setup_signals + subscribes + raise 'No subscribe given!' unless @subscribes || @subscribes.any? + AMQP.start(connection_params) do + @subscribes.each do |block| + block.call + end + end + rescue Exception => e + error "Closing queue connection because of exception: #{e} at #{e.backtrace.first}" + disconnect_client_and_exit + end + + private + + def subscribe(handler, options) + raise ArgumentError, "No queue name (:queue_name) given!" unless options[:queue_name] + raise ArgumentError, "No exchange name (:exchange_name) given!" unless options[:exchange_name] + + options[:prefetch] ||= 1 + options[:ack] ||= true + options[:type] ||= :topic + options[:sleep] ||= 1 + + options[:exchange] ||= {} + options[:exchange][:durable] ||= false + options[:queue] ||= {} + options[:queue][:durable] ||= false + options[:bind] ||= {} + + + proc = Proc.new do + mq = MQ.new + mq.prefetch(options[:prefetch]) + exchange = mq.__send__ options[:type], options[:exchange_name], options[:exchange] + + debug "Subscribing for the queue #{options[:queue_name]}..." + + mq.queue(options[:queue_name], options[:queue]). + bind(exchange, options[:bind]). + subscribe(:ack => options[:ack]) do |header, msg| + begin + handler.call(msg, header) + header.ack if options[:ack] #TODO + rescue Exception => e + error "Exception from process message! We won't be ACKing the message." + error "Details: #{e} at #{e.backtrace.first}" + disconnect_client_and_exit + end + end + end + + debug "Adding proc to @subscribes" + @subscribes ||= [] + @subscribes << proc + end + + def disconnect_client_and_exit + debug "Close..." + AMQP.stop{ EM.stop } + exit(0) + end + + def connection_params + connection_params ||= {} + connection_params[:host] = config["host"] || 'localhost' + connection_params[:user] = config["user"] || 'guest' + connection_params[:pass] = config["pass"] || 'guest' + connection_params[:vhost] = config["vhost"] || '/' + connection_params[:timeout] = config["timeout"] || nil + connection_params[:logging] = config["logging"] || false + + connection_params + end + + def setup_signals + Signal.trap('INT') { disconnect_client_and_exit } + Signal.trap('TERM') { disconnect_client_and_exit } + end + end + end +end \ No newline at end of file From f4bf4ff3533c116e53556f3ce9a11567a389e42b Mon Sep 17 00:00:00 2001 From: Mokevnin Kirill Date: Mon, 17 Jan 2011 19:49:00 +0300 Subject: [PATCH 2/2] fix main module --- lib/loops/amqp.rb | 156 +++++++++++++++++++++++----------------------- 1 file changed, 77 insertions(+), 79 deletions(-) diff --git a/lib/loops/amqp.rb b/lib/loops/amqp.rb index 10c93fe..36c2a55 100644 --- a/lib/loops/amqp.rb +++ b/lib/loops/amqp.rb @@ -1,95 +1,93 @@ -module Ext - module Loops - class Amqp < Base +module Loops + class Amqp < Base - def self.check_dependencies - gem 'tmm1-amqp' - end + def self.check_dependencies + gem 'tmm1-amqp' + end - def subscribes - raise "This method 'subscribes' should be overriden in the loop class!" - end + def subscribes + raise "This method 'subscribes' should be overriden in the loop class!" + end - def run - setup_signals - subscribes - raise 'No subscribe given!' unless @subscribes || @subscribes.any? - AMQP.start(connection_params) do - @subscribes.each do |block| - block.call - end + def run + setup_signals + subscribes + raise 'No subscribe given!' unless @subscribes || @subscribes.any? + AMQP.start(connection_params) do + @subscribes.each do |block| + block.call end - rescue Exception => e - error "Closing queue connection because of exception: #{e} at #{e.backtrace.first}" - disconnect_client_and_exit end + rescue Exception => e + error "Closing queue connection because of exception: #{e} at #{e.backtrace.first}" + disconnect_client_and_exit + end - private - - def subscribe(handler, options) - raise ArgumentError, "No queue name (:queue_name) given!" unless options[:queue_name] - raise ArgumentError, "No exchange name (:exchange_name) given!" unless options[:exchange_name] - - options[:prefetch] ||= 1 - options[:ack] ||= true - options[:type] ||= :topic - options[:sleep] ||= 1 - - options[:exchange] ||= {} - options[:exchange][:durable] ||= false - options[:queue] ||= {} - options[:queue][:durable] ||= false - options[:bind] ||= {} - - - proc = Proc.new do - mq = MQ.new - mq.prefetch(options[:prefetch]) - exchange = mq.__send__ options[:type], options[:exchange_name], options[:exchange] - - debug "Subscribing for the queue #{options[:queue_name]}..." - - mq.queue(options[:queue_name], options[:queue]). - bind(exchange, options[:bind]). - subscribe(:ack => options[:ack]) do |header, msg| - begin - handler.call(msg, header) - header.ack if options[:ack] #TODO - rescue Exception => e - error "Exception from process message! We won't be ACKing the message." - error "Details: #{e} at #{e.backtrace.first}" - disconnect_client_and_exit - end + private + + def subscribe(handler, options) + raise ArgumentError, "No queue name (:queue_name) given!" unless options[:queue_name] + raise ArgumentError, "No exchange name (:exchange_name) given!" unless options[:exchange_name] + + options[:prefetch] ||= 1 + options[:ack] ||= true + options[:type] ||= :topic + options[:sleep] ||= 1 + + options[:exchange] ||= {} + options[:exchange][:durable] ||= false + options[:queue] ||= {} + options[:queue][:durable] ||= false + options[:bind] ||= {} + + + proc = Proc.new do + mq = MQ.new + mq.prefetch(options[:prefetch]) + exchange = mq.__send__ options[:type], options[:exchange_name], options[:exchange] + + debug "Subscribing for the queue #{options[:queue_name]}..." + + mq.queue(options[:queue_name], options[:queue]). + bind(exchange, options[:bind]). + subscribe(:ack => options[:ack]) do |header, msg| + begin + handler.call(msg, header) + header.ack if options[:ack] #TODO + rescue Exception => e + error "Exception from process message! We won't be ACKing the message." + error "Details: #{e} at #{e.backtrace.first}" + disconnect_client_and_exit end end - - debug "Adding proc to @subscribes" - @subscribes ||= [] - @subscribes << proc end - def disconnect_client_and_exit - debug "Close..." - AMQP.stop{ EM.stop } - exit(0) - end + debug "Adding proc to @subscribes" + @subscribes ||= [] + @subscribes << proc + end - def connection_params - connection_params ||= {} - connection_params[:host] = config["host"] || 'localhost' - connection_params[:user] = config["user"] || 'guest' - connection_params[:pass] = config["pass"] || 'guest' - connection_params[:vhost] = config["vhost"] || '/' - connection_params[:timeout] = config["timeout"] || nil - connection_params[:logging] = config["logging"] || false + def disconnect_client_and_exit + debug "Close..." + AMQP.stop{ EM.stop } + exit(0) + end - connection_params - end + def connection_params + connection_params ||= {} + connection_params[:host] = config["host"] || 'localhost' + connection_params[:user] = config["user"] || 'guest' + connection_params[:pass] = config["pass"] || 'guest' + connection_params[:vhost] = config["vhost"] || '/' + connection_params[:timeout] = config["timeout"] || nil + connection_params[:logging] = config["logging"] || false - def setup_signals - Signal.trap('INT') { disconnect_client_and_exit } - Signal.trap('TERM') { disconnect_client_and_exit } - end + connection_params + end + + def setup_signals + Signal.trap('INT') { disconnect_client_and_exit } + Signal.trap('TERM') { disconnect_client_and_exit } end end end \ No newline at end of file