Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 203 additions & 13 deletions src/mktl/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,92 @@ def __init__(self, store, alias, override=False, options=None):
self.logger.debug("daemon initialization complete")


def add_get_handler(self, key, method):
""" Define a method that will be called for all GET requests for
the specified item. See :func:`mktl.Item.req_get` for additional
details; inspection of the implementation for that method is
recommended to ensure all the necessary actions are covered.
"""

try:
item = self.store[key]
except KeyError:
raise KeyError('this daemon does not contain ' + repr(key))

self.rep._req_get_handlers[item.full_key] = method


def add_get_performer(self, key, method):
""" Define a method that will be called for all GET requests for
the specified item. Refer to :func:`mktl.Item.add_set_performer`
for additional details.
"""

key = key.lower()
key = key.strip()

try:
existing = self.store._items[key]
except KeyError:
raise KeyError('this daemon does not contain ' + repr(key))

if existing is None or existing.authoritative == False:
self.add_item(item.Item, key)
existing = self.store[key]

existing.add_get_performer(method)


def add_handler(self, key, request, method):
""" Define a method that will be called for either GET or SET
requests, determined by the *request* argument, which must be one
of 'get' or 'set'.

See :func:`mktl.Item.req_get` and
:func:`mktl.Item.req_set` for additional details; inspection of
the implementation for those methods is recommended to ensure all
the necessary actions are covered.
"""

try:
item = self.store[key]
except KeyError:
raise KeyError('this daemon does not contain ' + repr(key))

request = request.lower()
request = request.strip()

if request == 'get':
self.add_get_handler(key, method)
elif request == 'set':
self.add_set_handler(key, method)
else:
raise ValueError("request must be either 'get' or 'set'")


def add_handlers(self, handlers):
""" This method is intended to be a single call, accepting a sequence
of triplets mapping external methods handling GET and SET
requests, bypassing the usual handling chain involving
:class:`mktl.Item` instances.

A triplet is the key for an item (omitting the store name), whether
this is a 'get' or a 'set' handler, and a valid reference to a
method. For example::

('temperature', 'get', mkwc.get_temperature)

See :func:`mktl.Item.req_get` and
:func:`mktl.Item.req_set` for additional details; inspection of
the implementation for those methods is recommended to ensure all
the necessary actions are covered.
"""

for triplet in handlers:
key,request,method = triplet
self.add_handler(key, request, method)


def add_item(self, item_class, key, **kwargs):
""" Add an :class:`mktl.Item` to this daemon instance; this is the entry
point for establishing an authoritative item, one that will handle
Expand Down Expand Up @@ -224,6 +310,91 @@ def add_item(self, item_class, key, **kwargs):
created.register(callback)


def add_performer(self, key, request, method):
""" Define a method that will be called for either GET or SET
requests, determined by the *request* argument, which must be one
of 'get' or 'set'.

See :func:`mktl.Item.add_get_performer` and
:func:`mktl.Item.add_set_performer` for additional details.
"""

# Allow this method to be called before placeholder Item instances
# have been established. The use of external performer methods,
# rather than overriding Item.perform_get() and Item.perform_set(),
# implies the caller will not be instantiating custom Item
# subclasses; instantiating them now ensures any custom code is
# exclusive.

try:
existing = self.store._items[key]
except KeyError:
raise KeyError('this daemon does not contain ' + repr(key))

if existing is None or existing.authoritative == False:
self.add_item(item.Item, key)
existing = self.store[key]

existing.add_performer(request, method)


def add_performers(self, performers):
""" This method is intended to be a single call, accepting a sequence
of triplets mapping external methods performing GET and SET
requests onto the :class:`mktl.Item` instances receiving those
requests from this daemon.

A triplet is the key for an item (omitting the store name), whether
this is a 'get' or a 'set' performer, and a valid reference to a
method. For example::

('temperature', 'get', mkwc.get_temperature)

See :func:`mktl.Item.add_get_performer` and
:func:`mktl.Item.add_set_performer` for additional details.
"""

for triplet in performers:
key,request,method = triplet
self.add_performer(key, request, method)


def add_set_handler(self, key, method):
""" Define a method that will be called for all SET requests for
the specified item. See :func:`mktl.Item.req_set` for additional
details; inspection of the implementation for that method is
recommended to ensure all the necessary actions are covered.
"""

try:
item = self.store[key]
except KeyError:
raise KeyError('this daemon does not contain ' + repr(key))

self.rep._req_set_handlers[item.full_key] = method


def add_set_performer(self, key, method):
""" Define a method that will be called for all SET requests for
the specified item. Refer to :func:`mktl.Item.add_set_performer`
for additional details.
"""

key = key.lower()
key = key.strip()

try:
existing = self.store._items[key]
except KeyError:
raise KeyError('this daemon does not contain ' + repr(key))

if existing is None or existing.authoritative == False:
self.add_item(item.Item, key)
existing = self.store[key]

existing.add_set_performer(method)


def _begin_persistence(self):
""" Start the background process responsible for updating the
persistent value cache.
Expand Down Expand Up @@ -464,6 +635,9 @@ def __init__(self, daemon, *args, **kwargs):
protocol.request.Server.__init__(self, *args, **kwargs)
self.daemon = daemon

self._req_get_handlers = dict()
self._req_set_handlers = dict()


def req_config(self, request):

Expand Down Expand Up @@ -516,6 +690,13 @@ def req_handler(self, request):

def req_get(self, request):

try:
getter = self._req_get_handlers[request.target]
except KeyError:
pass
else:
return getter(request)

store, key = request.target.split('.', 1)

if store != self.daemon.store.name:
Expand All @@ -528,12 +709,29 @@ def req_get(self, request):
else:
raise KeyError('this daemon does not contain ' + repr(key))

response = self.daemon.store[key].req_get(request)
return response
getter = self.daemon.store[key].req_get
self._req_get_handlers[request.target] = getter
return getter(request)


def req_set(self, request):

### This may be the right place to send a publish message indicating
### that a set request has been received. This would largely be a
### debug message, structured exactly like a publish request, but
### with a leading 'set:' for the topic to distinguish it from anything
### that might be a normal broadcast.

### This would allow a debug client to subscribe to all messages with
### a leading 'set:' topic.

try:
setter = self._req_set_handlers[request.target]
except KeyError:
pass
else:
return setter(request)

store, key = request.target.split('.', 1)

if store != self.daemon.store.name:
Expand All @@ -546,17 +744,9 @@ def req_set(self, request):
else:
raise KeyError('this daemon does not contain ' + repr(key))

### This may be the right place to send a publish message indicating
### that a set request has been received. This would largely be a
### debug message, structured exactly like a publish request, but
### with a leading 'set:' for the topic to distinguish it from anything
### that might be a normal broadcast.

### This would allow a debug client to subscribe to all messages with
### a leading 'set:' topic.

response = self.daemon.store[key].req_set(request)
return response
setter = self.daemon.store[key].req_set
self._req_set_handlers[request.target] = setter
return setter(request)


def req_hash(self, request):
Expand Down
85 changes: 85 additions & 0 deletions src/mktl/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ def __init__(self, store, key, subscribe=True, authoritative=False, pub=None):
self._update_thread = None
self._updated = threading.Event()

self._perform_get_external = None
self._perform_set_external = None

# An Item is a singleton in practice; enforce that constraint.

try:
Expand Down Expand Up @@ -125,6 +128,57 @@ def __init__(self, store, key, subscribe=True, authoritative=False, pub=None):
self.subscribe(prime=prime)


def add_get_performer(self, method):
""" Define a method that will be called for all GET requests for
this item. This will replace the :func:`perform_get` method.
The performer method must accept no arguments; refer to
:func:`perform_get` for additional details.
"""

if callable(method):
pass
else:
raise TypeError('the performer method must be callable')

self._perform_get_external = method
self.perform_get = self._perform_get_wrapper


def add_performer(self, request, method):
""" Define a method that will be called for either GET or SET requests,
determined by the *request* argument, which must be one of 'get' or
'set'. See :func:`add_get_performer` and :func:`add_set_performer`
for additional information.
"""

request = request.lower()
request = request.strip()

if request == 'get':
return self.add_get_performer(method)
elif request == 'set':
return self.add_set_performer(method)
else:
raise ValueError("request must be either 'get' or 'set'")


def add_set_performer(self, method):
""" Define a method that will be called for all SET requests for
this item. This will replace the :func:`perform_set` method.
The performer method must accept one argument, the 'unformatted'
Python native representation of the item value; refer to
:func:`perform_set` for additional details.
"""

if callable(method):
pass
else:
raise TypeError('the performer method must be callable')

self._perform_set_external = method
self.perform_set = self._perform_set_wrapper


def _cleanup(self):
""" Shut down any background processing involved with this item.
In the general case this is not required; :class:`Item` instances
Expand Down Expand Up @@ -363,6 +417,14 @@ def perform_get(self):
is returned it will be used as-is, otherwise the return value
will be passed to :func:`to_payload` for encapsulation.

Calls to this method will be handled in a background thread with
no restrictions on how long a get request takes to process. There
are also no restrictions on concurrency, though ideally a get
request is trivial to execute; this method will be called
repeatedly if multiple requests arrive and one or more requests are
already in progress. Management of concurrent requests is left
entirely to the implementer.

Returning None will not clear the currently known value, that will
only occur if the returned Payload instance is assigned None as the
'value'; this is not expected to be a common occurrence, but if a
Expand All @@ -378,12 +440,27 @@ def perform_get(self):
return payload


def _perform_get_wrapper(self):
""" The only purpose of this wrapper method is to strip the 'self'
argument from a call to an external method.
"""

return self._perform_get_external()


def perform_set(self, new_value):
""" Implement any custom behavior that should occur as a result of
a set request for this item. No return value is expected. Any
subclass implementations should raise an exception in order to
trigger an error response.

Calls to this method will be handled in a background thread with
no restrictions on how long a set request takes to process. There
are also no restrictions on concurrency; this method will be called
repeatedly if multiple requests arrive and one or more requests are
already in progress. Management of concurrent requests is left
entirely to the implementer.

Any return value, though again none is expected, will be
encapsulated via :func:`to_payload`, after the same fashion as
:func:`perform_get`, and included in the response to the original
Expand All @@ -397,6 +474,14 @@ def perform_set(self, new_value):
pass


def _perform_set_wrapper(self, new_value):
""" The only purpose of this wrapper method is to strip the 'self'
argument from a call to an external method.
"""

return self._perform_set_external(new_value)


def poll(self, period):
""" Poll for a new value every *period* seconds. Polling will be
discontinued if *period* is set to None or zero. Polling will
Expand Down
Loading