diff --git a/.github/workflows/pythontest.yaml b/.github/workflows/pythontest.yaml index 1771a82b7..1d16f1f58 100644 --- a/.github/workflows/pythontest.yaml +++ b/.github/workflows/pythontest.yaml @@ -6,7 +6,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ "3.8" ] + python-version: [ "3.10" ] steps: - name: Checkout code uses: actions/checkout@v2 @@ -25,7 +25,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ "3.8", "3.9", "3.10" ] + python-version: [ "3.10" ] steps: - name: Checkout code uses: actions/checkout@v2 @@ -34,7 +34,7 @@ jobs: with: python-version: ${{ matrix.python-version }} - name: Install - run: pip install ".[google,focuser,sensors,testing]" + run: pip install ".[google,focuser,sensors,tasks,testing]" - name: Test run: pytest - name: Upload coverage report to codecov.io diff --git a/conf_files/pocs.yaml b/conf_files/pocs.yaml index 8a238fdfe..b92751cf0 100644 --- a/conf_files/pocs.yaml +++ b/conf_files/pocs.yaml @@ -32,6 +32,16 @@ directories: mounts: resources/mounts fields: conf_files/fields +# Celery task manager for distributed tasks. +celery: + broker_url: redis://localhost:6379 + result_backend: redis://localhost:6379 + service: + - name: pocs-celery + image: redis + ports: + 6379: 6379 + db: name: panoptes type: file diff --git a/setup.cfg b/setup.cfg index 5ecf67e62..4d69ee6c7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,7 +18,7 @@ classifiers = License :: OSI Approved :: MIT License Operating System :: POSIX Programming Language :: Python :: 3 - Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.10 Programming Language :: Python :: 3 :: Only Topic :: Scientific/Engineering :: Astronomy Topic :: Scientific/Engineering :: Physics @@ -52,7 +52,7 @@ install_requires = # The usage of test_requires is discouraged, see `Dependency Management` docs # tests_require = pytest; pytest-cov # Require a specific Python version, e.g. Python 2.7 or >= 3.4 -python_requires = >=3.8 +python_requires = >="3.10" packages = find_namespace: [options.packages.find] @@ -72,6 +72,9 @@ google = gsutil protobuf rsa +tasks = + celery[redis] + docker testing = coverage pycodestyle diff --git a/src/panoptes/pocs/camera/gphoto/celery/__init__.py b/src/panoptes/pocs/camera/gphoto/celery/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/panoptes/pocs/camera/gphoto/celery/controller.py b/src/panoptes/pocs/camera/gphoto/celery/controller.py new file mode 100644 index 000000000..c540f1850 --- /dev/null +++ b/src/panoptes/pocs/camera/gphoto/celery/controller.py @@ -0,0 +1,57 @@ +from typing import List, Union + +from panoptes.pocs.camera.gphoto.canon import Camera as CanonCamera +from panoptes.pocs.utils import error +from panoptes.pocs.utils.tasks import TaskManager, RunTaskMixin + + +class Camera(CanonCamera, RunTaskMixin): + """A remote gphoto2 camera class that can call local or remote celery tasks.""" + + def __init__(self, queue: str | None = None, *args, **kwargs): + """Control a remote gphoto2 camera via a celery task. """ + super().__init__(connect=False, *args, **kwargs) + + self.celery_app = TaskManager.create_celery_app_from_config() + + self.task = None + self.queue = queue or self.name + self.connect() + self.logger.debug(f'Canon DSLR GPhoto2 camera celery task manager with queue={self.queue}') + + @property + def is_exposing(self): + return self.task and self.task.state == 'EXPOSING' + + def command(self, cmd, queue=None, **kwargs): + """Run a remote celery task attached to a camera. """ + + if self.is_exposing: + raise error.CameraBusy() + + queue = queue or self.queue + + arguments = ' '.join(cmd) + + self.logger.debug(f'Running remote gphoto2 task with {arguments=} to {queue=}') + self.task = self.call_task('camera.command', args=[arguments], queue=queue) + + def get_command_result(self, timeout: float = 10) -> Union[List[str], None]: + """Get the output from the remote camera task, blocking up to timeout.""" + cmd_result = self.task.get(timeout=timeout) + + self.logger.debug(f'Full results from command {cmd_result!r}') + + # Clear task. + self.task = None + + # Return just the actual output. TODO error checking? + return cmd_result['output'] + + def _create_fits_header(self, seconds, dark=None, metadata=None) -> dict: + fits_header = super(Camera, self)._create_fits_header(seconds, dark=dark, metadata=metadata) + return {k.lower(): v for k, v in dict(fits_header).items()} + + def _start_exposure(self, seconds=None, *args, **kwargs): + # TODO more here + self.task = self.call_task('camera.release_shutter', args=[seconds], queue=self.queue) diff --git a/src/panoptes/pocs/camera/gphoto/celery/settings.py b/src/panoptes/pocs/camera/gphoto/celery/settings.py new file mode 100644 index 000000000..45a31949a --- /dev/null +++ b/src/panoptes/pocs/camera/gphoto/celery/settings.py @@ -0,0 +1,44 @@ +import typing +from enum import IntEnum +from typing import Dict, Optional + +import pigpio +from pydantic import BaseSettings, BaseModel, Field + +from worker import gpio + + +class State(IntEnum): + LOW = 0 + HIGH = 1 + + +class Settings(BaseSettings): + camera_name: str + camera_port: str + camera_pin: int + broker_url: str = 'amqp://guest:guest@localhost:5672//' + result_backend: str = 'rpc://' + + class Config: + env_prefix = 'pocs_' + + +class Camera(BaseModel): + """A camera with a shutter release connected to a gpio pin.""" + name: str + port: str + pin: int + is_tethered: bool = False + + def setup_pin(self): + """Sets the mode for the GPIO pin.""" + # Get GPIO pin and set OUTPUT mode. + print(f'Setting {self.pin=} as OUTPUT for {self.name}') + gpio.set_mode(self.pin, pigpio.OUTPUT) + + +class AppSettings(BaseModel): + celery: Dict = Field(default_factory=dict) + camera: Camera + process: Optional[typing.Any] = None diff --git a/src/panoptes/pocs/camera/gphoto/celery/worker.py b/src/panoptes/pocs/camera/gphoto/celery/worker.py new file mode 100644 index 000000000..fd8bdcb48 --- /dev/null +++ b/src/panoptes/pocs/camera/gphoto/celery/worker.py @@ -0,0 +1,158 @@ +import re +import shutil +import subprocess +from contextlib import suppress +from typing import Optional, List, Union + +import pigpio +from celery import Celery +from panoptes.utils.time import current_time, CountdownTimer + +from panoptes.pocs.camera.gphoto.celery.settings import State, Settings, Camera, AppSettings + +# Create settings from env vars. +settings = Settings() + +# Build app settings. +app_settings = AppSettings( + camera=Camera(name=settings.camera_name, + port=settings.camera_port, + pin=settings.camera_pin), + celery=dict(broker_url=settings.broker_url, + result_backend=settings.result_backend), +) + +# Start celery. +app = Celery() +app.config_from_object(app_settings.celery) + +# Setup GPIO pins. +gpio = pigpio.pi() +app_settings.camera.setup_pin() + +camera_match_re = re.compile(r'([\w\d\s_.]{30})\s(usb:\d{3},\d{3})') +file_save_re = re.compile(r'Saving file as (.*)') + + +@app.task(name='camera.release_shutter', bind=True) +def release_shutter(self, exptime: float): + """Trigger the shutter release for given exposure time via the GPIO pin.""" + # Create a timer. + timer = CountdownTimer(exptime, name=f'Pin{app_settings.camera.pin}Expose') + + # Open shutter. + self.update_state(state='START_EXPOSING', start_time=current_time(flatten=True)) + gpio.write(app_settings.camera.pin, State.HIGH) + + # Wait for exptime, send state updates. + while timer.expired() is False: + self.update_state(state='EXPOSING', meta=dict(secs=f'{exptime - timer.time_left():.02f}', )) + timer.sleep(max_sleep=max(1., exptime / 8)) # Divide wait time into eighths. + + # Close shutter. + gpio.write(app_settings.camera.pin, State.LOW) + self.update_state(state='STOP_EXPOSING', stop_time=current_time(flatten=True)) + + +@app.task(name='camera.start_tether', bind=True) +def start_gphoto2_tether(self, filename_pattern: str): + """Start a tether for gphoto2 auto-download.""" + if app_settings.camera.is_tethered: + print(f'{app_settings.camera} is already tethered') + return + else: + print(f'Starting gphoto2 tether for {app_settings.camera.port=} using {filename_pattern=}') + app_settings.camera.is_tethered = True + + command = ['--filename', filename_pattern, '--capture-tethered'] + full_command = _build_gphoto2_command(command) + + # Start tether process. + app_settings.process = subprocess.Popen(full_command, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE) + print(f'gphoto2 tether started for {app_settings.camera} on {app_settings.process.pid=}') + + +@app.task(name='camera.stop_tether') +def stop_gphoto2_tether(): + """Tells camera to stop gphoto2 tether.""" + print(f'Stopping gphoto2 tether for {app_settings.camera}') + # Communicate and kill immediately. + try: + outs, errs = app_settings.process.communicate(timeout=1) + except subprocess.TimeoutExpired: + app_settings.process.kill() + outs, errs = app_settings.process.communicate() + + app_settings.camera.is_tethered = False + + return dict(outs=outs.decode('utf-8'), errs=errs.decode('utf-8')) + + +@app.task(name='camera.file_download', bind=True) +def gphoto_file_download(self, + filename_pattern: str, + only_new: bool = True + ): + """Downloads (newer) files from the camera on the given port using the filename pattern.""" + print(f'Starting gphoto2 download for {app_settings.camera} using {filename_pattern=}') + command = ['--filename', filename_pattern, '--get-all-files', '--recurse'] + if only_new: + command.append('--new') + + results = gphoto2_command(command, timeout=600) + filenames = list() + for line in results['output']: + file_match = file_save_re.match(line) + if file_match is not None: + fn = file_match.group(1).strip() + print(f'Found match {fn}') + filenames.append(fn) + self.update_state(state='DOWNLOADING', meta=dict(directory=fn)) + + return filenames + + +@app.task(name='camera.delete_files', bind=True) +def gphoto_file_delete(self): + """Removes all files from the camera on the given port.""" + print(f'Deleting all files for {app_settings.camera}') + gphoto2_command('--delete-all-files --recurse') + + +@app.task(name='camera.command', bind=True) +def gphoto_task(self, command: Union[List[str], str]): + """Perform arbitrary gphoto2 command..""" + print(f'Calling {command=} on {app_settings.camera}') + return gphoto2_command(command) + + +def gphoto2_command(command: Union[List[str], str], timeout: Optional[float] = 300) -> dict: + """Perform a gphoto2 command.""" + full_command = _build_gphoto2_command(command) + print(f'Running gphoto2 {full_command=}') + + completed_proc = subprocess.run(full_command, capture_output=True, timeout=timeout) + + # Populate return items. + command_output = dict( + success=completed_proc.returncode >= 0, + returncode=completed_proc.returncode, + output=completed_proc.stdout.decode('utf-8').split('\n'), + error=completed_proc.stderr.decode('utf-8').split('\n') + ) + + return command_output + + +def _build_gphoto2_command(command: Union[List[str], str]): + full_command = [shutil.which('gphoto2'), '--port', app_settings.camera.port] + + # Turn command into a list if not one already. + with suppress(AttributeError): + command = command.split(' ') + + full_command.extend(command) + + return full_command diff --git a/src/panoptes/pocs/utils/cli/main.py b/src/panoptes/pocs/utils/cli/main.py index 0edd95991..e752d9d2b 100644 --- a/src/panoptes/pocs/utils/cli/main.py +++ b/src/panoptes/pocs/utils/cli/main.py @@ -3,6 +3,7 @@ from panoptes.pocs.utils.cli import config from panoptes.pocs.utils.cli import sensor from panoptes.pocs.utils.cli import image +from panoptes.pocs.utils.cli import tasks from panoptes.pocs.utils.cli import power app = typer.Typer() @@ -12,6 +13,7 @@ app.add_typer(sensor.app, name="sensor", help='Interact with system sensors.') app.add_typer(power.app, name="power", help='Interact with power relays.') app.add_typer(image.app, name="image", help='Interact with images.') +app.add_typer(tasks.app, name="tasks", help='Interact with the TaskManager.') @app.callback() diff --git a/src/panoptes/pocs/utils/cli/tasks.py b/src/panoptes/pocs/utils/cli/tasks.py new file mode 100644 index 000000000..0606c8277 --- /dev/null +++ b/src/panoptes/pocs/utils/cli/tasks.py @@ -0,0 +1,65 @@ +import shutil +import subprocess + +import typer +from panoptes.utils.config.client import get_config + +from panoptes.pocs.utils.tasks import TaskManager + +app = typer.Typer() + + +@app.command() +def start_backends( + config_key: str = typer.Option('celery', + help='The key to use to look up the celery config.') +): + """Start the celery backends specified by the config.""" + celery_config = get_config(config_key) + typer.echo('Starting celery backends.') + TaskManager.start_celery_backends(celery_config) + + +@app.command() +def stop_backends( + remove: bool = typer.Option(False, + help='If running containers should be removed or just stopped.'), + config_key: str = typer.Option('celery', + help='The key to use to look up the celery config.') + +): + """Stop the running celery backends specified by the config.""" + celery_config = get_config(config_key) + typer.echo('Stopping celery backends.') + TaskManager.stop_celery_backends(celery_config, remove=remove) + + +@app.command() +def start_worker( + worker: str = typer.Option(..., help='The name of the worker to start.' + 'Can be an absolute namespace or the name of a module' + 'in `panoptes.pocs.utils.service`.'), + queue: str = typer.Option(None, + help='The name of the queue to use for the worker,' + 'defaults to class name'), + loglevel: str = typer.Option('INFO', help='The name of a valid log level.'), +): + """Starts a worker thread for the given piece of hardware.""" + queue = queue or str(worker) + + # TODO this could be a better check for module name, perhaps with load_module. + if '.' not in worker: + worker = f'panoptes.pocs.utils.service.{worker}' + + typer.echo(f'Starting celery {worker=} with {queue=}') + + subprocess.run([ + shutil.which('celery'), + '-A', worker, 'worker', + '-Q', queue, + '--loglevel', loglevel + ]) + + +if __name__ == '__main__': + app() diff --git a/src/panoptes/pocs/utils/tasks.py b/src/panoptes/pocs/utils/tasks.py new file mode 100644 index 000000000..facc6643b --- /dev/null +++ b/src/panoptes/pocs/utils/tasks.py @@ -0,0 +1,102 @@ +import docker +import docker.errors +import celery +from loguru import logger +from panoptes.utils.config.client import get_config + + +class TaskManager: + """Simple celery task manager.""" + + @classmethod + def create_celery_app_from_config(cls, config=None, config_key='celery'): + """Create an instance of the class from the config. + + If `config` is `None` (the default) then attempt a lookup in the config + server using the `config_key`. + """ + config = config or get_config(config_key) + if config: + logger.info(f'Creating Celery app with {config=!r}') + celery_app = celery.Celery() + celery_app.config_from_object(config) + print(f'Created {celery_app}') + + return celery_app + + @classmethod + def start_celery_backends(cls, celery_config: dict): + """Use the python docker binders to control required celery backends.""" + docker_client = docker.from_env() + + # Start the messaging and result backend services. + for service_config in celery_config['service']: + service_name = service_config['name'] + service_image = service_config['image'] + + print(f'Starting {service_name} container using {service_image=}') + try: + # Try to start existing container first. + container = docker_client.containers.get(service_name) + container.start() + except docker.errors.NotFound: + print(f'Creating new container for {service_name}') + # Or create a new one. + docker_client.containers.run( + service_image, + ports=service_config.get('ports'), + name=service_name, + detach=True, + ) + print(f'{service_name} started') + except docker.errors.APIError as e: + print(f'{service_name} already running: {e!r}') + + @classmethod + def stop_celery_backends(cls, celery_config: dict, remove: bool = False): + """Stop the docker containers running the celery backends.""" + docker_client = docker.from_env() + + # Stop the messaging and result backends. + # Start the messaging and result backend services. + for service_config in celery_config['service']: + service_name = service_config['name'] + logger.info(f'Stopping {service_name} container') + + try: + container = docker_client.containers.get(service_name) + container.stop() + + if remove: + logger.info(f'Removing {service_name} container') + container.remove() + except docker.errors.APIError: + logger.info(f'{service_name} not running') + + +class RunTaskMixin: + """A mixin class for running celery tasks and getting results.""" + celery_app: celery.Celery + + def call_task(self, name: str = '', **kwargs) -> celery.Task: + """Call a celery task. + + Thin-wrapper around `self.celery.send_task` and all `kwargs` are passed + along as-is. + + If `queue` is not given in `kwargs` the value will be set to the name + of the class (i.e. `queue = self.__class__`). + + Checks for valid `self.celery` object and logs a warning if + unavailable. + """ + queue = kwargs.setdefault('queue', self.__class__) + logger.debug(f'Calling task {name} with {kwargs=!r} on {queue=}') + task = self.celery_app.send_task(name, **kwargs) + logger.debug(f'{name} task started with {task.id=}') + + return task + + def get_task(self, task_id: str) -> celery.Task: + """Get the task via its ID number.""" + return self.celery_app.AsyncResult(task_id)