Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__/
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ pyzmq = ">=4.0"
msgspec = { version = ">=0.18", optional = true }
numpy = { version = ">=1.6", optional = true }
orjson = { version = ">=3.0", optional = true }
pika = { version = ">=1.3", optional = true }
pint = { version = ">=0.17", optional = true }

[tool.poetry.extras]
msgspec = ["msgspec"]
numpy = ["numpy"]
orjson = ["orjson"]
pika = ["pika"]
pint = ["pint"]
rabbitmq = ["pika"]

[tool.poetry.urls]
repository = "https://github.com/KeckObservatory/mKTL"
Expand Down
1 change: 0 additions & 1 deletion src/mktl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@
from .item import Item
from .store import Store
from .daemon import Daemon
Payload = protocol.message.Payload

# vim: set expandtab tabstop=8 softtabstop=4 shiftwidth=4 autoindent:
19 changes: 10 additions & 9 deletions src/mktl/begin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
"""

import threading
import zmq

from . import config
from . import protocol
from . import transport
from .transport import TransportError
from .store import Store


Expand Down Expand Up @@ -47,21 +48,21 @@ def discover(*targets):
# Hacking the timeout for discovery, this is not expected to throw
# errors with minimal delay.

old_timeout = protocol.request.Client.timeout
protocol.request.Client.timeout = 0.5
old_timeout = transport.request.Client.timeout
transport.request.Client.timeout = 0.5

for address,port in brokers:
request = protocol.message.Request('HASH')
try:
payload = protocol.request.send(address, port, request)
payload = transport.request.send(address, port, request)
except:
continue

hashes = payload.value

for store in hashes.keys():
request = protocol.message.Request('CONFIG', store)
payload = protocol.request.send(address, port, request)
payload = transport.request.send(address, port, request)

blocks = payload.value

Expand All @@ -71,7 +72,7 @@ def discover(*targets):
configuration.update(block)


protocol.request.Client.timeout = old_timeout
transport.request.Client.timeout = old_timeout



Expand Down Expand Up @@ -140,7 +141,7 @@ def get(store, key=None):

hostname,port = brokers[0]
message = protocol.message.Request('CONFIG', store)
payload = protocol.request.send(hostname, port, message)
payload = transport.request.send(hostname, port, message)

blocks = payload.value

Expand Down Expand Up @@ -200,12 +201,12 @@ def refresh(configuration):
hostname = stratum['hostname']
rep = stratum['rep']

client = protocol.request.client(hostname, rep)
client = transport.request.client(hostname, rep)
request = protocol.message.Request('HASH', store)

try:
client.send(request)
except zmq.ZMQError:
except TransportError:
# No response from this daemon; move on to the next entry in
# the provenance. If no daemons respond the client will have
# to rely on the local disk cache.
Expand Down
7 changes: 4 additions & 3 deletions src/mktl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import threading
import time
import uuid
import zmq

# Importing pint is expensive, representing something like 30% of the
# user runtime for a simple mKTL command. It will be imported on a
Expand All @@ -19,6 +18,8 @@

from . import json
from . import protocol
from . import transport
from .transport import TransportError


_cache = dict()
Expand Down Expand Up @@ -1195,8 +1196,8 @@ def announce(config, uuid, override=False):

for address,port in brokers:
try:
payload = protocol.request.send(address, port, message)
except zmq.error.ZMQError:
payload = transport.request.send(address, port, message)
except TransportError:
continue

error = payload.error
Expand Down
19 changes: 10 additions & 9 deletions src/mktl/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import sys
import threading
import time
import zmq

from . import begin
from . import config
Expand All @@ -18,6 +17,8 @@
from . import poll
from . import protocol
from . import store
from . import transport
from .transport import TransportError, TransportPortError


class Daemon:
Expand Down Expand Up @@ -85,15 +86,15 @@ def __init__(self, store, alias, override=False, options=None):
self._test_port(store, rep)

try:
self.pub = protocol.publish.Server(port=pub, avoid=avoid)
except zmq.error.ZMQError:
self.pub = protocol.publish.Server(port=None, avoid=avoid)
self.pub = transport.publish.Server(port=pub, avoid=avoid)
except TransportPortError:
self.pub = transport.publish.Server(port=None, avoid=avoid)

avoid = _used_ports()

try:
self.rep = RequestServer(self, port=rep, avoid=avoid)
except zmq.error.ZMQError:
except TransportPortError:
self.rep = RequestServer(self, port=None, avoid=avoid)

_save_port(store, self.uuid, self.rep.port, self.pub.port)
Expand Down Expand Up @@ -420,8 +421,8 @@ def _test_port(self, store, port):
request = protocol.message.Request('CONFIG', store)

try:
payload = protocol.request.send(hostname, port, request)
except zmq.ZMQError:
payload = transport.request.send(hostname, port, request)
except TransportError:
# Not running; perfect.
return

Expand All @@ -446,10 +447,10 @@ def _test_port(self, store, port):



class RequestServer(protocol.request.Server):
class RequestServer(transport.request.Server):

def __init__(self, daemon, *args, **kwargs):
protocol.request.Server.__init__(self, *args, **kwargs)
transport.request.Server.__init__(self, *args, **kwargs)
self.daemon = daemon


Expand Down
9 changes: 5 additions & 4 deletions src/mktl/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import threading
import time
import traceback
import zmq

try:
import numpy
Expand All @@ -12,7 +11,9 @@

from . import protocol
from . import poll
from . import transport
from . import weakref
from .transport import TransportError


class Item:
Expand Down Expand Up @@ -95,8 +96,8 @@ def __init__(self, store, key, subscribe=True, authoritative=False, pub=None):
# configuration that doesn't contain a provenance.
raise RuntimeError('cannot find daemon for ' + self.full_key)

self.sub = protocol.publish.client(hostname, pub)
self.req = protocol.request.client(hostname, rep)
self.sub = transport.publish.client(hostname, pub)
self.req = transport.request.client(hostname, rep)

try:
settable = self.config['settable']
Expand Down Expand Up @@ -779,7 +780,7 @@ def subscribe(self, prime=True):
if prime == True:
try:
self.get(refresh=True)
except (zmq.ZMQError, RuntimeError):
except (TransportError, RuntimeError):
# Connection errors and remote errors on priming reads are
# thrown away; an error here means the remote daemon is not
# available to respond to requests, but despite that error
Expand Down
30 changes: 26 additions & 4 deletions src/mktl/transport/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
"""Transport implementations.
"""Transport layer implementations."""

Each transport is responsible for mapping protocol :class:`mktl.protocol.Message`
objects to/from a wire representation.
"""
import os

from .base import (
TransportError,
TransportTimeout,
TransportConnectionError,
TransportPortError,
)

_BACKEND = os.environ.get("MKTL_TRANSPORT", "zmq")

if _BACKEND == "zmq":
from .zmq import request
from .zmq import publish
elif _BACKEND == "rabbitmq":
try:
from .rabbitmq import request
from .rabbitmq import publish
except ImportError:
raise ImportError(
"MKTL_TRANSPORT='rabbitmq' requires pika: "
"pip install mKTL[rabbitmq]"
)
else:
raise ImportError(f"unknown MKTL_TRANSPORT backend: {_BACKEND!r}")
42 changes: 38 additions & 4 deletions src/mktl/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,49 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Optional

from ..protocol import Message
from ..protocol.message import Message


# Transport agnostic exceptions

class TransportError(Exception):
"""Base class for all transport-layer errors."""


class TransportTimeout(TransportError):
"""A request did not receive a timely response."""


class TransportConnectionError(TransportError):
"""The transport could not establish or maintain a connection."""


class TransportPortError(TransportError):
"""No suitable port could be bound or connected."""


class Transport(ABC):
"""Minimal contract for a wire-level transport."""

@abstractmethod
def open(self) -> None:
"""Establish the underlying connection/socket."""

@abstractmethod
def close(self) -> None:
"""Tear down the underlying connection/socket."""

@abstractmethod
def send(self, msg: Message) -> None:
"""Send a protocol message."""
"""Send a protocol Message."""

@abstractmethod
def recv(self) -> Message:
"""Receive the next protocol message."""
def recv(self, timeout: Optional[float] = None) -> Message:
"""Receive the next protocol Message."""

@property
def is_open(self) -> bool:
"""Whether the transport is currently connected."""
return False
1 change: 1 addition & 0 deletions src/mktl/transport/rabbitmq/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Loading
Loading