From 1a0bf6fffadd563a5103130da7a1feed0c9a9822 Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Sun, 7 Jan 2024 23:06:16 -0800 Subject: [PATCH 01/12] Adds support for using Redis as the datastore in improv --- .github/workflows/CI.yaml | 27 +- demos/minimal/actors/sample_generator.py | 20 +- demos/minimal/actors/sample_processor.py | 14 +- demos/minimal/minimal.yaml | 5 +- demos/minimal/minimal_plasma.yaml | 13 + demos/sample_actors/zmqActor.py | 4 +- docs/installation.md | 18 ++ improv/actor.py | 10 +- improv/cli.py | 22 +- improv/config.py | 47 ++++ improv/nexus.py | 200 ++++++++++++-- improv/store.py | 149 +++++++++- pyproject.toml | 5 +- test/configs/good_config_plasma.yaml | 15 + .../minimal_with_custom_dbfilename.yaml | 14 + .../minimal_with_ephemeral_dbfilename.yaml | 14 + .../minimal_with_fixed_redis_port.yaml | 14 + test/configs/minimal_with_redis_saving.yaml | 14 + test/configs/single_actor_plasma.yaml | 10 + test/conftest.py | 51 +++- test/test_actor.py | 84 ++++-- test/test_cli.py | 6 +- test/test_config.py | 2 +- test/test_demos.py | 18 +- test/test_link.py | 57 +--- test/test_nexus.py | 204 +++++++++++++- test/test_store_with_errors.py | 257 +++++------------- test/test_tui.py | 6 +- 28 files changed, 972 insertions(+), 328 deletions(-) create mode 100644 demos/minimal/minimal_plasma.yaml create mode 100644 test/configs/good_config_plasma.yaml create mode 100644 test/configs/minimal_with_custom_dbfilename.yaml create mode 100644 test/configs/minimal_with_ephemeral_dbfilename.yaml create mode 100644 test/configs/minimal_with_fixed_redis_port.yaml create mode 100644 test/configs/minimal_with_redis_saving.yaml create mode 100644 test/configs/single_actor_plasma.yaml diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 4cef3b4d..0c80283d 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -5,25 +5,48 @@ on: branches: - main - dev + - redis push: branches: - main - dev + - redis jobs: test: runs-on: ${{matrix.os}} strategy: + max-parallel: 1 fail-fast: false matrix: - python-version: ["3.7", "3.8", "3.9", "3.10"] + python-version: ["3.8", "3.9", "3.10"] os: [ubuntu-latest, macos-latest] # [ubuntu-latest, macos-latest, windows-latest] steps: - uses: actions/checkout@v4 with: fetch-depth: 0 + - name: Update Ubuntu and Install Dependencies + if: startsWith(matrix.os, 'ubuntu') + run: | + sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y lsb-release curl gpg + - name: Fetch Redis gpg (Ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: | + curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg + - name: Configure Redis gpg (Ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: | + echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list + - name: Install Redis (Ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: | + sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y redis && sleep 5 && sudo systemctl stop redis-server + - name: Install Redis (macOS) + if: startsWith(matrix.os, 'macos') + run: | + brew install redis - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: @@ -81,4 +104,4 @@ jobs: - name: Close parallel build uses: coverallsapp/github-action@v1 with: - parallel-finished: true \ No newline at end of file + parallel-finished: true diff --git a/demos/minimal/actors/sample_generator.py 
b/demos/minimal/actors/sample_generator.py index df5191ec..31335833 100644 --- a/demos/minimal/actors/sample_generator.py +++ b/demos/minimal/actors/sample_generator.py @@ -1,5 +1,4 @@ -from improv.actor import Actor, RunManager -from datetime import date # used for saving +from improv.actor import Actor import numpy as np import logging @@ -50,13 +49,20 @@ def runStep(self): """ if self.frame_num < np.shape(self.data)[0]: - data_id = self.client.put( - self.data[self.frame_num], str(f"Gen_raw: {self.frame_num}") - ) + if self.store_loc: + data_id = self.client.put( + self.data[self.frame_num], str(f"Gen_raw: {self.frame_num}") + ) + else: + data_id = self.client.put(self.data[self.frame_num]) # logger.info('Put data in store') try: - self.q_out.put([[data_id, str(self.frame_num)]]) - logger.info("Sent message on") + if self.store_loc: + self.q_out.put([[data_id, str(self.frame_num)]]) + else: + self.q_out.put(data_id) + # logger.info("Sent message on") + self.frame_num += 1 except Exception as e: logger.error( diff --git a/demos/minimal/actors/sample_processor.py b/demos/minimal/actors/sample_processor.py index 24f87880..77c88e08 100644 --- a/demos/minimal/actors/sample_processor.py +++ b/demos/minimal/actors/sample_processor.py @@ -47,17 +47,21 @@ def runStep(self): try: frame = self.q_in.get(timeout=0.001) - except: + except Exception: logger.error("Could not get frame!") pass if frame is not None and self.frame_num is not None: self.done = False - self.frame = self.client.getID(frame[0][0]) + if self.store_loc: + self.frame = self.client.getID(frame[0][0]) + else: + self.frame = self.client.get(frame) avg = np.mean(self.frame[0]) - # print(f"Average: {avg}") + # logger.info(f"Average: {avg}") self.avg_list.append(avg) - # print(f"Overall Average: {np.mean(self.avg_list)}") - # print(f"Frame number: {self.frame_num}") + # logger.info(f"Overall Average: {np.mean(self.avg_list)}") + # logger.info(f"Frame number: {self.frame_num}") + self.frame_num += 1 diff --git a/demos/minimal/minimal.yaml b/demos/minimal/minimal.yaml index 230282d1..c8e0b24e 100644 --- a/demos/minimal/minimal.yaml +++ b/demos/minimal/minimal.yaml @@ -8,4 +8,7 @@ actors: class: Processor connections: - Generator.q_out: [Processor.q_in] \ No newline at end of file + Generator.q_out: [Processor.q_in] + +redis_config: + port: 6379 \ No newline at end of file diff --git a/demos/minimal/minimal_plasma.yaml b/demos/minimal/minimal_plasma.yaml new file mode 100644 index 00000000..ab869678 --- /dev/null +++ b/demos/minimal/minimal_plasma.yaml @@ -0,0 +1,13 @@ +actors: + Generator: + package: actors.sample_generator + class: Generator + + Processor: + package: actors.sample_processor + class: Processor + +connections: + Generator.q_out: [Processor.q_in] + +plasma_config: \ No newline at end of file diff --git a/demos/sample_actors/zmqActor.py b/demos/sample_actors/zmqActor.py index f942717d..526ed243 100644 --- a/demos/sample_actors/zmqActor.py +++ b/demos/sample_actors/zmqActor.py @@ -157,7 +157,7 @@ def get(self, reply=None): logger.debug(f"getting message using reply {reply} with pub/sub") return self.replyMsg(reply) - def setSendSocket(self, timeout=0.001): + def setSendSocket(self, timeout=1.001): """ Sets up the send socket for the actor. """ @@ -165,7 +165,7 @@ def setSendSocket(self, timeout=0.001): self.send_socket.bind(self.address) time.sleep(timeout) - def setRecvSocket(self, timeout=0.001): + def setRecvSocket(self, timeout=1.001): """ Sets up the receive socket for the actor. 
""" diff --git a/docs/installation.md b/docs/installation.md index 0d388295..6fa470dd 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -14,6 +14,24 @@ pip install improv --no-binary pyzmq to build `pyzmq` from source. ```` +## Required dependencies + +### Redis + +_improv_ uses Redis, an in-memory datastore, to hold data to be communicated between actors. _improv_ has been tested with Redis server version 7.2.4. Please refer to the instructions below for your operating system: + +#### macOS +A compatible version of Redis can be installed via Homebrew: +``` +brew install redis +``` + +#### Linux +A compatible version of Redis can be installed for most standard Linux distributions by following Redis' instructions on their [installation guide](https://redis.io/docs/latest/operate/oss_and_stack/install/install-redis/install-redis-on-linux/). + +#### Windows (WSL2) +Redis can also be installed on Windows in WSL2. The [installation guide](https://redis.io/docs/latest/operate/oss_and_stack/install/install-redis/install-redis-on-windows/) details the process for both the Windows and Linux portions of WSL2. + ## Optional dependencies In addition to the basic _improv_ installation, users who want to, e.g., run tests locally and build docs should do ``` diff --git a/improv/actor.py b/improv/actor.py index 10d0baf7..e352b354 100644 --- a/improv/actor.py +++ b/improv/actor.py @@ -3,6 +3,8 @@ import asyncio import traceback from queue import Empty + +import improv.store from improv.store import StoreInterface import logging @@ -18,7 +20,7 @@ class AbstractActor: Also needs to be responsive to sent Signals (e.g. run, setup, etc) """ - def __init__(self, name, store_loc, method="fork"): + def __init__(self, name, store_loc=None, method="fork"): """Require a name for multiple instances of the same actor/class Create initial empty dict of Links for easier referencing """ @@ -55,7 +57,11 @@ def setStoreInterface(self, client): def _getStoreInterface(self): # TODO: Where do we require this be run? Add a Signal and include in RM? if not self.client: - store = StoreInterface(self.name, self.store_loc) + store = None + if StoreInterface == improv.store.RedisStoreInterface: + store = StoreInterface(self.name) + else: + store = StoreInterface(self.name, self.store_loc) self.setStoreInterface(store) def setLinks(self, links): diff --git a/improv/cli.py b/improv/cli.py index 75c00aac..720d4e2f 100644 --- a/improv/cli.py +++ b/improv/cli.py @@ -2,6 +2,7 @@ import os.path import re import argparse +import signal import subprocess import sys import psutil @@ -253,12 +254,12 @@ def run_server(args): def run_list(args, printit=True): out_list = [] - pattern = re.compile(r"(improv (run|client|server)|plasma_store)") - mp_pattern = re.compile(r"-c from multiprocessing") + pattern = re.compile(r"(improv (run|client|server)|plasma_store|redis-server)") + # mp_pattern = re.compile(r"-c from multiprocessing") # TODO is this right? 
for proc in psutil.process_iter(["pid", "name", "cmdline"]): if proc.info["cmdline"]: cmdline = " ".join(proc.info["cmdline"]) - if re.search(pattern, cmdline) or re.search(mp_pattern, cmdline): + if re.search(pattern, cmdline): # or re.search(mp_pattern, cmdline): out_list.append(proc) if printit: print(f"{proc.pid} {proc.name()} {cmdline}") @@ -280,14 +281,23 @@ def run_cleanup(args, headless=False): if res.lower() == "y": for proc in proc_list: - if not proc.status == "terminated": + if not proc.status() == psutil.STATUS_STOPPED: + logging.info( + f"process {proc.pid} {proc.name()}" + f" has status {proc.status()}. Interrupting." + ) try: - proc.terminate() + proc.send_signal(signal.SIGINT) except psutil.NoSuchProcess: pass gone, alive = psutil.wait_procs(proc_list, timeout=3) for p in alive: - p.kill() + p.send_signal(signal.SIGINT) + try: + p.wait(timeout=10) + except psutil.TimeoutExpired as e: + logging.warning(f"{e}: Process did not exit on time.") + else: if not headless: print("No running processes found.") diff --git a/improv/config.py b/improv/config.py index 00f52382..13c7517b 100644 --- a/improv/config.py +++ b/improv/config.py @@ -108,6 +108,10 @@ def createConfig(self): raise RepeatedConnectionsError(name) self.connections.update({name: conn}) + + if "datastore" in cfg.keys(): + self.datastore = cfg["datastore"] + return 0 def addParams(self, type, param): @@ -125,6 +129,49 @@ def saveActors(self): for a in self.actors.values(): wflag = a.saveConfigModules(pathName, wflag) + def use_plasma(self): + return "plasma_config" in self.config.keys() + + def get_redis_port(self): + if self.redis_port_specified(): + return self.config["redis_config"]["port"] + else: + return Config.get_default_redis_port() + + def redis_port_specified(self): + if "redis_config" in self.config.keys(): + return "port" in self.config["redis_config"] + return False + + def redis_saving_enabled(self): + if "redis_config" in self.config.keys(): + return ( + "enable_saving" in self.config["redis_config"] + and self.config["redis_config"]["enable_saving"] + ) + return None + + def generate_ephemeral_db_filename(self): + if "redis_config" in self.config.keys(): + return ( + "generate_ephemeral_db_filename" in self.config["redis_config"] + and self.config["redis_config"]["generate_ephemeral_db_filename"] + ) + return False + + def get_redis_db_filename(self): + if "redis_config" in self.config.keys(): + return ( + self.config["redis_config"]["db_filename"] + if "db_filename" in self.config["redis_config"] + else None + ) + return None + + @staticmethod + def get_default_redis_port(): + return "6379" + class ConfigModule: def __init__(self, name, packagename, classname, options=None): diff --git a/improv/nexus.py b/improv/nexus.py index a750c344..30f382e8 100644 --- a/improv/nexus.py +++ b/improv/nexus.py @@ -1,4 +1,5 @@ import os +import time import uuid import signal import logging @@ -14,7 +15,7 @@ import zmq.asyncio as zmq from zmq import PUB, REP, SocketOption -from improv.store import StoreInterface +from improv.store import StoreInterface, RedisStoreInterface, PlasmaStoreInterface from improv.actor import Signal from improv.config import Config from improv.link import Link, MultiLink @@ -29,7 +30,11 @@ class Nexus: """Main server class for handling objects in improv""" def __init__(self, name="Server"): + self.store = None + self.config = None self.name = name + self.redis_dumpfile = None + self.redis_saving_enabled = False def __str__(self): return self.name @@ -89,6 +94,7 @@ def createNexus( # set up 
socket in lieu of printing to stdout
         self.zmq_context = zmq.Context()
+        self.zmq_context.setsockopt(SocketOption.LINGER, 1)
         self.out_socket = self.zmq_context.socket(PUB)
         self.out_socket.bind("tcp://*:%s" % cfg["output_port"])
         out_port_string = self.out_socket.getsockopt_string(SocketOption.LAST_ENDPOINT)
@@ -99,13 +105,25 @@
         in_port_string = self.in_socket.getsockopt_string(SocketOption.LAST_ENDPOINT)
         cfg["control_port"] = int(in_port_string.split(":")[-1])
 
+        self.configure_redis_persistence()
+
         # default size should be system-dependent
         self._startStoreInterface(store_size)
+        if not (self.config and self.config.use_plasma()):
+            logger.info("Redis server started")
+            self.out_socket.send_string("StoreInterface started")
 
         # connect to store and subscribe to notifications
         logger.info("Create new store object")
-        self.store = StoreInterface(store_loc=self.store_loc)
+        if self.config and self.config.use_plasma():
+            self.store = PlasmaStoreInterface(store_loc=self.store_loc)
+        else:
+            self.store = StoreInterface(server_port_num=self.store_port)
+            logger.info(f"Redis server connected on port {self.store_port}")
+
         self.store.subscribe()
 
         # LMDB storage
@@ -227,6 +245,44 @@ def initConfig(self):
             watchin.append(watch_link)
             self.createWatcher(watchin)
 
+    def configure_redis_persistence(self):
+        # invalid config: a fixed db filename and an ephemeral (generated)
+        # filename are mutually exclusive
+        db_filename = self.config.get_redis_db_filename()
+        generate_unique_filename = self.config.generate_ephemeral_db_filename()
+        redis_saving_enabled = self.config.redis_saving_enabled()
+
+        if db_filename and generate_unique_filename:
+            logger.error(
+                "Cannot both generate a unique filename and use the one provided."
+            )
+            raise Exception(
+                "Cannot both generate a unique filename and use the one provided."
+            )
+
+        # providing either filename option implies that saving is enabled
+        if db_filename or generate_unique_filename:
+            redis_saving_enabled = True
+
+        self.redis_saving_enabled = redis_saving_enabled
+
+        if db_filename:
+            self.redis_dumpfile = db_filename
+        elif generate_unique_filename:
+            self.redis_dumpfile = str(uuid.uuid1()) + ".rdb"
+
+        if self.redis_saving_enabled and self.redis_dumpfile is not None:
+            logger.info("Redis saving enabled. Saving to file " + self.redis_dumpfile)
+        elif self.redis_saving_enabled:
+            logger.info("Redis saving enabled with default dumpfile.")
+        else:
+            logger.info("Redis saving disabled.")
+
     def startNexus(self):
         """
         Puts all actors in separate processes and begins polling
@@ -299,6 +355,13 @@ def destroyNexus(self):
             )
             logger.warning("Delete the store at location {0}".format(self.store_loc))
 
+        if hasattr(self, "out_socket"):
+            self.out_socket.close(linger=0)
+        if hasattr(self, "in_socket"):
+            self.in_socket.close(linger=0)
+        if hasattr(self, "zmq_context"):
+            self.zmq_context.destroy(linger=0)
+
     async def pollQueues(self):
         """
         Listens to links and processes their signals.
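For reference, a sketch of the `redis_config` block that `configure_redis_persistence` and the new `config.py` accessors consume; the values are illustrative, and `db_filename` / `generate_ephemeral_db_filename` are mutually exclusive:

```yaml
redis_config:
  port: 6379                # optional; if omitted, Nexus searches for an open port
  enable_saving: True       # persist the db; implied when either filename option is set
  db_filename: improv.rdb   # illustrative name; fixed dump file to use
  # generate_ephemeral_db_filename: True   # alternative: generate a fresh <uuid>.rdb per run
```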
@@ -575,16 +638,21 @@ def stop_polling(self, stop_signal, queues):
     def createStoreInterface(self, name):
-        """Creates StoreInterface w/ or w/out LMDB functionality
-        based on {self.use_hdd}."""
-        if not self.use_hdd:
-            return StoreInterface(name, self.store_loc)
+        """Creates a plasma- or Redis-backed StoreInterface,
+        based on the loaded config and {self.use_hdd}."""
+        if self.config.use_plasma():
+            if not self.use_hdd:
+                return PlasmaStoreInterface(name, self.store_loc)
+            else:
+                # I don't think this currently works,
+                # since the constructor doesn't accept these arguments
+                if name not in self.store_dict:
+                    self.store_dict[name] = PlasmaStoreInterface(
+                        name, self.store_loc, use_hdd=True, lmdb_name=self.lmdb_name
+                    )
+                return self.store_dict[name]
         else:
-            if name not in self.store_dict:
-                self.store_dict[name] = StoreInterface(
-                    name, self.store_loc, use_hdd=True, lmdb_name=self.lmdb_name
-                )
-            return self.store_dict[name]
+            return RedisStoreInterface(server_port_num=self.store_port)
 
-    def _startStoreInterface(self, size):
+    def _startStoreInterface(self, size, attempts=20):
-        """Start a subprocess that runs the plasma store
+        """Start a subprocess that runs the data store (plasma or Redis)
         Raises a RuntimeError if size is undefined
         Raises an Exception if the store doesn't start
@@ -596,12 +664,14 @@
         Raises:
             RuntimeError: if the size is undefined
-            Exception: if the plasma store doesn't start
+            Exception: if the store doesn't start
         """
         if size is None:
             raise RuntimeError("Server size needs to be specified")
 
-        try:
+        self.use_plasma = False
+        if self.config and self.config.use_plasma():
+            self.use_plasma = True
             self.store_loc = str(os.path.join("/tmp/", str(uuid.uuid4())))
             self.p_StoreInterface = subprocess.Popen(
                 [
@@ -617,19 +687,102 @@
                 stderr=subprocess.DEVNULL,
             )
             logger.info("StoreInterface start successful: {}".format(self.store_loc))
-        except Exception as e:
-            logger.exception("StoreInterface cannot be started: {}".format(e))
+        else:
+            logger.info("Setting up Redis store.")
+            self.store_port = (
+                self.config.get_redis_port()
+                if self.config and self.config.redis_port_specified()
+                else Config.get_default_redis_port()
+            )
+            if self.config and self.config.redis_port_specified():
+                logger.info(
+                    "Attempting to connect to Redis on port {}".format(self.store_port)
+                )
+                self.p_StoreInterface = self.start_redis(size)
+                time.sleep(3)
+                if self.p_StoreInterface.poll():
+                    logger.error("Could not start Redis on specified port number.")
+                    raise Exception("Could not start Redis on specified port.")
+            else:
+                logger.info("Redis port not specified. Searching for open port.")
+                for attempt in range(attempts):
+                    logger.info(
+                        "Attempting to connect to Redis on port {}".format(
+                            self.store_port
+                        )
+                    )
+                    # try with failure, incrementing the port number
+                    self.p_StoreInterface = self.start_redis(size)
+                    time.sleep(3)
+                    if self.p_StoreInterface.poll():  # Redis could not start
+                        logger.info(
+                            "Could not connect to port {}".format(self.store_port)
+                        )
+                        self.store_port = str(int(self.store_port) + 1)
+                    else:
+                        break
+                else:
+                    logger.error("Could not start Redis on any tried port.")
+                    raise Exception("Could not start Redis on any tried port.")
+
+            logger.info(f"StoreInterface start successful on port {self.store_port}")
+
+    def start_redis(self, size):
+        subprocess_command = [
+            "redis-server",
+            "--port",
+            str(self.store_port),
+            "--maxmemory",
+            str(size),
+        ]
+
+        if self.redis_dumpfile is not None and len(self.redis_dumpfile) == 0:
+            raise Exception("Redis dump file specified, but the filename is empty.")
+
+        if (
+            not self.redis_saving_enabled
+        ):  # the default behavior - do not persist db state.
+            subprocess_command += ["--save", '""']
+            logger.info("Redis dump file disabled.")
+        elif (
+            self.redis_dumpfile is not None
+        ):  # use the specified (possibly pre-existing) file
+            # subprocess_command += ["--save", "1 1"]
+            subprocess_command += ["--dbfilename", self.redis_dumpfile]
+            logger.info("Redis dump file set to {}".format(self.redis_dumpfile))
+        else:  # just use the (possibly preexisting) default dump.rdb file
+            # subprocess_command += ["--save", "1 1"]
+            logger.info("Proceeding with the default Redis dump file.")
+
+        logger.info(
+            "Starting Redis server with command: \n {}".format(subprocess_command)
+        )
+
+        return subprocess.Popen(
+            subprocess_command,
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.DEVNULL,
+        )
 
     def _closeStoreInterface(self):
         """Internal method to kill the subprocess
         running the store (plasma or Redis server)
         """
-        try:
-            self.p_StoreInterface.kill()
-            self.p_StoreInterface.wait()
-            logger.info("StoreInterface close successful: {}".format(self.store_loc))
-        except Exception as e:
-            logger.exception("Cannot close store {}".format(e))
+        if hasattr(self, "p_StoreInterface"):
+            try:
+                self.p_StoreInterface.send_signal(signal.SIGINT)
+                self.p_StoreInterface.wait()
+                logger.info(
+                    "StoreInterface close successful: {}".format(
                        self.store_loc
+                        if self.config and self.config.use_plasma()
+                        else self.store_port
+                    )
+                )
+            except Exception as e:
+                logger.exception("Cannot close store {}".format(e))
 
     def createActor(self, name, actor):
         """Function to instantiate actor, add signal and comm Links,
@@ -642,7 +795,10 @@
         # Instantiate selected class
         mod = import_module(actor.packagename)
         clss = getattr(mod, actor.classname)
-        instance = clss(actor.name, self.store_loc, **actor.options)
+        if self.config.use_plasma():
+            instance = clss(actor.name, self.store_loc, **actor.options)
+        else:
+            instance = clss(actor.name, **actor.options)
 
         if "method" in actor.options.keys():
             # check for spawn
diff --git a/improv/store.py b/improv/store.py
index d6c4b62c..614fcf28 100644
--- a/improv/store.py
+++ b/improv/store.py
@@ -1,3 +1,6 @@
+import os
+import uuid
+
 import lmdb
 import time
 import pickle
@@ -8,6 +11,11 @@
 import numpy as np
 import pyarrow.plasma as plasma
 
+from redis import Redis
+from redis.retry import Retry
+from redis.backoff import ConstantBackoff
+from redis.exceptions import BusyLoadingError, ConnectionError, TimeoutError
+
 from queue import Queue
 from pathlib import Path
 from random import random
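For illustration, in the default no-persistence case the `start_redis` helper above assembles a command line of this shape (the port and `--maxmemory` value are illustrative):

```
redis-server --port 6379 --maxmemory 10000000 --save ""
```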
@@ -18,6 +26,8 @@
 from pyarrow.lib import ArrowIOError
 from pyarrow._plasma import PlasmaObjectExists, ObjectNotAvailable, ObjectID
 
+REDIS_GLOBAL_TOPIC = "global_topic"
+
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
 
@@ -41,6 +51,143 @@
     def subscribe(self):
         raise NotImplementedError
 
 
+class RedisStoreInterface(StoreInterface):
+    def __init__(self, name="default", server_port_num=6379, hostname="localhost"):
+        self.name = name
+        self.server_port_num = server_port_num
+        self.hostname = hostname
+        self.client = self.connect_to_server()
+
+    def connect_to_server(self):
+        # TODO this should scan for available ports, but only if configured to do so.
+        # This happens when the config doesn't have Redis settings,
+        # so we need to communicate this somehow to the StoreInterface here.
+        """Connect to the Redis server at hostname:server_port_num,
+        retrying up to 5 times with a constant backoff.
+        Raises an exception if it can't connect.
+        Returns the Redis client if successful.
+
+        Args:
+            server_port_num: the port number where the Redis server
+            is running on localhost.
+        """
+        try:
+            retry = Retry(ConstantBackoff(0.25), 5)
+            self.client = Redis(
+                host=self.hostname,
+                port=self.server_port_num,
+                retry=retry,
+                retry_on_timeout=True,
+                retry_on_error=[
+                    BusyLoadingError,
+                    ConnectionError,
+                    TimeoutError,
+                    ConnectionRefusedError,
+                ],
+            )
+            self.client.ping()
+            logger.info(
+                "Successfully connected to redis datastore on port {}".format(
+                    self.server_port_num
+                )
+            )
+        except Exception:
+            logger.exception(
+                "Cannot connect to redis datastore on port {}".format(
+                    self.server_port_num
+                )
+            )
+            raise CannotConnectToStoreInterfaceError(self.server_port_num)
+
+        return self.client
+
+    def put(self, object):
+        """
+        Put a single object into the store under a freshly generated key.
+        If the store already has a value stored at that key, the value
+        will not be overwritten.
+
+        Args:
+            object: the object to store in Redis
+
+        Returns:
+            str: the key under which the object was stored
+        """
+        object_key = str(os.getpid()) + str(uuid.uuid4())
+        try:
+            # buffers would theoretically go here if we need to force out-of-band
+            # serialization for single objects
+            # TODO this will actually just silently fail if we use an existing
+            # TODO key; not sure it's worth the network overhead to check every
+            # TODO key twice every time. we still need a better solution for
+            # TODO this, but it will work now singlethreaded most of the time.
+            self.client.set(object_key, pickle.dumps(object, protocol=5), nx=True)
+        except Exception:
+            logger.error("Could not store object {}".format(object_key))
+            logger.error(traceback.format_exc())
+
+        return object_key
+
+    def get(self, object_key):
+        """
+        Get object by specified key
+
+        Args:
+            object_key (str): the key of the object
+
+        Returns:
+            Stored object
+
+        Raises:
+            ObjectNotFoundError: If the key is not found
+        """
+        object_value = self.client.get(object_key)
+        if object_value:
+            # buffers would also go here to force out-of-band deserialization
+            return pickle.loads(object_value)
+
+        logger.warning("Object {} cannot be found.".format(object_key))
+        raise ObjectNotFoundError
+
+    def subscribe(self, topic=REDIS_GLOBAL_TOPIC):
+        p = self.client.pubsub()
+        p.subscribe(topic)
+
+    def get_list(self, ids):
+        """Get multiple objects from the store
+
+        Args:
+            ids (list): of type str
+
+        Returns:
+            list of the objects
+        """
+        return self.client.mget(ids)
+
+    def get_all(self):
+        """Get a listing of all objects in the store.
+ Note that this may be very performance-intensive in large databases. + + Returns: + list of all the objects in the store + """ + all_keys = self.client.keys() # defaults to "*" pattern, so will fetch all + return self.client.mget(all_keys) + + def reset(self): + """Reset client connection""" + self.client = self.connect_to_server() + logger.debug( + "Reset local connection to store on port: {0}".format(self.server_port_num) + ) + + def notify(self): + pass # I don't see any call sites for this, so leaving it blank at the moment + + class PlasmaStoreInterface(StoreInterface): """Basic interface for our specific data store implemented with apache arrow plasma Objects are stored with object_ids @@ -517,7 +664,7 @@ def subscribe(self): # Aliasing -StoreInterface = PlasmaStoreInterface +StoreInterface = RedisStoreInterface @dataclass diff --git a/pyproject.toml b/pyproject.toml index 2cc99c28..bec4b3e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ readme = "README.md" requires-python = ">=3.6" keywords = ["neuroscience", "adaptive", "closed loop"] dependencies = [ - "numpy", + "numpy<=1.26", "scipy", "matplotlib", "pyarrow==9.0.0", @@ -24,6 +24,7 @@ dependencies = [ "pyzmq", "psutil", "h5py", + "redis" ] classifiers = ['Development Status :: 3 - Alpha', 'Intended Audience :: Science/Research', @@ -112,4 +113,4 @@ distance-dirty = "{next_version}.dev{distance}+{vcs}{rev}.d{build_date:%Y%m%d}" # Example formatted version: 1.2.4.dev42+ge174a1f.d20230922 [tool.versioningit.write] -file = "improv/_version.py" \ No newline at end of file +file = "improv/_version.py" diff --git a/test/configs/good_config_plasma.yaml b/test/configs/good_config_plasma.yaml new file mode 100644 index 00000000..8323017b --- /dev/null +++ b/test/configs/good_config_plasma.yaml @@ -0,0 +1,15 @@ +actors: + Acquirer: + package: demos.sample_actors.acquire + class: FileAcquirer + filename: data/Tolias_mesoscope_2.hdf5 + framerate: 30 + + Analysis: + package: demos.sample_actors.simple_analysis + class: SimpleAnalysis + +connections: + Acquirer.q_out: [Analysis.q_in] + +plasma_config: \ No newline at end of file diff --git a/test/configs/minimal_with_custom_dbfilename.yaml b/test/configs/minimal_with_custom_dbfilename.yaml new file mode 100644 index 00000000..eff3b34c --- /dev/null +++ b/test/configs/minimal_with_custom_dbfilename.yaml @@ -0,0 +1,14 @@ +actors: + Generator: + package: actors.sample_generator + class: Generator + + Processor: + package: actors.sample_processor + class: Processor + +connections: + Generator.q_out: [Processor.q_in] + +redis_config: + db_filename: custom_dbfilename.rdb \ No newline at end of file diff --git a/test/configs/minimal_with_ephemeral_dbfilename.yaml b/test/configs/minimal_with_ephemeral_dbfilename.yaml new file mode 100644 index 00000000..9f97372d --- /dev/null +++ b/test/configs/minimal_with_ephemeral_dbfilename.yaml @@ -0,0 +1,14 @@ +actors: + Generator: + package: actors.sample_generator + class: Generator + + Processor: + package: actors.sample_processor + class: Processor + +connections: + Generator.q_out: [Processor.q_in] + +redis_config: + generate_ephemeral_db_filename: True \ No newline at end of file diff --git a/test/configs/minimal_with_fixed_redis_port.yaml b/test/configs/minimal_with_fixed_redis_port.yaml new file mode 100644 index 00000000..c8e0b24e --- /dev/null +++ b/test/configs/minimal_with_fixed_redis_port.yaml @@ -0,0 +1,14 @@ +actors: + Generator: + package: actors.sample_generator + class: Generator + + Processor: + package: 
actors.sample_processor + class: Processor + +connections: + Generator.q_out: [Processor.q_in] + +redis_config: + port: 6379 \ No newline at end of file diff --git a/test/configs/minimal_with_redis_saving.yaml b/test/configs/minimal_with_redis_saving.yaml new file mode 100644 index 00000000..15e95b6c --- /dev/null +++ b/test/configs/minimal_with_redis_saving.yaml @@ -0,0 +1,14 @@ +actors: + Generator: + package: actors.sample_generator + class: Generator + + Processor: + package: actors.sample_processor + class: Processor + +connections: + Generator.q_out: [Processor.q_in] + +redis_config: + enable_saving: True \ No newline at end of file diff --git a/test/configs/single_actor_plasma.yaml b/test/configs/single_actor_plasma.yaml new file mode 100644 index 00000000..812480b1 --- /dev/null +++ b/test/configs/single_actor_plasma.yaml @@ -0,0 +1,10 @@ +actors: + Acquirer: + package: demos.sample_actors.acquire + class: FileAcquirer + filename: data/Tolias_mesoscope_2.hdf5 + framerate: 15 + +connections: +# settings: +# use_watcher: [Acquirer, Processor, Visual, Analysis] diff --git a/test/conftest.py b/test/conftest.py index 5bbdb552..23880328 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,10 +1,59 @@ import os +import signal import uuid import pytest +import subprocess store_loc = str(os.path.join("/tmp/", str(uuid.uuid4()))) +redis_port_num = 6379 +WAIT_TIMEOUT = 120 -@pytest.fixture() +@pytest.fixture def set_store_loc(): return store_loc + + +@pytest.fixture +def server_port_num(): + return redis_port_num + + +@pytest.fixture +# TODO: put in conftest.py +def setup_store(server_port_num): + """Start the server""" + p = subprocess.Popen( + [ + "redis-server", + "--save", + '""', + "--dbfilename", + "cinonexistent.rdb", + "--port", + str(server_port_num), + "--maxmemory", + str(10000000), + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + yield p + + # kill the subprocess when the caller is done with it + p.send_signal(signal.SIGINT) + p.wait(WAIT_TIMEOUT) + + +@pytest.fixture +def setup_plasma_store(set_store_loc, scope="module"): + """Fixture to set up the store subprocess with 10 mb.""" + p = subprocess.Popen( + ["plasma_store", "-s", set_store_loc, "-m", str(10000000)], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + yield p + p.send_signal(signal.SIGINT) + p.wait(WAIT_TIMEOUT) diff --git a/test/test_actor.py b/test/test_actor.py index 0745aa69..448fd34e 100644 --- a/test/test_actor.py +++ b/test/test_actor.py @@ -1,11 +1,9 @@ import os import psutil import pytest -import subprocess from improv.link import Link # , AsyncQueue from improv.actor import AbstractActor as Actor -from improv.store import StoreInterface - +from improv.store import StoreInterface, PlasmaStoreInterface # set global_variables @@ -13,20 +11,7 @@ pytest.example_links = {} -@pytest.fixture() -def setup_store(set_store_loc, scope="module"): - """Fixture to set up the store subprocess with 10 mb.""" - p = subprocess.Popen( - ["plasma_store", "-s", set_store_loc, "-m", str(10000000)], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - yield p - p.kill() - p.wait() - - -@pytest.fixture() +@pytest.fixture def init_actor(set_store_loc): """Fixture to initialize and teardown an instance of actor.""" @@ -35,7 +20,7 @@ def init_actor(set_store_loc): act = None -@pytest.fixture() +@pytest.fixture def example_string_links(): """Fixture to provide a commonly used test input.""" @@ -43,10 +28,27 @@ def example_string_links(): return pytest.example_string_links 
-@pytest.fixture() -def example_links(setup_store, set_store_loc): +@pytest.fixture +def example_links(setup_store, server_port_num): """Fixture to provide link objects as test input and setup store.""" - StoreInterface(store_loc=set_store_loc) + StoreInterface(server_port_num=server_port_num) + + acts = [ + Actor("act" + str(i), server_port_num) for i in range(1, 5) + ] # range must be even + + links = [ + Link("L" + str(i + 1), acts[i], acts[i + 1]) for i in range(len(acts) // 2) + ] + link_dict = {links[i].name: links[i] for i, l in enumerate(links)} + pytest.example_links = link_dict + return pytest.example_links + + +@pytest.fixture +def example_links_plasma(setup_store, set_store_loc): + """Fixture to provide link objects as test input and setup store.""" + PlasmaStoreInterface(store_loc=set_store_loc) acts = [ Actor("act" + str(i), set_store_loc) for i in range(1, 5) @@ -95,11 +97,20 @@ def test_repr(example_string_links, set_store_loc): assert act.__repr__() == "Test: dict_keys(['1', '2', '3'])" -def test_setStoreInterface(setup_store, set_store_loc): +def test_setStoreInterface(setup_store, server_port_num): + """Tests if the store is started and linked with the actor.""" + + act = Actor("Acquirer", server_port_num) + store = StoreInterface(server_port_num=server_port_num) + act.setStoreInterface(store.client) + assert act.client is store.client + + +def test_plasma_setStoreInterface(setup_plasma_store, set_store_loc): """Tests if the store is started and linked with the actor.""" act = Actor("Acquirer", set_store_loc) - store = StoreInterface(store_loc=set_store_loc) + store = PlasmaStoreInterface(store_loc=set_store_loc) act.setStoreInterface(store.client) assert act.client is store.client @@ -292,7 +303,30 @@ def test_changePriority(init_actor): assert psutil.Process(os.getpid()).nice() == 19 -def test_actor_connection(setup_store, set_store_loc): +def test_actor_connection(setup_store, server_port_num): + """Test if the links between actors are established correctly. + + This test instantiates two actors with different names, then instantiates + a Link object linking the two actors. A string is put to the input queue of + one actor. Then, in the other actor, it is removed from the queue, and + checked to verify it matches the original message. + """ + act1 = Actor("a1", server_port_num) + act2 = Actor("a2", server_port_num) + + StoreInterface(server_port_num=server_port_num) + link = Link("L12", act1, act2) + act1.setLinkIn(link) + act2.setLinkOut(link) + + msg = "message" + + act1.q_in.put(msg) + + assert act2.q_out.get() == msg + + +def test_plasma_actor_connection(setup_plasma_store, set_store_loc): """Test if the links between actors are established correctly. 
This test instantiates two actors with different names, then instantiates @@ -303,7 +337,7 @@ def test_actor_connection(setup_store, set_store_loc): act1 = Actor("a1", set_store_loc) act2 = Actor("a2", set_store_loc) - StoreInterface(store_loc=set_store_loc) + PlasmaStoreInterface(store_loc=set_store_loc) link = Link("L12", act1, act2) act1.setLinkIn(link) act2.setLinkOut(link) diff --git a/test/test_cli.py b/test/test_cli.py index 7e18363f..c4b34bff 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -13,7 +13,7 @@ SERVER_TIMEOUT = 16 -@pytest.fixture() +@pytest.fixture def setdir(): prev = os.getcwd() os.chdir(os.path.dirname(__file__)) @@ -21,7 +21,7 @@ def setdir(): os.chdir(prev) -@pytest.fixture() +@pytest.fixture async def server(setdir, ports): """ Sets up a server using minimal.yaml in the configs folder. @@ -61,7 +61,7 @@ async def server(setdir, ports): pass -@pytest.fixture() +@pytest.fixture async def cli_args(setdir, ports): logfile = "tmp.log" control_port, output_port, logging_port = ports diff --git a/test/test_config.py b/test/test_config.py index f9d11201..12cc79bf 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -16,7 +16,7 @@ # set global variables -@pytest.fixture() +@pytest.fixture def set_configdir(): """Sets the current working directory to the configs file.""" prev = os.getcwd() diff --git a/test/test_demos.py b/test/test_demos.py index 61098c67..03f45d33 100644 --- a/test/test_demos.py +++ b/test/test_demos.py @@ -1,3 +1,5 @@ +import time + import pytest import os import asyncio @@ -16,7 +18,7 @@ SERVER_WARMUP = 10 -@pytest.fixture() +@pytest.fixture def setdir(): prev = os.getcwd() os.chdir(os.path.dirname(__file__)) @@ -25,7 +27,7 @@ def setdir(): os.chdir(prev) -@pytest.fixture() +@pytest.fixture def ip(): """Fixture to provide an IP test input.""" @@ -34,7 +36,11 @@ def ip(): @pytest.mark.parametrize( - ("dir", "configfile", "logfile"), [("minimal", "minimal.yaml", "testlog")] + ("dir", "configfile", "logfile"), + [ + ("minimal", "minimal.yaml", "testlog"), + ("minimal", "minimal_plasma.yaml", "testlog"), + ], ) async def test_simple_boot_and_quit(dir, configfile, logfile, setdir, ports): os.chdir(dir) @@ -58,6 +64,8 @@ async def test_simple_boot_and_quit(dir, configfile, logfile, setdir, ports): with open(logfile, mode="a+") as log: server = subprocess.Popen(server_opts, stdout=log, stderr=log) + time.sleep(5) + print(log.readlines()) await asyncio.sleep(SERVER_WARMUP) # initialize client @@ -136,8 +144,8 @@ def test_zmq_ps(ip, unused_tcp_port): """Tests if we can set the zmq PUB/SUB socket and send message.""" port = unused_tcp_port LOGGER.info("beginning test") - act1 = ZmqActor("act1", "/tmp/store", type="PUB", ip=ip, port=port) - act2 = ZmqActor("act2", "/tmp/store", type="SUB", ip=ip, port=port) + act1 = ZmqActor("act1", type="PUB", ip=ip, port=port) + act2 = ZmqActor("act2", type="SUB", ip=ip, port=port) LOGGER.info("ZMQ Actors constructed") # Note these sockets must be set up for testing # this is not needed for running in improv diff --git a/test/test_link.py b/test/test_link.py index 41ffbcfb..eec9a521 100644 --- a/test/test_link.py +++ b/test/test_link.py @@ -7,36 +7,9 @@ from improv.actor import Actor -from improv.store import StoreInterface from improv.link import Link -@pytest.fixture() -def setup_store(): - """Fixture to set up the store subprocess with 10 mb. - - This fixture runs a subprocess that instantiates the store with a - memory of 10 megabytes. It specifies that "/tmp/store/" is the - location of the store socket. 
- - Yields: - store: An instance of the store. - - TODO: - Figure out the scope. - """ - - p = subprocess.Popen( - ["plasma_store", "-s", "/tmp/store", "-m", str(10000000)], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - store = StoreInterface(store_loc="/tmp/store") - yield store - p.kill() - p.wait() - - def init_actors(n=1): """Function to return n unique actors. @@ -50,17 +23,16 @@ def init_actors(n=1): return [Actor("test " + str(i), "/tmp/store", links={}) for i in range(n)] -@pytest.fixture() +@pytest.fixture def example_link(setup_store): """Fixture to provide a commonly used Link object.""" - setup_store act = init_actors(2) lnk = Link("Example", act[0].name, act[1].name) yield lnk lnk = None -@pytest.fixture() +@pytest.fixture def example_actor_system(setup_store): """Fixture to provide a list of 4 connected actors.""" @@ -88,7 +60,7 @@ def example_actor_system(setup_store): acts = None -@pytest.fixture() +@pytest.fixture def _kill_pytest_processes(): """Kills all processes with "pytest" in their name. @@ -242,7 +214,7 @@ def test_put_nowait(example_link): assert t_net < 0.005 # 5 ms -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_put_async_success(example_link): """Tests if put_async returns None. @@ -256,7 +228,7 @@ async def test_put_async_success(example_link): assert res is None -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_put_async_multiple(example_link): """Tests if async putting multiple objects preserves their order.""" @@ -273,7 +245,7 @@ async def test_put_async_multiple(example_link): assert messages_out == messages -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_put_and_get_async(example_link): """Tests if async get preserves order after async put.""" @@ -290,15 +262,17 @@ async def test_put_and_get_async(example_link): assert messages_out == messages -def test_put_overflow(setup_store, caplog): +@pytest.mark.skip( + reason="This test needs additional work to cause an overflow in the datastore." +) +def test_put_overflow(setup_store, server_port_num, caplog): """Tests if putting too large of an object raises an error.""" p = subprocess.Popen( - ["plasma_store", "-s", "/tmp/store", "-m", str(1000)], + ["redis-server", "--port", str(server_port_num), "--maxmemory", str(1000)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) - StoreInterface(store_loc="/tmp/store") acts = init_actors(2) lnk = Link("L1", acts[0], acts[1]) @@ -309,7 +283,6 @@ def test_put_overflow(setup_store, caplog): p.kill() p.wait() - setup_store # restore the 10 mb store if caplog.records: for record in caplog.records: @@ -397,7 +370,7 @@ def test_get_nowait_empty(example_link): pytest.fail("the queue is not empty") -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_get_async_success(example_link): """Tests if async_get gets the correct element from the queue.""" @@ -408,7 +381,7 @@ async def test_get_async_success(example_link): assert res == "message" -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_get_async_empty(example_link): """Tests if get_async times out given an empty queue. @@ -442,7 +415,7 @@ def test_cancel_join_thread(example_link): @pytest.mark.skip(reason="unfinished") -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_join_thread(example_link): """Tests join_thread. 
This test is unfinished @@ -456,7 +429,7 @@ async def test_join_thread(example_link): assert True -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_multi_actor_system(example_actor_system, setup_store): """Tests if async puts/gets with many actors have good messages.""" diff --git a/test/test_nexus.py b/test/test_nexus.py index 84f3759f..fc8dd458 100644 --- a/test/test_nexus.py +++ b/test/test_nexus.py @@ -1,3 +1,4 @@ +import glob import time import os import pytest @@ -16,7 +17,7 @@ SERVER_COUNTER = 0 -@pytest.fixture() +@pytest.fixture def ports(): global SERVER_COUNTER CONTROL_PORT = 5555 @@ -30,7 +31,7 @@ def ports(): SERVER_COUNTER += 3 -@pytest.fixture() +@pytest.fixture def setdir(): prev = os.getcwd() os.chdir(os.path.dirname(__file__) + "/configs") @@ -38,12 +39,12 @@ def setdir(): os.chdir(prev) -@pytest.fixture() +@pytest.fixture def sample_nex(setdir, ports): nex = Nexus("test") nex.createNexus( file="good_config.yaml", - store_size=4000, + store_size=40000000, control_port=ports[0], output_port=ports[1], ) @@ -80,11 +81,16 @@ def test_init(setdir): assert str(nex) == "test" -def test_createNexus(setdir, ports): +@pytest.mark.parametrize( + "cfg_name", + [ + "good_config.yaml", + "good_config_plasma.yaml", + ], +) +def test_createNexus(setdir, ports, cfg_name): nex = Nexus("test") - nex.createNexus( - file="good_config.yaml", control_port=ports[0], output_port=ports[1] - ) + nex.createNexus(file=cfg_name, control_port=ports[0], output_port=ports[1]) assert list(nex.comm_queues.keys()) == [ "GUI_comm", "Acquirer_comm", @@ -210,7 +216,14 @@ def test_config_construction(cfg_name, actor_list, link_list, setdir, ports): assert True -def test_single_actor(setdir, ports): +@pytest.mark.parametrize( + "cfg_name", + [ + "single_actor.yaml", + "single_actor_plasma.yaml", + ], +) +def test_single_actor(setdir, ports, cfg_name): nex = Nexus("test") with pytest.raises(AttributeError): nex.createNexus( @@ -271,7 +284,7 @@ def test_queue_message(setdir, sample_nex): assert True -@pytest.mark.asyncio() +@pytest.mark.asyncio @pytest.mark.skip(reason="This test is unfinished.") async def test_queue_readin(sample_nex, caplog): nex = sample_nex @@ -315,9 +328,9 @@ def test_usehdd_False(): assert True -def test_startstore(caplog, set_store_loc): +def test_startstore(caplog): nex = Nexus("test") - nex._startStoreInterface(10000) # 10 kb store + nex._startStoreInterface(10000000) # 10 MB store assert any( "StoreInterface start successful" in record.msg for record in caplog.records @@ -346,6 +359,170 @@ def test_closestore(caplog): assert True +def test_specified_free_port(caplog, setdir, ports): + nex = Nexus("test") + nex.createNexus( + file="minimal_with_fixed_redis_port.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + nex.destroyNexus() + + assert any( + "StoreInterface start successful on port 6379" in record.msg + for record in caplog.records + ) + + +def test_specified_busy_port(caplog, setdir, ports, setup_store): + nex = Nexus("test") + with pytest.raises(Exception, match="Could not start Redis on specified port."): + nex.createNexus( + file="minimal_with_fixed_redis_port.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + nex.destroyNexus() + + assert any( + "Could not start Redis on specified port number." 
in record.msg + for record in caplog.records + ) + + +def test_unspecified_port_default_free(caplog, setdir, ports): + nex = Nexus("test") + nex.createNexus( + file="minimal.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + nex.destroyNexus() + + assert any( + "StoreInterface start successful on port 6379" in record.msg + for record in caplog.records + ) + + +def test_unspecified_port_default_busy(caplog, setdir, ports, setup_store): + nex = Nexus("test") + nex.createNexus( + file="minimal.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + nex.destroyNexus() + assert any( + "StoreInterface start successful on port 6380" in record.msg + for record in caplog.records + ) + + +def test_no_dumpfile_by_default(caplog, setdir, ports): + nex = Nexus("test") + nex.createNexus( + file="minimal.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + nex.destroyNexus() + + assert "dump.rdb" not in os.listdir(".") + + +def test_default_dumpfile_if_none_specified(caplog, setdir, ports, server_port_num): + nex = Nexus("test") + nex.createNexus( + file="minimal_with_redis_saving.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + store = StoreInterface(server_port_num=server_port_num) + store.put(1) + + time.sleep(3) + + nex.destroyNexus() + + logging.info(os.getcwd() + "\n") + logging.info(os.listdir(".")) + + assert "dump.rdb" in os.listdir(".") + + if "dump.rdb" in os.listdir("."): + os.remove("dump.rdb") + else: + logging.info("didn't find dbfilename") + + logging.info("exited test") + + +def test_specify_static_dumpfile(caplog, setdir, ports, server_port_num): + nex = Nexus("test") + nex.createNexus( + file="minimal_with_custom_dbfilename.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + store = StoreInterface(server_port_num=server_port_num) + store.put(1) + + time.sleep(3) + + nex.destroyNexus() + + assert "custom_dbfilename.rdb" in os.listdir(".") + + if "custom_dbfilename.rdb" in os.listdir("."): + os.remove("custom_dbfilename.rdb") + else: + logging.info("didn't find dbfilename") + + logging.info("exited test") + + +def test_use_ephemeral_dbfile(caplog, setdir, ports, server_port_num): + nex = Nexus("test") + nex.createNexus( + file="minimal_with_ephemeral_dbfilename.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + store = StoreInterface(server_port_num=server_port_num) + store.put(1) + + time.sleep(3) + + nex.destroyNexus() + + logging.info(os.getcwd() + "\n") + logging.info(os.listdir(".")) + + assert any([".rdb" in filename for filename in os.listdir(".")]) + + [os.remove(db_filename) for db_filename in glob.glob("*.rdb")] + + logging.info("completed ephemeral db test") + + +@pytest.mark.skip(reason="Nexus no longer deletes files on shutdown. Nothing to test.") def test_store_already_deleted_issues_warning(caplog): nex = Nexus("test") nex._startStoreInterface(10000) @@ -384,6 +561,9 @@ def test_actor_sub(setdir, capsys, monkeypatch, ports): assert True +@pytest.mark.skip( + reason="skipping to prevent issues with orphaned stores. 
TODO fix this" +) def test_sigint_exits_cleanly(ports, tmp_path): server_opts = [ "improv", diff --git a/test/test_store_with_errors.py b/test/test_store_with_errors.py index d2951398..b95781a0 100644 --- a/test/test_store_with_errors.py +++ b/test/test_store_with_errors.py @@ -1,24 +1,21 @@ import pytest +from pyarrow import plasma -# import time -from improv.store import StoreInterface +from improv.store import StoreInterface, RedisStoreInterface, PlasmaStoreInterface -# from multiprocessing import Process from pyarrow._plasma import PlasmaObjectExists from scipy.sparse import csc_matrix import numpy as np -import pyarrow.plasma as plasma +import redis +import logging -# from pyarrow.lib import ArrowIOError -# from improv.store import ObjectNotFoundError -# from improv.store import CannotGetObjectError from improv.store import CannotConnectToStoreInterfaceError -# import pickle -import subprocess - WAIT_TIMEOUT = 10 +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + # TODO: add docstrings!!! # TODO: clean up syntax - consistent capitalization, function names, etc. @@ -27,42 +24,24 @@ # Separate each class as individual file - individual tests??? -# @pytest.fixture -# def store_loc(): -# store_loc = '/dev/shm' -# return store_loc - -# store_loc = '/dev/shm' +def test_connect(setup_store, server_port_num): + store = StoreInterface(server_port_num=server_port_num) + assert isinstance(store.client, redis.Redis) -@pytest.fixture() -# TODO: put in conftest.py -def setup_store(set_store_loc): - """Start the server""" - print("Setting up Plasma store.") - p = subprocess.Popen( - ["plasma_store", "-s", set_store_loc, "-m", str(10000000)], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - # with plasma.start_plasma_store(10000000) as ps: - - yield p +def test_plasma_connect(setup_plasma_store, set_store_loc): + store = PlasmaStoreInterface(store_loc=set_store_loc) + assert isinstance(store.client, plasma.PlasmaClient) - # ''' Kill the server - # ''' - # print('Tearing down Plasma store.') - p.kill() - p.wait(WAIT_TIMEOUT) +def test_redis_connect(setup_store, server_port_num): + store = RedisStoreInterface(server_port_num=server_port_num) + assert isinstance(store.client, redis.Redis) + assert store.client.ping() -def test_connect(setup_store, set_store_loc): - store = StoreInterface(store_loc=set_store_loc) - assert isinstance(store.client, plasma.PlasmaClient) - -def test_connect_incorrect_path(setup_store, set_store_loc): +def test_connect_incorrect_path(setup_plasma_store, set_store_loc): # TODO: shorter name??? 
# TODO: passes, but refactor --- see comments store_loc = "asdf" @@ -73,13 +52,20 @@ def test_connect_incorrect_path(setup_store, set_store_loc): # # Check that the exception thrown is a CannotConnectToStoreInterfaceError # raise Exception('Cannot connect to store: {0}'.format(e)) with pytest.raises(CannotConnectToStoreInterfaceError) as e: - store = StoreInterface(store_loc=store_loc) + store = PlasmaStoreInterface(store_loc=store_loc) store.connect_store(store_loc) # Check that the exception thrown is a CannotConnectToStoreInterfaceError assert e.value.message == "Cannot connect to store at {}".format(str(store_loc)) -def test_connect_none_path(setup_store): +def test_redis_connect_wrong_port(setup_store, server_port_num): + bad_port_num = 1234 + with pytest.raises(CannotConnectToStoreInterfaceError) as e: + RedisStoreInterface(server_port_num=bad_port_num) + assert e.value.message == "Cannot connect to store at {}".format(str(bad_port_num)) + + +def test_connect_none_path(setup_plasma_store): # BUT default should be store_loc = '/tmp/store' if not entered? store_loc = None # Handle exception thrown - assert name == 'CannotConnectToStoreInterfaceError' @@ -93,7 +79,7 @@ def test_connect_none_path(setup_store): # Check that the exception thrown is a CannotConnectToStoreInterfaceError # raise Exception('Cannot connect to store: {0}'.format(e)) with pytest.raises(CannotConnectToStoreInterfaceError) as e: - store = StoreInterface(store_loc=store_loc) + store = PlasmaStoreInterface(store_loc=store_loc) store.connect_store(store_loc) # Check that the exception thrown is a CannotConnectToStoreInterfaceError assert e.value.message == "Cannot connect to store at {}".format(str(store_loc)) @@ -105,65 +91,34 @@ def test_connect_none_path(setup_store): # TODO: @pytest.parameterize...store.get and store.getID for diff datatypes, # pickleable and not, etc. # Check raises...CannotGetObjectError (object never stored) -def test_init_empty(setup_store, set_store_loc): - store = StoreInterface(store_loc=set_store_loc) - assert store.get_all() == {} +def test_init_empty(setup_store, server_port_num): + store = StoreInterface(server_port_num=server_port_num) + # logger.info(store.client.config_get()) + assert store.get_all() == [] -# class StoreInterfaceGetID(self): -# TODO: -# Check both hdd_only=False/True -# Check isInstance type, isInstance bytes, else -# Check in disk - pytest.raises(ObjectNotFoundError) -# Decide to test_getList and test_get_all +def test_plasma_init_empty(setup_plasma_store, set_store_loc): + store = PlasmaStoreInterface(store_loc=set_store_loc) + assert store.get_all() == {} -# def test_is_picklable(self): -# Test if obj to put is picklable - if not raise error, handle/suggest how to fix -# TODO: TEST BELOW: -# except PlasmaObjectExists: -# logger.error('Object already exists. 
Meant to call replace?') -# except ArrowIOError as e: -# logger.error('Could not store object '+ \ -# object_name+': {} {}'.format(type(e).__name__, e)) -# logger.info('Refreshing connection and continuing') -# self.reset() -# except Exception as e: -# logger.error('Could not store object '+ \ -# object_name+': {} {}'.format(type(e).__name__, e)) +def test_is_csc_matrix_and_put(setup_store, server_port_num): + mat = csc_matrix((3, 4), dtype=np.int8) + store = StoreInterface(server_port_num=server_port_num) + x = store.put(mat) + assert isinstance(store.get(x), csc_matrix) -def test_is_csc_matrix_and_put(setup_store, set_store_loc): +def test_plasma_is_csc_matrix_and_put(setup_plasma_store, set_store_loc): mat = csc_matrix((3, 4), dtype=np.int8) - store = StoreInterface(store_loc=set_store_loc) + store = PlasmaStoreInterface(store_loc=set_store_loc) x = store.put(mat, "matrix") assert isinstance(store.getID(x), csc_matrix) -# FAILED - ObjectNotFoundError NOT RAISED? -# def test_not_put(setup_store): -# store_loc = '/tmp/store' -# store = StoreInterface(store_loc) -# with pytest.raises(ObjectNotFoundError) as e: -# obj_id = store.getID(store.random_ObjectID(1)) -# # Check that the exception thrown is a ObjectNotFoundError -# assert e.value.message == 'Cannnot find object with ID/name "{}"'.format(obj_id) - -# FAILED - AssertionError...looks at LMDBStoreInterface in story.py -# assert name is not None? -# def test_use_hdd(setup_store): -# store_loc = '/tmp/store' -# store = StoreInterface(store_loc, use_lmdb=True) -# lmdb_store = store.lmdb_store -# lmdb_store.put(1, 'one') -# assert lmdb_store.getID('one', hdd_only=True) == 1 - -# class StoreInterfaceGetListandAll(StoreInterfaceDependentTestCase): - - -@pytest.mark.skip() -def test_get_list_and_all(setup_store, set_store_loc): - store = StoreInterface(store_loc=set_store_loc) +@pytest.mark.skip +def test_get_list_and_all(setup_store, server_port_num): + store = StoreInterface(server_port_num=server_port_num) # id = store.put(1, "one") # id2 = store.put(2, "two") # id3 = store.put(3, "three") @@ -171,33 +126,34 @@ def test_get_list_and_all(setup_store, set_store_loc): assert [1, 2, 3] == store.get_all() -# class StoreInterface_ReleaseReset(StoreInterfaceDependentTestCase): - -# FAILED - DID NOT RAISE ??? 
-# def test_release(setup_store): -# store_loc = '/tmp/store' -# store = StoreInterface(store_loc) -# with pytest.raises(ArrowIOError) as e: -# store.release() -# store.put(1, 'one') -# # Check that the exception thrown is an ArrowIOError -# assert e.value.message == 'Could not store object ' + \ -# object_name + ': {} {}'.format(type(e).__name__, e) -# # TODO: assert info == 'Refreshing connection and continuing' +def test_reset(setup_store, server_port_num): + store = StoreInterface(server_port_num=server_port_num) + store.reset() + id = store.put(1) + assert store.get(id) == 1 -def test_reset(setup_store, set_store_loc): - store = StoreInterface(store_loc=set_store_loc) +def test_plasma_reset(setup_plasma_store, set_store_loc): + store = PlasmaStoreInterface(store_loc=set_store_loc) store.reset() id = store.put(1, "one") assert store.get(id) == 1 -# class StoreInterface_Put(StoreInterfaceDependentTestCase): +def test_put_one(setup_store, server_port_num): + store = StoreInterface(server_port_num=server_port_num) + id = store.put(1) + assert 1 == store.get(id) + + +def test_redis_put_one(setup_store, server_port_num): + store = RedisStoreInterface(server_port_num=server_port_num) + key = store.put(1) + assert 1 == store.get(key) -def test_put_one(setup_store, set_store_loc): - store = StoreInterface(store_loc=set_store_loc) +def test_plasma_put_one(setup_plasma_store, set_store_loc): + store = PlasmaStoreInterface(store_loc=set_store_loc) id = store.put(1, "one") assert 1 == store.get(id) @@ -213,84 +169,13 @@ def test_put_twice(setup_store): assert e.value.message == "Object already exists. Meant to call replace?" -# class StoreInterface_PutGet(StoreInterfaceDependentTestCase): - - -def test_getOne(setup_store, set_store_loc): - store = StoreInterface(store_loc=set_store_loc) - id = store.put(1, "one") +def test_getOne(setup_store, server_port_num): + store = StoreInterface(server_port_num=server_port_num) + id = store.put(1) assert 1 == store.get(id) -# def test_get_nonexistent(setup_store): -# store = StoreInterface() -# # Handle exception thrown -# # Check that the exception thrown is a CannotGetObjectError -# with pytest.raises(CannotGetObjectError) as e: -# # Check that the exception thrown is an PlasmaObjectExists -# store.get('three') -# assert e.value.message == 'Cannot get object {}'.format(self.query) - -# TODO: -"""class StoreInterface_Notify(StoreInterfaceDependentTestCase): - - def test_notify(self): - # TODO: not unit testable? - -### This is NOT USED anymore??? 
-class StoreInterface_UpdateStoreInterfaced(StoreInterfaceDependentTestCase): - - # Accessing self.store.stored directly to test getStoreInterfaced separately - def test_updateGet(self): - self.store.put(1, 'one') - self.store.updateStoreInterfaced('one', 3) - assert 3 == self.store.stored['one'] - -class StoreInterface_GetStoreInterfaced(StoreInterfaceDependentTestCase): - - def test_getStoreInterfacedEmpty(self): - assert self.store.getStoreInterfaced() == False - - def test_putGetStoreInterfaced(self): - self.store.put(1, 'one') - assert 1 == self.store.getID(self.store.getStoreInterfaced()['one']) - -class StoreInterface_internalPutGet(StoreInterfaceDependentTestCase): - - def test_put(self): - id = self.store.random_ObjectID(1) - self.store._put(1, id[0]) - assert 1 == self.store.client.get(id[0]) - - def test_get(self): - id= self.store.put(1, 'one') - self.store.updateStoreInterfaced('one', id) - assert self.store._get('one') == 1 - - def test__getNonexistent(self): - - # Handle exception thrown - with pytest.raises(Exception) as cm: - # Check that the exception thrown is a ObjectNotFoundError - self.store._get('three') - assert cm.exception.name == 'ObjectNotFoundError' - assert cm.exception.message == 'Cannnot find object with ID/name "three"' - -class StoreInterface_saveConfig(StoreInterfaceDependentTestCase): - - def test_config(self): - fileName= 'data/config_dump' - id= self.store.put(1, 'one') - id2= self.store.put(2, 'two') - config_ids=[id, id2] - self.store.saveConfig(config_ids) - with open(fileName, 'rb') as output: - assert pickle.load(output) == [1, 2] - -# Test out CSC matrix format after updating to arrow 0.14.0 -class StoreInterface_sparseMatrix(StoreInterfaceDependentTestCase): - - def test_csc(self): - csc = csc_matrix((3, 4), dtype=np.int8) - self.store.put(csc, "csc") - assert np.allclose(self.store.get("csc").toarray(), csc.toarray()) == True""" +def test_redis_get_one(setup_store, server_port_num): + store = RedisStoreInterface(server_port_num=server_port_num) + key = store.put(3) + assert 3 == store.get(key) diff --git a/test/test_tui.py b/test/test_tui.py index 50ad6d54..d45bccfe 100644 --- a/test/test_tui.py +++ b/test/test_tui.py @@ -9,7 +9,7 @@ from test_nexus import ports -@pytest.fixture() +@pytest.fixture def logger(ports): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -19,7 +19,7 @@ def logger(ports): logger.removeHandler(zmq_log_handler) -@pytest.fixture() +@pytest.fixture async def sockets(ports): with zmq.Context() as context: ctrl_socket = context.socket(REP) @@ -29,7 +29,7 @@ async def sockets(ports): yield (ctrl_socket, out_socket) -@pytest.fixture() +@pytest.fixture async def app(ports): mock = tui.TUI(*ports) yield mock From 3318e17053a8592fd036faefd260df8d6f7db90e Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Thu, 18 Jul 2024 20:19:38 -0700 Subject: [PATCH 02/12] Remove LMDB and use hdd config settings --- .gitignore | 6 - env/improv-suite2p_env.yml | 1 - env/improv.yml | 1 - env/improv_env.yml | 2 +- improv/nexus.py | 25 +- improv/replayer.py | 160 ------------- improv/store.py | 305 +----------------------- improv/utils/bad_config.yaml | 40 ---- improv/utils/checks.py | 7 - improv/utils/good_config.yaml | 40 ---- improv/utils/reader.py | 104 -------- improv/utils/utils.py | 51 ---- pyproject.toml | 1 - test/configs/minimal_with_settings.yaml | 1 - test/test_nexus.py | 2 - 15 files changed, 8 insertions(+), 738 deletions(-) delete mode 100755 improv/replayer.py delete mode 100644 improv/utils/bad_config.yaml 
delete mode 100644 improv/utils/good_config.yaml delete mode 100644 improv/utils/reader.py delete mode 100644 improv/utils/utils.py diff --git a/.gitignore b/.gitignore index e01aaec4..b76c5666 100644 --- a/.gitignore +++ b/.gitignore @@ -124,12 +124,6 @@ dmypy.json # Pyre type checker .pyre/ -# LMDB -*.mdb -*.xml -*.hdf5 -*.iml - *.tif arrow diff --git a/env/improv-suite2p_env.yml b/env/improv-suite2p_env.yml index 23895f77..bca78392 100644 --- a/env/improv-suite2p_env.yml +++ b/env/improv-suite2p_env.yml @@ -38,5 +38,4 @@ dependencies: - natsort - scanimage-tiff-reader - rastermap==0.1.0 - - lmdb - suite2p==0.6.16 diff --git a/env/improv.yml b/env/improv.yml index 5978d721..6891cd8e 100644 --- a/env/improv.yml +++ b/env/improv.yml @@ -8,5 +8,4 @@ dependencies: - pip - pyarrow - pyqt -- python-lmdb - pyyaml \ No newline at end of file diff --git a/env/improv_env.yml b/env/improv_env.yml index c08a93b7..389cb616 100644 --- a/env/improv_env.yml +++ b/env/improv_env.yml @@ -225,6 +225,6 @@ dependencies: - pip: - argh - line-profiler - - lmdb==0.97 + - redis - pathtools - watchdog diff --git a/improv/nexus.py b/improv/nexus.py index 30f382e8..cb226824 100644 --- a/improv/nexus.py +++ b/improv/nexus.py @@ -42,7 +42,6 @@ def __str__(self): def createNexus( self, file=None, - use_hdd=False, use_watcher=None, store_size=10_000_000, control_port=0, @@ -56,7 +55,6 @@ def createNexus( Args: file (string): Name of the config file. - use_hdd (bool): Whether to use hdd for the store. use_watcher (bool): Whether to use watcher for the store. store_size (int): initial store size control_port (int): port number for input socket @@ -81,8 +79,6 @@ def createNexus( # set config options loaded from file # in Python 3.9, can just merge dictionaries using precedence cfg = self.config.settings - if "use_hdd" not in cfg: - cfg["use_hdd"] = use_hdd if "use_watcher" not in cfg: cfg["use_watcher"] = use_watcher if "store_size" not in cfg: @@ -126,12 +122,6 @@ def createNexus( self.store.subscribe() - # LMDB storage - self.use_hdd = cfg["use_hdd"] - if self.use_hdd: - self.lmdb_name = f'lmdb_{datetime.now().strftime("%Y%m%d_%H%M%S")}' - self.store_dict = dict() - # TODO: Better logic/flow for using watcher as an option self.p_watch = None if cfg["use_watcher"]: @@ -636,19 +626,9 @@ def stop_polling(self, stop_signal, queues): logger.info("Polling has stopped.") def createStoreInterface(self, name): - """Creates StoreInterface w/ or w/out LMDB - functionality based on {self.use_hdd}.""" + """Creates StoreInterface""" if self.config.use_plasma(): - if not self.use_hdd: - return PlasmaStoreInterface(name, self.store_loc) - else: - # I don't think this currently works, - # since the constructor doesn't accept these arguments - if name not in self.store_dict: - self.store_dict[name] = PlasmaStoreInterface( - name, self.store_loc, use_hdd=True, lmdb_name=self.lmdb_name - ) - return self.store_dict[name] + return PlasmaStoreInterface(name, self.store_loc) else: return RedisStoreInterface(server_port_num=self.store_port) @@ -876,7 +856,6 @@ def startWatcher(self): from improv.watcher import Watcher self.watcher = Watcher("watcher", self.createStoreInterface("watcher")) - # store = self.createStoreInterface("watcher") if not self.use_hdd else None q_sig = Link("watcher_sig", self.name, "watcher") self.watcher.setLinks(q_sig) self.sig_queues.update({q_sig.name: q_sig}) diff --git a/improv/replayer.py b/improv/replayer.py deleted file mode 100755 index 31044f8e..00000000 --- a/improv/replayer.py +++ /dev/null @@ -1,160 +0,0 @@ 
-import time -from typing import Callable, List - -from pyarrow.plasma import ObjectID - -from nexus.actor import Actor, RunManager -from nexus.store import LMDBStoreInterface, LMDBData - - -class Replayer(Actor): - def __init__(self, *args, lmdb_path, replay: str, resave=False, **kwargs): - """ - Class that outputs objects to queues based on a saved previous run. - - Args: - lmdb_path: path to LMDB folder - replay: named of Actor to replay. - resave: (if using LMDB in this instance) - save outputs from this actor as usual (default: False) - - """ - super().__init__(*args, **kwargs) - self.resave = resave - - self.lmdb = LMDBStoreInterface(path=lmdb_path, load=True) - self.lmdb_values: list = self.get_lmdb_values(replay) - assert len(self.lmdb_values) > 0 - - self.gui_messages: dict = self.get_lmdb_values( - "GUI", func=lambda x: {lmdbdata.obj[0]: lmdbdata.time for lmdbdata in x} - ) - - self.t_saved_start_run = self.gui_messages["run"] # TODO Add load GUI actions - self.t_start_run = None - - def get_lmdb_values(self, replay: str, func: Callable = None) -> List[LMDBData]: - """ - Load saved queue objects from LMDB - - Args: - replay: named of Actor - func: (optional) Function to apply to objects before returning - - Returns: - lmdb_values - - """ - # Get all out queue names - replay = f"q__{replay}" - keys = [ - key.decode() - for key in self.lmdb.get_keys() - if key.startswith(replay.encode()) - ] - - # Get relevant keys, convert to str, and sort. Then convert back to bytes. - keys = [key.encode() for key in keys] - lmdb_values = sorted( - self.lmdb.get(keys, include_metadata=True), - key=lambda lmdb_value: lmdb_value.time, - ) - - if func is not None: - return func(lmdb_values) - return lmdb_values - - def setup(self): - if self.client.use_hdd and not self.resave: - self.client.use_hdd = False - - self.move_to_plasma(self.lmdb_values) - self.put_setup(self.lmdb_values) - - def move_to_plasma(self, lmdb_values): - """Put objects into current plasma store and update object ID in saved queue. - - Args: - lmdb_values: - """ - - # TODO Make async to enable queue-based fetch system - - # to avoid loading everything at once. - for lmdbdata in lmdb_values: - try: - if len(lmdbdata.obj) == 1 and isinstance( - lmdbdata.obj[0], dict - ): # Raw frames - data = lmdbdata.obj[0] - for i, obj_id in data.items(): - if isinstance(obj_id, ObjectID): - actual_obj = self.lmdb.get(obj_id, include_metadata=True) - lmdbdata.obj = [ - {i: self.client.put(actual_obj.obj, actual_obj.name)} - ] - - for i, obj in enumerate(lmdbdata.obj): # List - if isinstance(obj, ObjectID): - actual_obj = self.lmdb.get(obj, include_metadata=True) - lmdbdata.obj[i] = self.client.put( - actual_obj.obj, actual_obj.name - ) - - else: # Not object ID, do nothing. - pass - - except (TypeError, AttributeError): # Something else. - pass - - def put_setup(self, lmdb_values): - """Put all objects created before Run into queue immediately. - - Args: - lmdb_values: - """ - for lmdb_value in lmdb_values: - if lmdb_value.time < self.t_saved_start_run: - getattr(self, lmdb_value.queue).put(lmdb_value.obj) - - def run(self): - self.t_start_run = time.time() - with RunManager( - self.name, self.runner, self.setup, self.q_sig, self.q_comm - ) as rm: - print(rm) - - def runner(self): - """ - Get list of objects and output them - to their respective queues based on time delay. 
- """ - for lmdb_value in self.lmdb_values: - if lmdb_value.time >= self.t_saved_start_run: - t_sleep = ( - lmdb_value.time + self.t_start_run - self.t_saved_start_run - ) - time.time() - if t_sleep > 0: - time.sleep(t_sleep) - getattr(self, lmdb_value.queue).put(lmdb_value.obj) - - # policy = asyncio.get_event_loop_policy() - # policy.set_event_loop(policy.new_event_loop()) - # self.loop = asyncio.get_event_loop() - # - # self.aqueue = asyncio.Queue() - # self.loop.run_until_complete(self.arun()) - # - # async def arun(self): - # - # - # funcs_to_run = [self.send_q, self.fetch_lmdb] - # async with AsyncRunManager(self.name, funcs_to_run, self.setup, - # self.q_sig, self.q_comm) as rm: - # print(rm) - # - # async def send_q(self): - # - # for t in self.times: - # now = time.time() - # await asyncio.sleep(t - now) - # self.q_out.put(list(dict())) diff --git a/improv/store.py b/improv/store.py index 614fcf28..98f54a63 100644 --- a/improv/store.py +++ b/improv/store.py @@ -1,10 +1,7 @@ import os import uuid -import lmdb -import time import pickle -import signal import logging import traceback @@ -16,15 +13,9 @@ from redis.backoff import ConstantBackoff from redis.exceptions import BusyLoadingError, ConnectionError, TimeoutError -from queue import Queue -from pathlib import Path -from random import random -from threading import Thread -from typing import List, Union -from dataclasses import dataclass, make_dataclass from scipy.sparse import csc_matrix from pyarrow.lib import ArrowIOError -from pyarrow._plasma import PlasmaObjectExists, ObjectNotAvailable, ObjectID +from pyarrow._plasma import PlasmaObjectExists, ObjectNotAvailable REDIS_GLOBAL_TOPIC = "global_topic" @@ -207,49 +198,6 @@ def __init__(self, name="default", store_loc="/tmp/store"): self.store_loc = store_loc self.client = self.connect_store(store_loc) self.stored = {} - self.use_hdd = False - - def _setup_LMDB( - self, - use_lmdb=False, - lmdb_path="../outputs/", - lmdb_name=None, - hdd_maxstore=1e12, - flush_immediately=False, - commit_freq=1, - ): - """ - Constructor for the Store - - Args: - name (string): - store_loc (stirng): Apache Arrow Plasma client location - - use_lmdb (bool): Also write data to disk using the LMDB - - hdd_maxstore: - Maximum size database may grow to; used to size the memory mapping. - If the database grows larger than map_size, - a MapFullError will be raised. - On 64-bit there is no penalty for making this huge. - Must be <2GB on 32-bit. - - hdd_path: Path to LMDB folder. - flush_immediately (bool): Save objects to disk immediately - commit_freq (int): If not flush_immediately, - flush data to disk every {commit_freq} seconds. - """ - self.use_hdd = use_lmdb - self.flush_immediately = flush_immediately - - if use_lmdb: - self.lmdb_store = LMDBStoreInterface( - path=lmdb_path, - name=lmdb_name, - max_size=hdd_maxstore, - flush_immediately=flush_immediately, - commit_freq=commit_freq, - ) def connect_store(self, store_loc): """Connect to the store at store_loc, max 20 retries to connect @@ -296,8 +244,6 @@ class 'plasma.ObjectID': Plasma object ID else: object_id = self.client.put(object) - if self.use_hdd: - self.lmdb_store.put(object, object_name, obj_id=object_id) except PlasmaObjectExists: logger.error("Object already exists. 
Meant to call replace?") except ArrowIOError: @@ -328,13 +274,12 @@ def get(self, object_name): # else: return self.getID(object_name) - def getID(self, obj_id, hdd_only=False): + def getID(self, obj_id): """ Get object by object ID Args: obj_id (class 'plasma.ObjectID'): the id of the object - hdd_only (bool): Returns: Stored object @@ -342,17 +287,9 @@ def getID(self, obj_id, hdd_only=False): Raises: ObjectNotFoundError: If the id is not found """ - # Check in RAM - if not hdd_only: - res = self.client.get(obj_id, 0) # Timeout = 0 ms - if res is not plasma.ObjectNotAvailable: - return res if not isinstance(res, bytes) else pickle.loads(res) - - # Check in disk - if self.use_hdd: - res = self.lmdb_store.get(obj_id) - if res is not None: - return res + res = self.client.get(obj_id, 0) # Timeout = 0 ms + if res is not plasma.ObjectNotAvailable: + return res if not isinstance(res, bytes) else pickle.loads(res) logger.warning("Object {} cannot be found.".format(obj_id)) raise ObjectNotFoundError @@ -459,241 +396,9 @@ def _get(self, object_name): return res -class LMDBStoreInterface(StoreInterface): - def __init__( - self, - path="../outputs/", - name=None, - load=False, - max_size=1e12, - flush_immediately=False, - commit_freq=1, - ): - """ - Constructor for LMDB store - - Args: - path (string): Path to folder containing LMDB folder. - name (string): Name of LMDB. Required if not {load]. - max_size (float): - Maximum size database may grow to; used to size the memory mapping. - If the database grows larger than map_size, a MapFullError will be raised. - On 64-bit there is no penalty for making this huge. Must be <2GB on 32-bit. - load (bool): For Replayer use. Informs the class that we're loading - from a previous LMDB, not create a new one. - flush_immediately (bool): Save objects to disk immediately - commit_freq (int): If not flush_immediately, - flush data to disk every {commit_freq} seconds. - """ - - # Check if LMDB folder exists. - # LMDB path? - path = Path(path) - if load: - if name is not None: - path = path / name - if not (path / "data.mdb").exists(): - raise FileNotFoundError("Invalid LMDB directory.") - else: - assert name is not None - if not path.exists(): - path.mkdir(parents=True) - path = path / name - - self.flush_immediately = flush_immediately - self.lmdb_env = lmdb.open( - path.as_posix(), map_size=max_size, sync=flush_immediately - ) - self.lmdb_commit_freq = commit_freq - - self.put_queue = Queue() - self.put_queue_container = make_dataclass( - "LMDBPutContainer", [("name", str), ("obj", bytes)] - ) - # Initialize only after interpreter has forked at the start of each actor. - self.commit_thread: Thread = None - signal.signal(signal.SIGINT, self.flush) - - def get( - self, - key: Union[plasma.ObjectID, bytes, List[plasma.ObjectID], List[bytes]], - include_metadata=False, - ): - """ - Get object using key (could be any byte string or plasma.ObjectID) - - Args: - key: - include_metadata (bool): returns whole LMDBData if true else LMDBData.obj - (just the stored object). - Returns: - object or LMDBData - """ - while True: - try: - if isinstance(key, str) or isinstance(key, ObjectID): - return self._get_one( - LMDBStoreInterface._convert_obj_id_to_bytes(key), - include_metadata, - ) - return self._get_batch( - list(map(LMDBStoreInterface._convert_obj_id_to_bytes, key)), - include_metadata, - ) - except ( - lmdb.BadRslotError - ): # Happens when multiple transactions access LMDB at the same time. 
- pass - - def _get_one(self, key, include_metadata): - with self.lmdb_env.begin() as txn: - r = txn.get(key) - - if r is None: - return None - return pickle.loads(r) if include_metadata else pickle.loads(r).obj - - def _get_batch(self, keys, include_metadata): - with self.lmdb_env.begin() as txn: - objs = [txn.get(key) for key in keys] - - if include_metadata: - return [pickle.loads(obj) for obj in objs if obj is not None] - else: - return [pickle.loads(obj).obj for obj in objs if obj is not None] - - def get_keys(self): - """Get all keys in LMDB""" - with self.lmdb_env.begin() as txn: - with txn.cursor() as cur: - cur.first() - return [key for key in cur.iternext(values=False)] - - def put(self, obj, obj_name, obj_id=None, flush_this_immediately=False): - """ - Put object ID / object pair into LMDB. - - Args: - obj: Object to be saved - obj_name (str): the name of the object - obj_id ('plasma.ObjectID'): Object_id from Plasma client - flush_this_immediately (bool): Override self.flush_immediately. - For storage of critical objects. - - Returns: - None - """ - # TODO: Duplication check - if self.commit_thread is None: - self.commit_thread = Thread(target=self.commit_daemon, daemon=True) - self.commit_thread.start() - - if obj_name.startswith("q_") or obj_name.startswith("config"): # Queue - name = obj_name.encode() - is_queue = True - else: - name = obj_id.binary() if obj_id is not None else obj_name.encode() - is_queue = False - - self.put_queue.put( - self.put_queue_container( - name=name, - obj=pickle.dumps( - LMDBData(obj, time=time.time(), name=obj_name, is_queue=is_queue) - ), - ) - ) - - # Write - if self.flush_immediately or flush_this_immediately: - self.commit() - self.lmdb_env.sync() - - def flush(self, sig=None, frame=None): - """Must run before exiting. Flushes buffer to disk.""" - self.commit() - self.lmdb_env.sync() - self.lmdb_env.close() - exit(0) - - def commit_daemon(self): - time.sleep(2 * random()) # Reduce multiple commits at the same time. - while True: - time.sleep(self.lmdb_commit_freq) - self.commit() - - def commit(self): - """Commit objects in {self.put_cache} into LMDB.""" - if not self.put_queue.empty(): - print(self.put_queue.qsize()) - with self.lmdb_env.begin(write=True) as txn: - while not self.put_queue.empty(): - container = self.put_queue.get_nowait() - txn.put(container.name, container.obj, overwrite=True) - - def delete(self, obj_id): - """ - Delete object from LMDB. - - Args: - obj_id (class 'plasma.ObjectID'): the object_id to be deleted - - Returns: - None - - Raises: - ObjectNotFoundError: If the id is not found - """ - with self.lmdb_env.begin(write=True) as txn: - out = txn.pop(LMDBStoreInterface._convert_obj_id_to_bytes(obj_id)) - if out is None: - raise ObjectNotFoundError - - @staticmethod - def _convert_obj_id_to_bytes(obj_id): - try: - return obj_id.binary() - except AttributeError: - return obj_id - - def replace(self): - pass # TODO - - def subscribe(self): - pass # TODO - - -# Aliasing StoreInterface = RedisStoreInterface -@dataclass -class LMDBData: - """ - Dataclass to store objects and their metadata into LMDB. 
- """ - - obj: object - time: float - name: str = None - is_queue: bool = False - - @property - def queue(self): - """ - Returns: - Queue name if object is a queue else None - """ - # Expected: 'q__Acquirer.q_out__124' -> {'q_out'} - if self.is_queue: - try: - return self.name.split("__")[1].split(".")[1] - except IndexError: - return "q_comm" - logger.error("Attempt to get queue name from objects not from queue.") - return None - - class ObjectNotFoundError(Exception): def __init__(self, obj_id_or_name): super().__init__() diff --git a/improv/utils/bad_config.yaml b/improv/utils/bad_config.yaml deleted file mode 100644 index d30cfd56..00000000 --- a/improv/utils/bad_config.yaml +++ /dev/null @@ -1,40 +0,0 @@ -modules: - GUI: - package: visual.visual - class: DisplayVisual - visual: Visual - - Acquirer: - package: acquire.acquire - class: LMDBAcquirer - lmdb_path: /Users/chaichontat/Desktop/Lab/New/rasp/src/output/lmdb - # filename: /Users/chaichontat/Desktop/Lab/New/rasp/data/Tolias_mesoscope_2.hdf5 - framerate: 15 - - Processor: - package: process.process - class: CaimanProcessor - - Visual: - package: visual.visual - class: CaimanVisual - - Analysis: - package: analysis.analysis - class: MeanAnalysis - - InputStim: - package: acquire.acquire - class: BehaviorAcquirer - - Watcherr: - package: watch.watch - class: Watcher - - -connections: - Acquirer.q_out: [Processor.q_in, Visual.raw_frame_queue, Watcherr.raw_frame_queue] - Processor.q_out: [Analysis.q_in] - Analysis.q_out: [Visual.q_in, Acquirer.q_in] - InputStim.q_out: [Analysis.input_stim_queue] - diff --git a/improv/utils/checks.py b/improv/utils/checks.py index 230107cd..a72f4657 100644 --- a/improv/utils/checks.py +++ b/improv/utils/checks.py @@ -5,13 +5,6 @@ $ python checks.py [file_name].yaml - $ python checks.py good_config.yaml - No loops. - - $ python checks.py bad_config.yaml - Loop(s) found. 
- Processor to Analysis to Acquirer - """ import sys diff --git a/improv/utils/good_config.yaml b/improv/utils/good_config.yaml deleted file mode 100644 index d91764c8..00000000 --- a/improv/utils/good_config.yaml +++ /dev/null @@ -1,40 +0,0 @@ -modules: - GUI: - package: visual.visual - class: DisplayVisual - visual: Visual - - Acquirer: - package: acquire.acquire - class: LMDBAcquirer - lmdb_path: /Users/chaichontat/Desktop/Lab/New/rasp/src/output/lmdb - # filename: /Users/chaichontat/Desktop/Lab/New/rasp/data/Tolias_mesoscope_2.hdf5 - framerate: 15 - - Processor: - package: process.process - class: CaimanProcessor - - Visual: - package: visual.visual - class: CaimanVisual - - Analysis: - package: analysis.analysis - class: MeanAnalysis - - InputStim: - package: acquire.acquire - class: BehaviorAcquirer - - Watcherr: - package: watch.watch - class: Watcher - - -connections: - Acquirer.q_out: [Processor.q_in, Visual.raw_frame_queue, Watcherr.raw_frame_queue] - Processor.q_out: [Analysis.q_in] - Analysis.q_out: [Visual.q_in] - InputStim.q_out: [Analysis.input_stim_queue] - diff --git a/improv/utils/reader.py b/improv/utils/reader.py deleted file mode 100644 index 036d8f3d..00000000 --- a/improv/utils/reader.py +++ /dev/null @@ -1,104 +0,0 @@ -import os -import pickle -from contextlib import contextmanager - -# from typing import Dict, Set -import lmdb -from .utils import get_num_length_from_key - - -class LMDBReader: - def __init__(self, path): - """Constructor for the LMDB reader - path: Path to LMDB folder - """ - if not os.path.exists(path): - raise FileNotFoundError - self.path = path - - def get_all_data(self): - """Load all data from LMDB into a dictionary - Make sure that the LMDB is small enough to fit in RAM - """ - with LMDBReader._lmdb_cur(self.path) as cur: - return { - LMDBReader._decode_key(key): pickle.loads(value) - for key, value in cur.iternext() - } - - def get_data_types(self): - """Return all data types defined as {object_name}, but without number.""" - num_idx = get_num_length_from_key() - - with LMDBReader._lmdb_cur(self.path) as cur: - return { - key[: -12 - num_idx.send(key)] for key in cur.iternext(values=False) - } - - def get_data_by_number(self, t): - """Return data at a specific frame number t""" - num_idx = get_num_length_from_key() - - def check_if_key_equals_t(key): - try: - return True if int(key[-12 - num_idx.send(key) : -12]) == t else False - except ValueError: - return False - - with LMDBReader._lmdb_cur(self.path) as cur: - keys = ( - key for key in cur.iternext(values=False) if check_if_key_equals_t(key) - ) - return { - LMDBReader._decode_key(key): pickle.loads(cur.get(key)) for key in keys - } - - def get_data_by_type(self, t): - """Return data with key that starts with t""" - with LMDBReader._lmdb_cur(self.path) as cur: - keys = ( - key for key in cur.iternext(values=False) if key.startswith(t.encode()) - ) - return { - LMDBReader._decode_key(key): pickle.loads(cur.get(key)) for key in keys - } - - def get_params(self): - """Return parameters in a dictionary""" - with LMDBReader._lmdb_cur(self.path) as cur: - keys = [ - key - for key in cur.iternext(values=False) - if key.startswith(b"params_dict") - ] - return pickle.loads(cur.get(keys[-1])) - - @staticmethod - def _decode_key(key): - """Helper method to convert key from byte to str - - Example: - >>> LMDBReader._decode_key(b'Call0\x80\x03GA\xd7Ky\x06\x9c\xddi.') - 'Call0_1563288602.4510138' - - key: Encoded key. The last 12 bytes are pickled time.time(). - The remaining are encoded object name. 
- """ - - return f"{key[:-12].decode()}_{pickle.loads(key[-12:])}" - - @staticmethod - @contextmanager - def _lmdb_cur(path): - """Helper context manager to open and ensure proper closure of LMDB""" - - env = lmdb.open(path) - txn = env.begin() - cur = txn.cursor() - try: - yield cur - - finally: - cur.__exit__() - txn.commit() - env.close() diff --git a/improv/utils/utils.py b/improv/utils/utils.py deleted file mode 100644 index c7cc0072..00000000 --- a/improv/utils/utils.py +++ /dev/null @@ -1,51 +0,0 @@ -# from functools import wraps - -# def coroutine(func): #FIXME who uses this and why? -# """ Decorator that primes 'func' by calling first {yield}. """ - -# @wraps(func) -# def primer(*args, **kwargs): -# gen = func(*args, **kwargs) -# next(gen) -# return gen -# return primer - - -# @coroutine -def get_num_length_from_key(): - """ - Coroutine that gets the length of digits in LMDB key. - Assumes that object name does not have any digits. - - For example: - FileAcquirer puts objects with names 'acq_raw{i}' where i is the frame number. - {i}, however, is not padded with zero, so the length changes with number. - The B-tree sorting in LMDB results in messed up number sorting. - Example: - >>> num_idx = get_num_length_from_key() - >>> num_idx.send(b'acq_raw1\x80\x03GA\xd7L\x1b\x8f\xb0\x1b\xb0.') - 1 - - """ - max_num_len = 1 # Keep track of largest digit for performance. - - def worker(): - nonlocal max_num_len - name_num = key[:-12].decode() - - if not name_num[-max_num_len:].isdigit(): - i = max_num_len - while not name_num[-i:].isdigit(): - if i < 1: - return 0 - i -= 1 - return i - - while name_num[-(max_num_len + 1) :].isdigit(): - max_num_len += 1 - return max_num_len - - num = "Primed!" - while True: - key = yield num - num = worker() diff --git a/pyproject.toml b/pyproject.toml index bec4b3e4..bf5a42b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,6 @@ dependencies = [ "matplotlib", "pyarrow==9.0.0", "PyQt5", - "lmdb", "pyyaml", "textual==0.15.0", "pyzmq", diff --git a/test/configs/minimal_with_settings.yaml b/test/configs/minimal_with_settings.yaml index 37d43c55..a5aec2d8 100644 --- a/test/configs/minimal_with_settings.yaml +++ b/test/configs/minimal_with_settings.yaml @@ -4,7 +4,6 @@ settings: control_port: 6000 output_port: 6001 logging_port: 6002 - use_hdd: false use_watcher: false actors: diff --git a/test/test_nexus.py b/test/test_nexus.py index fc8dd458..a21326e9 100644 --- a/test/test_nexus.py +++ b/test/test_nexus.py @@ -134,7 +134,6 @@ def test_argument_config_precedence(setdir, ports): control_port=ports[0], output_port=ports[1], store_size=11_000_000, - use_hdd=True, use_watcher=True, ) cfg = nex.config.settings @@ -142,7 +141,6 @@ def test_argument_config_precedence(setdir, ports): assert cfg["control_port"] == ports[0] assert cfg["output_port"] == ports[1] assert cfg["store_size"] == 20_000_000 - assert not cfg["use_hdd"] assert not cfg["use_watcher"] From ac282dd513a617cb715a9131d4636875db418939 Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Thu, 25 Jul 2024 20:38:19 -0700 Subject: [PATCH 03/12] swap RDB for AOF --- improv/config.py | 29 +++-- improv/nexus.py | 111 ++++++++++++------ ...l => minimal_with_custom_aof_dirname.yaml} | 2 +- ...> minimal_with_ephemeral_aof_dirname.yaml} | 2 +- .../minimal_with_every_second_saving.yaml | 15 +++ .../minimal_with_every_write_saving.yaml | 15 +++ .../minimal_with_no_schedule_saving.yaml | 15 +++ test/conftest.py | 2 - test/test_nexus.py | 102 ++++++++++++---- 9 files changed, 224 insertions(+), 69 
deletions(-) rename test/configs/{minimal_with_custom_dbfilename.yaml => minimal_with_custom_aof_dirname.yaml} (85%) rename test/configs/{minimal_with_ephemeral_dbfilename.yaml => minimal_with_ephemeral_aof_dirname.yaml} (85%) create mode 100644 test/configs/minimal_with_every_second_saving.yaml create mode 100644 test/configs/minimal_with_every_write_saving.yaml create mode 100644 test/configs/minimal_with_no_schedule_saving.yaml diff --git a/improv/config.py b/improv/config.py index 13c7517b..5f73c18b 100644 --- a/improv/config.py +++ b/improv/config.py @@ -146,28 +146,39 @@ def redis_port_specified(self): def redis_saving_enabled(self): if "redis_config" in self.config.keys(): return ( - "enable_saving" in self.config["redis_config"] - and self.config["redis_config"]["enable_saving"] + self.config["redis_config"]["enable_saving"] + if "enable_saving" in self.config["redis_config"] + else None ) - return None - def generate_ephemeral_db_filename(self): + def generate_ephemeral_aof_dirname(self): if "redis_config" in self.config.keys(): return ( - "generate_ephemeral_db_filename" in self.config["redis_config"] - and self.config["redis_config"]["generate_ephemeral_db_filename"] + self.config["redis_config"]["generate_ephemeral_aof_dirname"] + if "generate_ephemeral_aof_dirname" in self.config["redis_config"] + else None ) return False - def get_redis_db_filename(self): + def get_redis_aof_dirname(self): if "redis_config" in self.config.keys(): return ( - self.config["redis_config"]["db_filename"] - if "db_filename" in self.config["redis_config"] + self.config["redis_config"]["aof_dirname"] + if "aof_dirname" in self.config["redis_config"] else None ) return None + def get_redis_fsync_frequency(self): + if "redis_config" in self.config.keys(): + frequency = ( + self.config["redis_config"]["fsync_frequency"] + if "fsync_frequency" in self.config["redis_config"] + else None + ) + + return frequency + @staticmethod def get_default_redis_port(): return "6379" diff --git a/improv/nexus.py b/improv/nexus.py index cb226824..20cb870d 100644 --- a/improv/nexus.py +++ b/improv/nexus.py @@ -30,10 +30,11 @@ class Nexus: """Main server class for handling objects in improv""" def __init__(self, name="Server"): + self.redis_fsync_frequency = None self.store = None self.config = None self.name = name - self.redis_dumpfile = None + self.aof_dir = None self.redis_saving_enabled = False def __str__(self): @@ -238,36 +239,69 @@ def initConfig(self): def configure_redis_persistence(self): # invalid configs: specifying filename and using an ephemeral filename, # specifying that saving is off but providing either filename option - db_filename = self.config.get_redis_db_filename() - generate_unique_filename = self.config.generate_ephemeral_db_filename() + aof_dirname = self.config.get_redis_aof_dirname() + generate_unique_dirname = self.config.generate_ephemeral_aof_dirname() redis_saving_enabled = self.config.redis_saving_enabled() + redis_fsync_frequency = self.config.get_redis_fsync_frequency() - if db_filename and generate_unique_filename: + if aof_dirname and generate_unique_dirname: logger.error( - "Cannot both generate a unique filename and use the one provided." + "Cannot both generate a unique dirname and use the one provided." 
)
-            raise Exception("Cannot use unique filename and use the one provided.")
-
-        if db_filename or generate_unique_filename:
-            redis_saving_enabled = True
+            raise Exception("Cannot both generate and specify the AOF dirname.")
+
+        if aof_dirname or generate_unique_dirname or redis_fsync_frequency:
+            if redis_saving_enabled is None:
+                redis_saving_enabled = True
+            elif not redis_saving_enabled:
+                logger.error(
+                    "Invalid configuration. Cannot save to disk with saving disabled."
+                )
+                raise Exception("Cannot persist to disk with saving disabled.")
 
         self.redis_saving_enabled = redis_saving_enabled
 
-        if not redis_saving_enabled and (db_filename or generate_unique_filename):
-            logger.error(
-                "Invalid configuration. Cannot save to a file with saving disabled."
+        if redis_fsync_frequency and redis_fsync_frequency not in [
+            "every_write",
+            "every_second",
+            "no_schedule",
+        ]:
+            logger.error("Cannot use unknown fsync frequency %s", redis_fsync_frequency)
+            raise Exception(
+                "Cannot use unknown fsync frequency {}".format(redis_fsync_frequency)
             )
-            raise Exception("Cannot save to a file with saving disabled.")
 
-        if db_filename:
-            self.redis_dumpfile = db_filename
-        elif generate_unique_filename:
-            self.redis_dumpfile = str(uuid.uuid1()) + ".rdb"
+        if redis_fsync_frequency is None:
+            redis_fsync_frequency = "no_schedule"
 
-        if self.redis_saving_enabled and self.redis_dumpfile is not None:
-            logger.info("Redis saving enabled. Saving to file " + self.redis_dumpfile)
+        if redis_fsync_frequency == "every_write":
+            self.redis_fsync_frequency = "always"
+        elif redis_fsync_frequency == "every_second":
+            self.redis_fsync_frequency = "everysec"
+        elif redis_fsync_frequency == "no_schedule":
+            self.redis_fsync_frequency = "no"
+        else:
+            logger.error("Unknown fsync frequency %s", redis_fsync_frequency)
+            raise Exception("Unknown fsync frequency {}".format(redis_fsync_frequency))
+
+        if aof_dirname:
+            self.aof_dir = aof_dirname
+        elif generate_unique_dirname:
+            self.aof_dir = "improv_persistence_" + str(uuid.uuid1())
+
+        if self.redis_saving_enabled and self.aof_dir is not None:
+            logger.info(
+                "Redis saving enabled. Saving to directory "
+                + self.aof_dir
+                + " on schedule "
+                + "'{}'".format(self.redis_fsync_frequency)
+            )
         elif self.redis_saving_enabled:
-            logger.info("Redis saving enabled with default dumpfile.")
+            logger.info(
+                "Redis saving enabled with default directory "
+                + "on schedule "
+                + "'{}'".format(self.redis_fsync_frequency)
+            )
         else:
             logger.info("Redis saving disabled.")
 
@@ -715,24 +749,33 @@ def start_redis(self, size):
             str(self.store_port),
             "--maxmemory",
             str(size),
+            "--save",  # this only turns off RDB, which we want permanently off
+            '""',
         ]
 
-        if self.redis_dumpfile is not None and len(self.redis_dumpfile) == 0:
-            raise Exception("Save file specified but no filename given.")
+        if self.aof_dir is not None and len(self.aof_dir) == 0:
+            raise Exception("Persistence directory specified but its name is empty.")
 
-        if (
-            not self.redis_saving_enabled
-        ):  # the default behavior - do not persist db state. 
-            subprocess_command += ["--save", '""']
-            logger.info("Redis dump file disabled.")
-        elif (
-            self.redis_dumpfile is not None
-        ):  # use specified (possibly pre-existing) file
-            # subprocess_command += ["--save", "1 1"]
-            subprocess_command += ["--dbfilename", self.redis_dumpfile]
-            logger.info("Redis dump file set to {}".format(self.redis_dumpfile))
-        else:  # just use the (possibly preexisting) default dump.rdb file
+        if self.aof_dir is not None:  # use the specified (possibly pre-existing) dir
             # subprocess_command += ["--save", "1 1"]
+            subprocess_command += [
+                "--appendonly",
+                "yes",
+                "--appendfsync",
+                self.redis_fsync_frequency,
+                "--appenddirname",
+                self.aof_dir,
+            ]
+            logger.info("Redis persistence directory set to {}".format(self.aof_dir))
+        elif (
+            self.redis_saving_enabled
+        ):  # just use the (possibly preexisting) default aof dir
+            subprocess_command += [
+                "--appendonly",
+                "yes",
+                "--appendfsync",
+                self.redis_fsync_frequency,
+            ]
             logger.info("Proceeding with using default Redis dump file.")
 
         logger.info(
diff --git a/test/configs/minimal_with_custom_dbfilename.yaml b/test/configs/minimal_with_custom_aof_dirname.yaml
similarity index 85%
rename from test/configs/minimal_with_custom_dbfilename.yaml
rename to test/configs/minimal_with_custom_aof_dirname.yaml
index eff3b34c..1bac5863 100644
--- a/test/configs/minimal_with_custom_dbfilename.yaml
+++ b/test/configs/minimal_with_custom_aof_dirname.yaml
@@ -11,4 +11,4 @@ connections:
   Generator.q_out: [Processor.q_in]
 
 redis_config:
-  db_filename: custom_dbfilename.rdb
\ No newline at end of file
+  aof_dirname: custom_aof_dirname
\ No newline at end of file
diff --git a/test/configs/minimal_with_ephemeral_dbfilename.yaml b/test/configs/minimal_with_ephemeral_aof_dirname.yaml
similarity index 85%
rename from test/configs/minimal_with_ephemeral_dbfilename.yaml
rename to test/configs/minimal_with_ephemeral_aof_dirname.yaml
index 9f97372d..9933896e 100644
--- a/test/configs/minimal_with_ephemeral_dbfilename.yaml
+++ b/test/configs/minimal_with_ephemeral_aof_dirname.yaml
@@ -11,4 +11,4 @@ connections:
   Generator.q_out: [Processor.q_in]
 
 redis_config:
-  generate_ephemeral_db_filename: True
\ No newline at end of file
+  generate_ephemeral_aof_dirname: True
\ No newline at end of file
diff --git a/test/configs/minimal_with_every_second_saving.yaml b/test/configs/minimal_with_every_second_saving.yaml
new file mode 100644
index 00000000..74d4b314
--- /dev/null
+++ b/test/configs/minimal_with_every_second_saving.yaml
@@ -0,0 +1,15 @@
+actors:
+  Generator:
+    package: actors.sample_generator
+    class: Generator
+
+  Processor:
+    package: actors.sample_processor
+    class: Processor
+
+connections:
+  Generator.q_out: [Processor.q_in]
+
+redis_config:
+  enable_saving: True
+  fsync_frequency: every_second
\ No newline at end of file
diff --git a/test/configs/minimal_with_every_write_saving.yaml b/test/configs/minimal_with_every_write_saving.yaml
new file mode 100644
index 00000000..add0c74f
--- /dev/null
+++ b/test/configs/minimal_with_every_write_saving.yaml
@@ -0,0 +1,15 @@
+actors:
+  Generator:
+    package: actors.sample_generator
+    class: Generator
+
+  Processor:
+    package: actors.sample_processor
+    class: Processor
+
+connections:
+  Generator.q_out: [Processor.q_in]
+
+redis_config:
+  enable_saving: True
+  fsync_frequency: every_write
\ No newline at end of file
diff --git a/test/configs/minimal_with_no_schedule_saving.yaml b/test/configs/minimal_with_no_schedule_saving.yaml
new file mode 100644
index 00000000..2e5f62f3
--- /dev/null
+++ 
b/test/configs/minimal_with_no_schedule_saving.yaml @@ -0,0 +1,15 @@ +actors: + Generator: + package: actors.sample_generator + class: Generator + + Processor: + package: actors.sample_processor + class: Processor + +connections: + Generator.q_out: [Processor.q_in] + +redis_config: + enable_saving: True + fsync_frequency: no_schedule \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py index 23880328..f9b80fa5 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -28,8 +28,6 @@ def setup_store(server_port_num): "redis-server", "--save", '""', - "--dbfilename", - "cinonexistent.rdb", "--port", str(server_port_num), "--maxmemory", diff --git a/test/test_nexus.py b/test/test_nexus.py index a21326e9..a642f656 100644 --- a/test/test_nexus.py +++ b/test/test_nexus.py @@ -1,4 +1,5 @@ import glob +import shutil import time import os import pytest @@ -10,7 +11,6 @@ from improv.nexus import Nexus from improv.store import StoreInterface - # from improv.actor import Actor # from improv.store import StoreInterface @@ -425,7 +425,7 @@ def test_unspecified_port_default_busy(caplog, setdir, ports, setup_store): ) -def test_no_dumpfile_by_default(caplog, setdir, ports): +def test_no_aof_dir_by_default(caplog, setdir, ports): nex = Nexus("test") nex.createNexus( file="minimal.yaml", @@ -436,10 +436,11 @@ def test_no_dumpfile_by_default(caplog, setdir, ports): nex.destroyNexus() - assert "dump.rdb" not in os.listdir(".") + assert "appendonlydir" not in os.listdir(".") + assert all(["improv_persistence_" not in name for name in os.listdir(".")]) -def test_default_dumpfile_if_none_specified(caplog, setdir, ports, server_port_num): +def test_default_aof_dir_if_none_specified(caplog, setdir, ports, server_port_num): nex = Nexus("test") nex.createNexus( file="minimal_with_redis_saving.yaml", @@ -455,23 +456,20 @@ def test_default_dumpfile_if_none_specified(caplog, setdir, ports, server_port_n nex.destroyNexus() - logging.info(os.getcwd() + "\n") - logging.info(os.listdir(".")) - - assert "dump.rdb" in os.listdir(".") + assert "appendonlydir" in os.listdir(".") - if "dump.rdb" in os.listdir("."): - os.remove("dump.rdb") + if "appendonlydir" in os.listdir("."): + shutil.rmtree("appendonlydir") else: logging.info("didn't find dbfilename") logging.info("exited test") -def test_specify_static_dumpfile(caplog, setdir, ports, server_port_num): +def test_specify_static_aof_dir(caplog, setdir, ports, server_port_num): nex = Nexus("test") nex.createNexus( - file="minimal_with_custom_dbfilename.yaml", + file="minimal_with_custom_aof_dirname.yaml", store_size=10000000, control_port=ports[0], output_port=ports[1], @@ -484,20 +482,20 @@ def test_specify_static_dumpfile(caplog, setdir, ports, server_port_num): nex.destroyNexus() - assert "custom_dbfilename.rdb" in os.listdir(".") + assert "custom_aof_dirname" in os.listdir(".") - if "custom_dbfilename.rdb" in os.listdir("."): - os.remove("custom_dbfilename.rdb") + if "custom_aof_dirname" in os.listdir("."): + shutil.rmtree("custom_aof_dirname") else: logging.info("didn't find dbfilename") logging.info("exited test") -def test_use_ephemeral_dbfile(caplog, setdir, ports, server_port_num): +def test_use_ephemeral_aof_dir(caplog, setdir, ports, server_port_num): nex = Nexus("test") nex.createNexus( - file="minimal_with_ephemeral_dbfilename.yaml", + file="minimal_with_ephemeral_aof_dirname.yaml", store_size=10000000, control_port=ports[0], output_port=ports[1], @@ -510,16 +508,76 @@ def test_use_ephemeral_dbfile(caplog, setdir, ports, server_port_num): 
nex.destroyNexus() - logging.info(os.getcwd() + "\n") - logging.info(os.listdir(".")) - - assert any([".rdb" in filename for filename in os.listdir(".")]) + assert any(["improv_persistence_" in name for name in os.listdir(".")]) - [os.remove(db_filename) for db_filename in glob.glob("*.rdb")] + [shutil.rmtree(db_filename) for db_filename in glob.glob("improv_persistence_*")] logging.info("completed ephemeral db test") +def test_save_no_schedule(caplog, setdir, ports, server_port_num): + nex = Nexus("test") + nex.createNexus( + file="minimal_with_no_schedule_saving.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + store = StoreInterface(server_port_num=server_port_num) + + fsync_schedule = store.client.config_get("appendfsync") + + nex.destroyNexus() + + assert "appendonlydir" in os.listdir(".") + shutil.rmtree("appendonlydir") + + assert fsync_schedule["appendfsync"] == "no" + + +def test_save_every_second(caplog, setdir, ports, server_port_num): + nex = Nexus("test") + nex.createNexus( + file="minimal_with_every_second_saving.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + store = StoreInterface(server_port_num=server_port_num) + + fsync_schedule = store.client.config_get("appendfsync") + + nex.destroyNexus() + + assert "appendonlydir" in os.listdir(".") + shutil.rmtree("appendonlydir") + + assert fsync_schedule["appendfsync"] == "everysec" + + +def test_save_every_write(caplog, setdir, ports, server_port_num): + nex = Nexus("test") + nex.createNexus( + file="minimal_with_every_write_saving.yaml", + store_size=10000000, + control_port=ports[0], + output_port=ports[1], + ) + + store = StoreInterface(server_port_num=server_port_num) + + fsync_schedule = store.client.config_get("appendfsync") + + nex.destroyNexus() + + assert "appendonlydir" in os.listdir(".") + shutil.rmtree("appendonlydir") + + assert fsync_schedule["appendfsync"] == "always" + + @pytest.mark.skip(reason="Nexus no longer deletes files on shutdown. 
Nothing to test.") def test_store_already_deleted_issues_warning(caplog): nex = Nexus("test") From b304417ea5c93771af29ac5df0d244a83e064466 Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Tue, 30 Jul 2024 00:13:11 -0700 Subject: [PATCH 04/12] testing --- .github/workflows/CI.yaml | 46 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 0c80283d..258c03c0 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -21,12 +21,46 @@ jobs: fail-fast: false matrix: python-version: ["3.8", "3.9", "3.10"] - os: [ubuntu-latest, macos-latest] # [ubuntu-latest, macos-latest, windows-latest] + os: [windows-latest] # [ubuntu-latest, macos-latest, windows-latest] steps: - uses: actions/checkout@v4 with: fetch-depth: 0 + - name: Setup WSL (Windows) + if: startsWith(matrix.os, 'windows') + uses: Vampire/setup-wsl@v2.0.1 + with: + distribution: Ubuntu-22.04 + wsl-shell-command: bash + - name: Setup Python on WSL + if: startsWith(matrix.os, 'windows') + shell: wsl-bash {0} + run: | + pwd + sudo apt-get update -y + sudo apt-get install -y python3-pip + sudo apt-get install -y python3.10-venv + - name: Update WSL (Windows) + if: startsWith(matrix.os, 'windows') + shell: wsl-bash {0} + run: | + sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y lsb-release curl gpg + - name: Fetch Redis gpg (Windows) + if: startsWith(matrix.os, 'windows') + shell: wsl-bash {0} + run: | + curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg + - name: Configure Redis gpg (Windows) + if: startsWith(matrix.os, 'windows') + shell: wsl-bash {0} + run: | + echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list + - name: Install Redis (Windows) + if: startsWith(matrix.os, 'windows') + shell: wsl-bash {0} + run: | + sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y redis && sleep 5 && sudo systemctl stop redis-server - name: Update Ubuntu and Install Dependencies if: startsWith(matrix.os, 'ubuntu') run: | @@ -62,7 +96,15 @@ jobs: if: startsWith(matrix.os, 'macos') run: | pip install -e .[tests,lint] - + - name: Install package (Windows) + if: startsWith(matrix.os, 'windows') + shell: wsl-bash {0} + run: | + pip install --upgrade pip setuptools + pip install build + pip install flake8 pytest + python3 -m build + pip install -e .[tests,lint] --no-binary pyzmq - name: Test with pytest run: | python -m pytest --cov=improv From 5fc04cf48098eaa980e5f6c57593ee4dbc513305 Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Tue, 30 Jul 2024 00:24:08 -0700 Subject: [PATCH 05/12] systemctl --- .github/workflows/CI.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 258c03c0..94660255 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -60,7 +60,7 @@ jobs: if: startsWith(matrix.os, 'windows') shell: wsl-bash {0} run: | - sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y redis && sleep 5 && sudo systemctl stop redis-server + sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y redis && sleep 5 - name: Update Ubuntu and Install Dependencies if: startsWith(matrix.os, 'ubuntu') run: | From baf3339af70b83cca6594e08a0adaba9eafd980b Mon Sep 17 00:00:00 2001 From: 
Richard Schonberg Date: Tue, 30 Jul 2024 00:26:15 -0700 Subject: [PATCH 06/12] wsl version --- .github/workflows/CI.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 94660255..989b0384 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -29,7 +29,7 @@ jobs: fetch-depth: 0 - name: Setup WSL (Windows) if: startsWith(matrix.os, 'windows') - uses: Vampire/setup-wsl@v2.0.1 + uses: Vampire/setup-wsl@v3 with: distribution: Ubuntu-22.04 wsl-shell-command: bash From e34a5b109cca58f5d261f9e8617a3fd30d692442 Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Tue, 30 Jul 2024 00:32:31 -0700 Subject: [PATCH 07/12] testing --- .github/workflows/CI.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 989b0384..119c7bec 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -60,7 +60,7 @@ jobs: if: startsWith(matrix.os, 'windows') shell: wsl-bash {0} run: | - sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y redis && sleep 5 + sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y redis && sleep 5 && sudo service stop redis-server - name: Update Ubuntu and Install Dependencies if: startsWith(matrix.os, 'ubuntu') run: | From ce60ab4c89e8d3d72850d418358619deef69c749 Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Tue, 30 Jul 2024 00:54:02 -0700 Subject: [PATCH 08/12] os specific test --- .github/workflows/CI.yaml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 119c7bec..39193118 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -105,7 +105,17 @@ jobs: pip install flake8 pytest python3 -m build pip install -e .[tests,lint] --no-binary pyzmq - - name: Test with pytest + - name: Test with pytest (Windows) + if: startsWith(matrix.os, 'windows') + shell: wsl-bash {0} + run: | + python -m pytest --cov=improv + - name: Test with pytest (Ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: | + python -m pytest --cov=improv + - name: Test with pytest (macOS) + if: startsWith(matrix.os, 'macos') run: | python -m pytest --cov=improv From 6cbd157f336fd6b979049fa6b5a5b38600f34faa Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Tue, 30 Jul 2024 01:05:26 -0700 Subject: [PATCH 09/12] syntax --- .github/workflows/CI.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 39193118..eb2d4a60 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -60,7 +60,7 @@ jobs: if: startsWith(matrix.os, 'windows') shell: wsl-bash {0} run: | - sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y redis && sleep 5 && sudo service stop redis-server + sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get install -y redis && sleep 5 && sudo service redis-server stop - name: Update Ubuntu and Install Dependencies if: startsWith(matrix.os, 'ubuntu') run: | From 60ad2572960ecbfb704592f8e84c1d8f15ad05d9 Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Tue, 30 Jul 2024 01:18:56 -0700 Subject: [PATCH 10/12] command --- .github/workflows/CI.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index eb2d4a60..e44e75b0 100644 --- a/.github/workflows/CI.yaml +++ 
b/.github/workflows/CI.yaml @@ -109,7 +109,7 @@ jobs: if: startsWith(matrix.os, 'windows') shell: wsl-bash {0} run: | - python -m pytest --cov=improv + python3 -m pytest --cov=improv - name: Test with pytest (Ubuntu) if: startsWith(matrix.os, 'ubuntu') run: | From 72b9455b0f35cd7e27bed95c96e031138533452d Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Tue, 30 Jul 2024 01:21:56 -0700 Subject: [PATCH 11/12] additional windows commands --- .github/workflows/CI.yaml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index e44e75b0..16139181 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -85,7 +85,17 @@ jobs: uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - - name: Install dependencies + - name: Install dependencies (Windows) + if: startsWith(matrix.os, 'windows') + shell: wsl-bash {0} + run: | + python -m pip install --upgrade pip build + - name: Install dependencies (Ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: | + python -m pip install --upgrade pip build + - name: Install dependencies (macOS) + if: startsWith(matrix.os, 'macos') run: | python -m pip install --upgrade pip build - name: Install package (Ubuntu) From 9101226052da10ec4e4a089b12a39e8188daf3d0 Mon Sep 17 00:00:00 2001 From: Richard Schonberg Date: Wed, 31 Jul 2024 00:14:03 -0700 Subject: [PATCH 12/12] test --- .github/workflows/CI.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 16139181..2cbc4e58 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -89,7 +89,7 @@ jobs: if: startsWith(matrix.os, 'windows') shell: wsl-bash {0} run: | - python -m pip install --upgrade pip build + python3 -m pip install --upgrade pip build - name: Install dependencies (Ubuntu) if: startsWith(matrix.os, 'ubuntu') run: | @@ -110,11 +110,11 @@ jobs: if: startsWith(matrix.os, 'windows') shell: wsl-bash {0} run: | - pip install --upgrade pip setuptools - pip install build - pip install flake8 pytest + python3 -m pip install --upgrade pip setuptools + python3 -m pip install build + python3 -m pip install flake8 pytest python3 -m build - pip install -e .[tests,lint] --no-binary pyzmq + python3 -m pip install -e .[tests,lint] --no-binary pyzmq - name: Test with pytest (Windows) if: startsWith(matrix.os, 'windows') shell: wsl-bash {0}
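
A note on the persistence options this series introduces, with a minimal
sketch (not part of any patch) of how they surface in a running store. The
port below assumes the default returned by get_default_redis_port() in
improv/config.py; the expected values follow the mapping implemented in
configure_redis_persistence() (every_write -> always, every_second ->
everysec, no_schedule -> no).

    import redis

    # Connect to the redis-server process that Nexus started.
    # Port 6379 is the default; a fixed or ephemeral port may be configured.
    client = redis.Redis(host="localhost", port=6379)

    # With enable_saving: True in redis_config, Nexus launches redis-server
    # with "--appendonly yes", so AOF persistence should be reported as on.
    print(client.config_get("appendonly"))   # expected: {'appendonly': 'yes'}

    # fsync_frequency in the YAML maps onto Redis's appendfsync setting.
    print(client.config_get("appendfsync"))  # e.g. {'appendfsync': 'everysec'}

This mirrors the assertions in the new test_nexus.py tests, which read back
config_get("appendfsync") after createNexus() and expect "no", "everysec",
or "always" depending on the config file used.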