@@ -131,10 +131,7 @@ class PluginRunner(object):
131131 Calls to plugins are done via greenlets
132132 """
133133
134- def __init__ (self , use_gevent = False ):
135- if use_gevent :
136- import gevent
137- self .gevent = gevent
134+ def __init__ (self ):
138135 self .bot_bus = redis .StrictRedis .from_url (
139136 settings .REDIS_PLUGIN_QUEUE_URL )
140137 self .storage = redis .StrictRedis .from_url (
@@ -176,32 +173,15 @@ def register(self, plugin):
176173 getattr (self , attr .route_rule [0 ] + '_router' ).setdefault (
177174 plugin .slug , []).append ((attr .route_rule [1 ], attr , plugin ))
178175
179- def listen (self ):
180- """Listens for incoming messages on the Redis queue"""
181- while 1 :
182- val = None
183- try :
184- val = self .bot_bus .blpop ('q' , 1 )
185-
186- # Track q length
187- ql = self .bot_bus .llen ('q' )
188- statsd .gauge ("." .join (["plugins" , "q" ]), ql )
189-
190- if val :
191- _ , val = val
192- LOG .debug ('Recieved: %s' , val )
193- line = Line (json .loads (val ), self )
194-
195- # Calculate the transport latency between go and the plugins.
196- delta = datetime .utcnow ().replace (tzinfo = utc ) - line ._received
197- statsd .timing ("." .join (["plugins" , "latency" ]),
198- delta .total_seconds () * 1000 )
176+ def process_line (self , line_json ):
177+ LOG .debug ('Recieved: %s' , line_json )
178+ line = Line (json .loads (line_json ), self )
179+ # Calculate the transport latency between go and the plugins.
180+ delta = datetime .utcnow ().replace (tzinfo = utc ) - line ._received
181+ statsd .timing ("." .join (["plugins" , "latency" ]),
182+ delta .total_seconds () * 1000 )
183+ self .dispatch (line )
199184
200- self .dispatch (line )
201- except Exception :
202- LOG .error ("Line Dispatch Failed" , exc_info = True , extra = {
203- "line" : val
204- })
205185
206186 def dispatch (self , line ):
207187 """Given a line, dispatch it to the right plugins & functions."""
@@ -214,16 +194,11 @@ def dispatch(self, line):
214194 # firehose gets everything, no rule matching
215195 LOG .info ('Match: %s.%s' , plugin_slug , func .__name__ )
216196 with statsd .timer ("." .join (["plugins" , plugin_slug ])):
217- # FIXME: This will not have correct timing if go back to
218- # gevent.
219197 channel_plugin = self .setup_plugin_for_channel (
220198 plugin .__class__ , line )
221199 new_func = log_on_error (LOG , getattr (channel_plugin ,
222200 func .__name__ ))
223- if hasattr (self , 'gevent' ):
224- self .gevent .Greenlet .spawn (new_func , line )
225- else :
226- channel_plugin .respond (new_func (line ))
201+ channel_plugin .respond (new_func (line ))
227202
228203 # pass line to other routers
229204 if line ._is_message :
@@ -252,30 +227,12 @@ def check_for_plugin_route_matches(self, line, router):
252227 if match :
253228 LOG .info ('Match: %s.%s' , plugin_slug , func .__name__ )
254229 with statsd .timer ("." .join (["plugins" , plugin_slug ])):
255- # FIXME: This will not have correct timing if go back to
256- # gevent.
257230 # Instantiate a plugin specific to this channel
258231 channel_plugin = self .setup_plugin_for_channel (
259232 plugin .__class__ , line )
260233 # get the method from the channel-specific plugin
261234 new_func = log_on_error (LOG , getattr (channel_plugin ,
262235 func .__name__ ))
263- if hasattr (self , 'gevent' ):
264- grnlt = self .gevent .Greenlet (new_func , line ,
265- ** match .groupdict ())
266- grnlt .link_value (channel_plugin .greenlet_respond )
267- grnlt .start ()
268- else :
269- channel_plugin .respond (new_func (line ,
270- ** match .groupdict ()))
271236
272-
273- def start_plugins (* args , ** kwargs ):
274- """
275- Used by the management command to start-up plugin listener
276- and register the plugins.
277- """
278- LOG .info ('Starting plugins. Gevent=%s' , kwargs ['use_gevent' ])
279- app = PluginRunner (** kwargs )
280- app .register_all_plugins ()
281- app .listen ()
237+ channel_plugin .respond (new_func (line ,
238+ ** match .groupdict ()))
0 commit comments