diff --git a/sbin/mkbrokerd b/sbin/mkbrokerd index e688fe3..ba161d5 100755 --- a/sbin/mkbrokerd +++ b/sbin/mkbrokerd @@ -400,6 +400,11 @@ class RequestServer(mktl.protocol.request.Server): will be generated. """ + if request.reply: + pass + else: + return + self.req_ack(request) type = request.type diff --git a/src/mktl/daemon.py b/src/mktl/daemon.py index fffebbd..3bef0db 100644 --- a/src/mktl/daemon.py +++ b/src/mktl/daemon.py @@ -488,11 +488,14 @@ def req_config(self, request): def req_handler(self, request): - """ Inspect the incoming request type and decide how a response - will be generated. + """ Inspect the incoming request type and call an appropriate + method to handle that specific request. """ - self.req_ack(request) + reply = request.reply + + if reply: + self.req_ack(request) type = request.type target = request.target @@ -511,7 +514,10 @@ def req_handler(self, request): else: raise ValueError('unhandled request type: ' + type) - return response + if reply: + return response + else: + return None def req_get(self, request): diff --git a/src/mktl/item.py b/src/mktl/item.py index 2b4c0d9..a9f26d1 100644 --- a/src/mktl/item.py +++ b/src/mktl/item.py @@ -661,15 +661,18 @@ def req_set(self, request): return payload - def set(self, new_value, wait=True, formatted=False, quantity=False): + def set(self, new_value, wait=True, reply=True, formatted=False, quantity=False): """ Set a new value. Set *wait* to True to block until the request completes; this is the default behavior. If *wait* is set to False, the caller will be returned a :class:`mktl.protocol.message.Request` instance, which has a :func:`mktl.protocol.message.Request.wait` method that can optionally be invoked to block until completion of the request; the wait will return immediately once the request is - satisfied. There is no return value for a blocking request; failed - requests will raise exceptions. + satisfied. Set *reply* to False to disable all error handling and + acknowledgements for the request (fire and forget); setting + *reply to False implies *wait* is also False. + There is no return value for a blocking request; failed requests + will raise exceptions. The optional *formatted* and *quantity* options enable calling :func:`set` with either the string-formatted representation or @@ -698,8 +701,13 @@ def set(self, new_value, wait=True, formatted=False, quantity=False): else: raise ValueError('formatted+quantity arguments must be boolean') - payload = self.to_payload(new_value) - payload.add_origin() + if reply: + payload = self.to_payload(new_value) + payload.add_origin() + else: + payload = self.to_payload(new_value, reply=False) + wait = False + message = protocol.message.Request('SET', self.full_key, payload) self.req.send(message) @@ -841,7 +849,7 @@ def to_format(self, value): return formatted - def to_payload(self, value=None, timestamp=None): + def to_payload(self, value=None, timestamp=None, **kwargs): """ Interpret the provided arguments into a :class:`mktl.protocol.message.Payload` instance; if the *value* is not specified the current value of this :class:`Item` will be @@ -877,11 +885,11 @@ def to_payload(self, value=None, timestamp=None): bulk = value.tobytes() except AttributeError: bulk = None - payload = protocol.message.Payload(value, timestamp) + payload = protocol.message.Payload(value, timestamp, **kwargs) else: shape = value.shape dtype = str(value.dtype) - payload = protocol.message.Payload(None, timestamp, bulk=bulk, shape=shape, dtype=dtype) + payload = protocol.message.Payload(None, timestamp, bulk=bulk, shape=shape, dtype=dtype, **kwargs) return payload diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index d4df094..b6aec67 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -195,6 +195,21 @@ def log(self, logger=None, level=logging.DEBUG): logger.log(level, message, *args) + @property + def reply(self): + """ The payload reply attribute is mirrored here for the sake of + simplifying exception handling elsewhere in the mKTL code base. + Otherwise, the other code would need to catch the AttributeError + thrown when the local payload is None. + """ + + try: + return self.payload.reply + except AttributeError: + # There is no payload, and message replies are enabled by default. + return True + + # end of class Message @@ -405,6 +420,20 @@ def __repr__(self): return self.encapsulate().decode() + def add_origin(self): + """ Add fields to this payload to provide information describing + the origin of this message. The primary use case is for debugging + or logging, as opposed to uniquely identifying the sender. + """ + + self._user = _origin_user + self._hostname = _origin_hostname + self._pid = _origin_pid + self._ppid = _origin_ppid + self._executable = sys.executable + self._argv = sys.argv + + def encapsulate(self): """ Add all non-omitted local attributes to a dictionary, and return the JSON encoding of that dictionary. For example, if the .value @@ -442,18 +471,28 @@ def encapsulate(self): return payload - def add_origin(self): - """ Add fields to this payload to provide information describing - the origin of this message. The primary use case is for debugging - or logging, as opposed to uniquely identifying the sender. + @property + def reply(self): + """ The reply attribute is generally only set to indicate that a + reply is not necessary. Establishing a property to return the + current value allows the exception handling to be done once, + here, and not everywhere the reply attribute might be inspected. + By a happy coincidence, the existence of this property does not + trigger the inclusion of 'reply' in the output of vars(), which + is how the :func:`encapsulate` method determines which local + attributes to include in the final output. """ - self._user = _origin_user - self._hostname = _origin_hostname - self._pid = _origin_pid - self._ppid = _origin_ppid - self._executable = sys.executable - self._argv = sys.argv + try: + return self.__reply + except AttributeError: + # Message replies are enabled by default. + return True + + + @reply.setter + def reply(self, new_value): + self.__reply = new_value # end of class Payload diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index f7d3c6a..d931096 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -40,6 +40,7 @@ def __init__(self, address, port): self.socket = zmq_context.socket(zmq.DEALER) self.socket.setsockopt(zmq.LINGER, 0) + self.socket.set_hwm(0) self.socket.identity = identity.encode() self.socket.connect(server) @@ -127,7 +128,7 @@ def _rep_incoming(self, parts): # allow it to pass, assuming the users know what they're doing. pass - response = message.Message('REP', target, payload, response_id) + response = message.Message('REP', target, payload, id=response_id) pending._complete(response) del self.pending[response_id] @@ -199,6 +200,11 @@ def send(self, message): self.requests.put(message) self.request_signal.send(b'') + if message.reply: + pass + else: + return + ack = message.wait_ack(self.timeout) if ack == False: @@ -240,6 +246,7 @@ def __init__(self, hostname=None, port=None, avoid=set()): self.hostname = hostname self.socket = zmq_context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 0) + self.socket.set_hwm(0) # If the port is set, use it; otherwise, look for the first available # port within the default range. @@ -358,6 +365,11 @@ def req_handler(self, request): structure of what's happening in the daemon code. """ + if request.reply: + pass + else: + return + self.req_ack(request) response = message.Message('REP', target, id=request.id) @@ -415,7 +427,7 @@ def req_incoming(self, parts): # allow it to pass, assuming the users know what they're doing. pass - request = message.Request(req_type, target, payload, req_id) + request = message.Request(req_type, target, payload, id=req_id) request.prefix = (ident,) payload = None error = None @@ -432,7 +444,8 @@ def req_incoming(self, parts): if payload is None and error is None: # The handler should only return None when no response is # immediately forthcoming-- the handler has invoked some - # other processing chain that will issue a proper response. + # other processing chain that will issue a proper response, + # or the client explicitly requested no response. return if error is not None: @@ -442,7 +455,7 @@ def req_incoming(self, parts): elif payload.error is None: payload.error = error - response = message.Message('REP', target, payload, req_id) + response = message.Message('REP', target, payload, id=req_id) response.prefix = request.prefix self.send(response) @@ -464,6 +477,9 @@ def run(self): elif self.socket == active: parts = self.socket.recv_multipart() # Calling submit() will block if a worker is not available. + # Note that for high frequency operations this can result + # in out-of-order handling of requests, for example, if a + # stream of SET requests are inbound for a single item. self.workers.submit(self.req_incoming, parts)