Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a4959f2
Always specify the id as a keyword argument.
Feb 3, 2026
257484f
Honor a client request to omit the ACK response.
Feb 3, 2026
c7ad4fd
Honor a client request to omit the ACK response.
Feb 3, 2026
f3a5811
Allow arbitrary additional arguments in a payload. Use that capabilit…
Feb 3, 2026
dec3a21
Disable the high water mark for request/response; it was coming up wi…
Feb 3, 2026
9039a28
Put the client background thread back the way it was, the changes there
Feb 3, 2026
99ca2a0
Change the new 'ack' argument to set() to be 'response' instead.
Feb 4, 2026
deb8d75
Trade out the 'ack' field of the payload for 'silent'.
Feb 4, 2026
1b88742
Trade out the 'ack' field of the payload for 'silent'.
Feb 4, 2026
dcdec27
Trade out the response argument in set() for silent.
Feb 4, 2026
8d1a21d
Trade out the 'ack' field of the payload for 'silent'.
Feb 4, 2026
ca3b193
Added a comment adjacent to the call to req_handler() indicating that…
Feb 4, 2026
eecd838
Docstring tweak.
Feb 4, 2026
4bc95ff
Added a comment about potential out-of-order processing for high freq…
Feb 4, 2026
65b990b
Trade out the 'silent' argument for 'reply' instead.
Feb 4, 2026
22bd3a3
I was trying to realign that background thread with the original
Feb 6, 2026
3426b8e
Use a property for the 'reply' attribute on a Payload.
Feb 10, 2026
b278fa6
Mirror the Payload.reply attribute as Message.reply to simplify excep…
Feb 10, 2026
374da6c
Payload.reply is now a property (mirrored to Message.reply) to simplify
Feb 10, 2026
99f2e84
Merge branch 'main' into no_ack
klanclos Feb 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sbin/mkbrokerd
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,11 @@ class RequestServer(mktl.protocol.request.Server):
will be generated.
"""

if request.reply:
pass
else:
return

self.req_ack(request)

type = request.type
Expand Down
14 changes: 10 additions & 4 deletions src/mktl/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,11 +488,14 @@ def req_config(self, request):


def req_handler(self, request):
""" Inspect the incoming request type and decide how a response
will be generated.
""" Inspect the incoming request type and call an appropriate
method to handle that specific request.
"""

self.req_ack(request)
reply = request.reply

if reply:
self.req_ack(request)

type = request.type
target = request.target
Expand All @@ -511,7 +514,10 @@ def req_handler(self, request):
else:
raise ValueError('unhandled request type: ' + type)

return response
if reply:
return response
else:
return None


def req_get(self, request):
Expand Down
24 changes: 16 additions & 8 deletions src/mktl/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,15 +661,18 @@ def req_set(self, request):
return payload


def set(self, new_value, wait=True, formatted=False, quantity=False):
def set(self, new_value, wait=True, reply=True, formatted=False, quantity=False):
""" Set a new value. Set *wait* to True to block until the request
completes; this is the default behavior. If *wait* is set to False,
the caller will be returned a :class:`mktl.protocol.message.Request`
instance, which has a :func:`mktl.protocol.message.Request.wait`
method that can optionally be invoked to block until completion of
the request; the wait will return immediately once the request is
satisfied. There is no return value for a blocking request; failed
requests will raise exceptions.
satisfied. Set *reply* to False to disable all error handling and
acknowledgements for the request (fire and forget); setting
*reply to False implies *wait* is also False.
There is no return value for a blocking request; failed requests
will raise exceptions.

The optional *formatted* and *quantity* options enable calling
:func:`set` with either the string-formatted representation or
Expand Down Expand Up @@ -698,8 +701,13 @@ def set(self, new_value, wait=True, formatted=False, quantity=False):
else:
raise ValueError('formatted+quantity arguments must be boolean')

payload = self.to_payload(new_value)
payload.add_origin()
if reply:
payload = self.to_payload(new_value)
payload.add_origin()
else:
payload = self.to_payload(new_value, reply=False)
wait = False

message = protocol.message.Request('SET', self.full_key, payload)
self.req.send(message)

Expand Down Expand Up @@ -841,7 +849,7 @@ def to_format(self, value):
return formatted


def to_payload(self, value=None, timestamp=None):
def to_payload(self, value=None, timestamp=None, **kwargs):
""" Interpret the provided arguments into a
:class:`mktl.protocol.message.Payload` instance; if the *value* is
not specified the current value of this :class:`Item` will be
Expand Down Expand Up @@ -877,11 +885,11 @@ def to_payload(self, value=None, timestamp=None):
bulk = value.tobytes()
except AttributeError:
bulk = None
payload = protocol.message.Payload(value, timestamp)
payload = protocol.message.Payload(value, timestamp, **kwargs)
else:
shape = value.shape
dtype = str(value.dtype)
payload = protocol.message.Payload(None, timestamp, bulk=bulk, shape=shape, dtype=dtype)
payload = protocol.message.Payload(None, timestamp, bulk=bulk, shape=shape, dtype=dtype, **kwargs)

return payload

Expand Down
59 changes: 49 additions & 10 deletions src/mktl/protocol/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,21 @@ def log(self, logger=None, level=logging.DEBUG):
logger.log(level, message, *args)


@property
def reply(self):
""" The payload reply attribute is mirrored here for the sake of
simplifying exception handling elsewhere in the mKTL code base.
Otherwise, the other code would need to catch the AttributeError
thrown when the local payload is None.
"""

try:
return self.payload.reply
except AttributeError:
# There is no payload, and message replies are enabled by default.
return True


# end of class Message


Expand Down Expand Up @@ -405,6 +420,20 @@ def __repr__(self):
return self.encapsulate().decode()


def add_origin(self):
""" Add fields to this payload to provide information describing
the origin of this message. The primary use case is for debugging
or logging, as opposed to uniquely identifying the sender.
"""

self._user = _origin_user
self._hostname = _origin_hostname
self._pid = _origin_pid
self._ppid = _origin_ppid
self._executable = sys.executable
self._argv = sys.argv


def encapsulate(self):
""" Add all non-omitted local attributes to a dictionary, and return
the JSON encoding of that dictionary. For example, if the .value
Expand Down Expand Up @@ -442,18 +471,28 @@ def encapsulate(self):
return payload


def add_origin(self):
""" Add fields to this payload to provide information describing
the origin of this message. The primary use case is for debugging
or logging, as opposed to uniquely identifying the sender.
@property
def reply(self):
""" The reply attribute is generally only set to indicate that a
reply is not necessary. Establishing a property to return the
current value allows the exception handling to be done once,
here, and not everywhere the reply attribute might be inspected.
By a happy coincidence, the existence of this property does not
trigger the inclusion of 'reply' in the output of vars(), which
is how the :func:`encapsulate` method determines which local
attributes to include in the final output.
"""

self._user = _origin_user
self._hostname = _origin_hostname
self._pid = _origin_pid
self._ppid = _origin_ppid
self._executable = sys.executable
self._argv = sys.argv
try:
return self.__reply
except AttributeError:
# Message replies are enabled by default.
return True


@reply.setter
def reply(self, new_value):
self.__reply = new_value


# end of class Payload
Expand Down
24 changes: 20 additions & 4 deletions src/mktl/protocol/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(self, address, port):

self.socket = zmq_context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.set_hwm(0)
self.socket.identity = identity.encode()
self.socket.connect(server)

Expand Down Expand Up @@ -127,7 +128,7 @@ def _rep_incoming(self, parts):
# allow it to pass, assuming the users know what they're doing.
pass

response = message.Message('REP', target, payload, response_id)
response = message.Message('REP', target, payload, id=response_id)
pending._complete(response)
del self.pending[response_id]

Expand Down Expand Up @@ -199,6 +200,11 @@ def send(self, message):
self.requests.put(message)
self.request_signal.send(b'')

if message.reply:
pass
else:
return

ack = message.wait_ack(self.timeout)

if ack == False:
Expand Down Expand Up @@ -240,6 +246,7 @@ def __init__(self, hostname=None, port=None, avoid=set()):
self.hostname = hostname
self.socket = zmq_context.socket(zmq.ROUTER)
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.set_hwm(0)

# If the port is set, use it; otherwise, look for the first available
# port within the default range.
Expand Down Expand Up @@ -358,6 +365,11 @@ def req_handler(self, request):
structure of what's happening in the daemon code.
"""

if request.reply:
pass
else:
return

self.req_ack(request)

response = message.Message('REP', target, id=request.id)
Expand Down Expand Up @@ -415,7 +427,7 @@ def req_incoming(self, parts):
# allow it to pass, assuming the users know what they're doing.
pass

request = message.Request(req_type, target, payload, req_id)
request = message.Request(req_type, target, payload, id=req_id)
request.prefix = (ident,)
payload = None
error = None
Expand All @@ -432,7 +444,8 @@ def req_incoming(self, parts):
if payload is None and error is None:
# The handler should only return None when no response is
# immediately forthcoming-- the handler has invoked some
# other processing chain that will issue a proper response.
# other processing chain that will issue a proper response,
# or the client explicitly requested no response.
return

if error is not None:
Expand All @@ -442,7 +455,7 @@ def req_incoming(self, parts):
elif payload.error is None:
payload.error = error

response = message.Message('REP', target, payload, req_id)
response = message.Message('REP', target, payload, id=req_id)
response.prefix = request.prefix

self.send(response)
Expand All @@ -464,6 +477,9 @@ def run(self):
elif self.socket == active:
parts = self.socket.recv_multipart()
# Calling submit() will block if a worker is not available.
# Note that for high frequency operations this can result
# in out-of-order handling of requests, for example, if a
# stream of SET requests are inbound for a single item.
self.workers.submit(self.req_incoming, parts)


Expand Down
Loading