diff --git a/pyobs/background_task.py b/pyobs/background_task.py index 0ae82f119..dbf153c46 100644 --- a/pyobs/background_task.py +++ b/pyobs/background_task.py @@ -35,5 +35,11 @@ def _callback_function(self, args=None) -> None: log.error("Background task for %s has died, quitting...", self._func.__name__) def stop(self) -> None: - if self._task is not None: + if self.is_running(): self._task.cancel() + + def is_running(self) -> bool: + if self._task is None: + return False + + return self._task.done() diff --git a/pyobs/modules/robotic/_pointingseriesiterator.py b/pyobs/modules/robotic/_pointingseriesiterator.py new file mode 100644 index 000000000..e1feeb08a --- /dev/null +++ b/pyobs/modules/robotic/_pointingseriesiterator.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import logging +import random +from collections.abc import Iterator +from copy import copy +from typing import Tuple, Any, Optional, List, Dict, cast + +import astropy +import astropy.units as u +import pandas as pd +from astroplan import Observer +from astropy.coordinates import SkyCoord + +from pyobs.interfaces import IAcquisition, ITelescope +from pyobs.utils import exceptions as exc +from pyobs.utils.time import Time + +log = logging.getLogger(__name__) + + +class _LoopedRandomIterator(Iterator[Any]): + def __init__(self, data: List[Any]) -> None: + self._data = copy(data) + self._todo = copy(data) + + def __iter__(self) -> _LoopedRandomIterator: + return self + + def __next__(self) -> Any: + if len(self._todo) == 0: + self._todo = copy(self._data) + + item = random.sample(self._todo, 1)[0] + self._todo.remove(item) + + return item + + +class _PointingSeriesIterator: + def __init__(self, + observer: Observer, + dec_range: Tuple[float, float], + finish_percentage: float, + min_moon_dist: float) -> None: + + self._observer = observer + self._telescope: Optional[ITelescope] = None + self._acquisition: Optional[IAcquisition] = None + + self._dec_range = dec_range + self._finish_fraction = 1.0 - finish_percentage / 100.0 + self._min_moon_dist = min_moon_dist + + self._grid_points: pd.DataFrame = pd.DataFrame({"alt": [], "az": [], "done": []}) + + def set_grid_points(self, grid_points: pd.DataFrame) -> None: + self._grid_points = grid_points + + def set_telescope(self, telescope: ITelescope) -> None: + self._telescope = telescope + + def set_acquisition(self, acquisition: IAcquisition) -> None: + self._acquisition = acquisition + + def __aiter__(self) -> _PointingSeriesIterator: + if self._acquisition is None: + raise ValueError("Acquisition is not set.") + + if self._telescope is None: + raise ValueError("Telescope is not set.") + + return self + + async def __anext__(self) -> Optional[Dict[str, Any]]: + if self._is_finished(): + raise StopAsyncIteration() + + todo_coords = self._get_todo_coords() + log.info("Grid points left to do: %d", len(todo_coords)) + + alt, az, radec = self._find_next_point(todo_coords) + + try: + acquisition_result = await self._acquire_target(radec) + except (ValueError, exc.RemoteError): + log.info("Could not acquire target.") + return None + + self._grid_points.loc[alt, az] = True + return acquisition_result + + def _is_finished(self) -> bool: + num_finished_coords: int = sum(self._grid_points["done"].values) + total_num_coords: int = len(self._grid_points) + + return num_finished_coords >= self._finish_fraction * total_num_coords + + def _get_todo_coords(self) -> List[Tuple[float, float]]: + return list(self._grid_points[~self._grid_points["done"]].index) + + def _find_next_point(self, todo_coords: List[Tuple[float, float]]) -> Tuple[float, float, astropy.coordinates.SkyCoord]: # type: ignore + moon = self._observer.moon_altaz(Time.now()) + + for alt, az in _LoopedRandomIterator(todo_coords): + altaz = SkyCoord( + alt=alt * u.deg, az=az * u.deg, frame="altaz", obstime=Time.now(), location=self._observer.location + ) + radec = altaz.transform_to("icrs") + + if self._is_valid_target(altaz, radec, moon): + log.info("Picked grid point at Alt=%.2f, Az=%.2f (%s).", alt, az, radec.to_string("hmsdms")) + return alt, az, radec + + def _is_valid_target(self, altaz_coords: SkyCoord, radec_coords: SkyCoord, moon: SkyCoord) -> bool: + moon_separation_condition: bool = altaz_coords.separation(moon).degree >= self._min_moon_dist + dec_range_condition: bool = self._dec_range[0] <= radec_coords.dec.degree < self._dec_range[1] + + return moon_separation_condition and dec_range_condition + + async def _acquire_target(self, target: SkyCoord) -> dict[str, Any]: + telescope = cast(ITelescope, self._telescope) # Telescope has to be set in order to enter the iteration + await telescope.move_radec(float(target.ra.degree), float(target.dec.degree)) + + acquisition = cast(IAcquisition, self._acquisition) + return await acquisition.acquire_target() # Acquisition has to be set in order to enter the iteration diff --git a/pyobs/modules/robotic/_taskscheduler.py b/pyobs/modules/robotic/_taskscheduler.py new file mode 100644 index 000000000..593260bc6 --- /dev/null +++ b/pyobs/modules/robotic/_taskscheduler.py @@ -0,0 +1,210 @@ +import asyncio +import copy +import logging +import multiprocessing as mp +from typing import List, Tuple, Any, Optional, cast + +import astroplan +import astropy +import astropy.units as u +from astroplan import ObservingBlock, Observer +from astropy.time import TimeDelta + +from pyobs.robotic import TaskSchedule, Task +from pyobs.utils.time import Time + +log = logging.getLogger(__name__) + + +class _TaskScheduler: + def __init__(self, schedule: TaskSchedule, observer: Observer, schedule_range: int, safety_time: int, twilight: str) -> None: + self._schedule = schedule + self._observer = observer + + self._schedule_range = schedule_range + self._safety_time = safety_time + + twilight_constraint = self._get_twilight_constraint(twilight) + self._scheduler = astroplan.PriorityScheduler(twilight_constraint, self._observer, transitioner=astroplan.Transitioner()) + + self._blocks: List[ObservingBlock] = [] + + self._current_task_id: Optional[str] = None + self._schedule_start: Optional[Time] = None + + @staticmethod + def _get_twilight_constraint(twilight: str) -> List[astroplan.AtNightConstraint]: + if twilight == "astronomical": + return [astroplan.AtNightConstraint.twilight_astronomical()] + elif twilight == "nautical": + return [astroplan.AtNightConstraint.twilight_nautical()] + else: + raise ValueError("Unknown twilight type.") + + def set_current_task_id(self, task_id: Optional[str]) -> None: + self._current_task_id = task_id + + def set_schedule_start(self, time: Optional[Time]) -> None: + self._schedule_start = time + + def set_blocks(self, blocks: List[ObservingBlock]) -> None: + self._blocks = blocks + + async def schedule_task(self) -> None: + try: + # prepare scheduler + blocks, start, end = await self._prepare_schedule() + + # schedule + scheduled_blocks = await self._schedule_blocks(blocks, start, end) + + # finish schedule + await self._finish_schedule(scheduled_blocks, start) + + except ValueError as e: + log.warning(str(e)) + + async def _prepare_schedule(self) -> Tuple[List[ObservingBlock], Time, Time]: + """TaskSchedule blocks.""" + + converted_blocks = await self._convert_blocks_to_astroplan() + + start, end = await self._get_time_range() + + blocks = self._filter_blocks(converted_blocks, end) + + # no blocks found? + if len(blocks) == 0: + await self._schedule.set_schedule([], start) + raise ValueError("No blocks left for scheduling.") + + # return all + return blocks, start, end + + async def _convert_blocks_to_astroplan(self) -> List[astroplan.ObservingBlock]: + copied_blocks = [copy.copy(block) for block in self._blocks] + + for block in copied_blocks: + self._invert_block_priority(block) + self._tighten_block_time_constraints(block) + + return copied_blocks + + @staticmethod + def _invert_block_priority(block: astroplan.ObservingBlock) -> None: + """ + astroplan's PriorityScheduler expects lower priorities to be more important, so calculate + 1000 - priority + """ + block.priority = max(1000.0 - block.priority, 0.0) + + @staticmethod + def _tighten_block_time_constraints(block: astroplan.ObservingBlock) -> None: + """ + astroplan's PriorityScheduler doesn't match the requested observing windows exactly, + so we make them a little smaller. + """ + time_constraints = filter(lambda c: isinstance(c, astroplan.TimeConstraint), block.constraints) + for constraint in time_constraints: + constraint.min += 30 * u.second + constraint.max -= 30 * u.second + + async def _get_time_range(self) -> Tuple[astropy.time.Time, astropy.time.Time]: + # get start time for scheduler + start = self._schedule_start + now_plus_safety = Time.now() + self._safety_time * u.second + if start is None or start < now_plus_safety: + # if no ETA exists or is in the past, use safety time + start = now_plus_safety + + if (running_task := await self._get_current_task()) is not None: + log.info("Found running block that ends at %s.", running_task.end) + + # get block end plus some safety + block_end = running_task.end + 10.0 * u.second + if start < block_end: + start = block_end + log.info("Start time would be within currently running block, shifting to %s.", cast(Time, start).isot) + + # calculate end time + end = start + TimeDelta(self._schedule_range * u.hour) + + return start, end + + async def _get_current_task(self) -> Optional[Task]: + if self._current_task_id is None: + log.info("No running block found.") + return None + + log.info("Trying to find running block in current schedule...") + tasks = await self._schedule.get_schedule() + if self._current_task_id in tasks: + return tasks[self._current_task_id] + else: + log.info("Running block not found in last schedule.") + return None + + def _filter_blocks(self, blocks: List[astroplan.ObservingBlock], end: astropy.time.Time) -> List[astroplan.ObservingBlock]: + blocks_without_current = filter(lambda x: x.configuration["request"]["id"] != self._current_task_id, blocks) + blocks_in_schedule_range = filter(lambda b: self._is_block_starting_in_schedule(b, end), blocks_without_current) + + return list(blocks_in_schedule_range) + + @staticmethod + def _is_block_starting_in_schedule(block: astroplan.ObservingBlock, end: astropy.time.Time) -> bool: + time_constraints = [c for c in block.constraints if isinstance(c, astroplan.TimeConstraint)] + + # does constraint start before the end of the scheduling range? + before_end = [c for c in time_constraints if c.min < end] + + return len(time_constraints) == 0 or len(before_end) > 0 + + async def _schedule_blocks(self, blocks: List[ObservingBlock], start: Time, end: Time) -> List[ObservingBlock]: + + # run actual scheduler in separate process and wait for it + qout: mp.Queue[List[ObservingBlock]] = mp.Queue() + p = mp.Process(target=self._schedule_process, args=(blocks, start, end, qout)) + p.start() + + # wait for process to finish + # note that the process only finishes, when the queue is empty! so we have to poll the queue first + # and then the process. + loop = asyncio.get_running_loop() + scheduled_blocks: List[ObservingBlock] = await loop.run_in_executor(None, qout.get, True) + await loop.run_in_executor(None, p.join) + return scheduled_blocks + + async def _finish_schedule(self, scheduled_blocks: List[ObservingBlock], start: Time) -> None: + # update + await self._schedule.set_schedule(scheduled_blocks, start) + if len(scheduled_blocks) > 0: + log.info("Finished calculating schedule for %d block(s):", len(scheduled_blocks)) + for i, block in enumerate(scheduled_blocks, 1): + log.info( + " #%d: %s to %s (%.1f)", + block.configuration["request"]["id"], + block.start_time.strftime("%H:%M:%S"), + block.end_time.strftime("%H:%M:%S"), + block.priority, + ) + else: + log.info("Finished calculating schedule for 0 blocks.") + + def _schedule_process( + self, + blocks: List[ObservingBlock], + start: Time, + end: Time, + scheduled_blocks: mp.Queue # type: ignore + ) -> None: + """Actually do the scheduling, usually run in a separate process.""" + + # log it + log.info("Calculating schedule for %d schedulable block(s) starting at %s...", len(blocks), start) + + # run scheduler + time_range = astroplan.Schedule(start, end) + schedule = self._scheduler(blocks, time_range) + + # put scheduled blocks in queue + scheduled_blocks.put(schedule.scheduled_blocks) \ No newline at end of file diff --git a/pyobs/modules/robotic/_taskupdater.py b/pyobs/modules/robotic/_taskupdater.py new file mode 100644 index 000000000..206282c24 --- /dev/null +++ b/pyobs/modules/robotic/_taskupdater.py @@ -0,0 +1,126 @@ +import json +import json +import logging +from typing import List, Tuple, Optional + +from astroplan import ObservingBlock + +from pyobs.robotic import TaskArchive, TaskSchedule +from pyobs.utils.time import Time + +log = logging.getLogger(__name__) + + +class _TaskUpdater: + def __init__(self, task_archive: TaskArchive, task_schedule: TaskSchedule): + self._task_archive = task_archive + self._schedule = task_schedule + + self._blocks: List[ObservingBlock] = [] + + self._current_task_id: Optional[str] = None + self._last_task_id: Optional[str] = None + + self._last_change: Optional[Time] = None + + def set_current_task_id(self, task_id: Optional[str]) -> None: + self._current_task_id = task_id + + def set_last_task_id(self, task_id: str) -> None: + self._last_task_id = task_id + + async def update(self) -> Optional[List[ObservingBlock]]: + # got new time of last change? + t = await self._task_archive.last_changed() + if self._last_change is None or self._last_change < t: + blocks = await self._update_blocks() + + self._last_change = Time.now() + return blocks + + return None + + async def _update_blocks(self) -> Optional[List[ObservingBlock]]: + # get schedulable blocks and sort them + log.info("Found update in schedulable block, downloading them...") + blocks = sorted( + await self._task_archive.get_schedulable_blocks(), + key=lambda x: json.dumps(x.configuration, sort_keys=True), + ) + log.info("Downloaded %d schedulable block(s).", len(blocks)) + + # compare new and old lists + removed, added = self._compare_block_lists(self._blocks, blocks) + + # store blocks + self._blocks = blocks + + + # schedule update + if await self._need_update(removed, added): + log.info("Triggering scheduler run...") + return blocks + #self._scheduler_task.stop() # Stop current run + #self._scheduler_task.start() + + return None + + async def _need_update(self, removed: List[ObservingBlock], added: List[ObservingBlock]) -> bool: + if len(removed) == 0 and len(added) == 0: + # no need to re-schedule + log.info("No change in list of blocks detected.") + return False + + # has only the current block been removed? + log.info("Removed: %d, added: %d", len(removed), len(added)) + if len(removed) == 1: + log.info( + "Found 1 removed block with ID %d. Last task ID was %s, current is %s.", + removed[0].target.name, + str(self._last_task_id), + str(self._current_task_id), + ) + if len(removed) == 1 and len(added) == 0 and removed[0].target.name == self._last_task_id: + # no need to re-schedule + log.info("Only one removed block detected, which is the one currently running.") + return False + + # check, if one of the removed blocks was actually in schedule + if len(removed) > 0: + schedule = await self._schedule.get_schedule() + removed_from_schedule = [r for r in removed if r in schedule] + if len(removed_from_schedule) == 0: + log.info(f"Found {len(removed)} blocks, but none of them was scheduled.") + return False + + return True + + @staticmethod + def _compare_block_lists( + blocks1: List[ObservingBlock], blocks2: List[ObservingBlock] + ) -> Tuple[List[ObservingBlock], List[ObservingBlock]]: + """Compares two lists of ObservingBlocks and returns two lists, containing those that are missing in list 1 + and list 2, respectively. + + Args: + blocks1: First list of blocks. + blocks2: Second list of blocks. + + Returns: + (tuple): Tuple containing: + unique1: Blocks that exist in blocks1, but not in blocks2. + unique2: Blocks that exist in blocks2, but not in blocks1. + """ + + # get dictionaries with block names + names1 = {b.target.name: b for b in blocks1} + names2 = {b.target.name: b for b in blocks2} + + # find elements in names1 that are missing in names2 and vice versa + additional1 = set(names1.keys()).difference(names2.keys()) + additional2 = set(names2.keys()).difference(names1.keys()) + + # get blocks for names and return them + unique1 = [names1[n] for n in additional1] + unique2 = [names2[n] for n in additional2] + return unique1, unique2 \ No newline at end of file diff --git a/pyobs/modules/robotic/mastermind.py b/pyobs/modules/robotic/mastermind.py index 21a4d6015..b03c5425a 100644 --- a/pyobs/modules/robotic/mastermind.py +++ b/pyobs/modules/robotic/mastermind.py @@ -20,12 +20,12 @@ class Mastermind(Module, IAutonomous, IFitsHeaderBefore): __module__ = "pyobs.modules.robotic" def __init__( - self, - schedule: Union[TaskSchedule, Dict[str, Any]], - runner: Union[TaskRunner, Dict[str, Any]], - allowed_late_start: int = 300, - allowed_overrun: int = 300, - **kwargs: Any, + self, + schedule: Union[TaskSchedule, Dict[str, Any]], + runner: Union[TaskRunner, Dict[str, Any]], + allowed_late_start: int = 300, + allowed_overrun: int = 300, + **kwargs: Any, ): """Initialize a new auto focus system. @@ -36,122 +36,98 @@ def __init__( """ Module.__init__(self, **kwargs) - # store self._allowed_late_start = allowed_late_start self._allowed_overrun = allowed_overrun - self._running = False - # add thread func - self.add_background_task(self._run_thread, True) + self._mastermind_loop = self.add_background_task(self._run_thread, True, True) - # get schedule and runner self._task_schedule = self.add_child_object(schedule, TaskSchedule) self._task_runner = self.add_child_object(runner, TaskRunner) - # observation name and exposure number - self._task = None - self._obs = None - self._exp = None + self._task: Optional[Task] = None + + self._first_late_start_warning = True async def open(self) -> None: """Open module.""" await Module.open(self) - # subscribe to events - if self.comm: - await self.comm.register_event(TaskStartedEvent) - await self.comm.register_event(TaskFinishedEvent) - - # start - self._running = True + await self.comm.register_event(TaskStartedEvent) + await self.comm.register_event(TaskFinishedEvent) async def start(self, **kwargs: Any) -> None: """Starts a service.""" log.info("Starting robotic system...") - self._running = True + self._mastermind_loop.start() async def stop(self, **kwargs: Any) -> None: """Stops a service.""" log.info("Stopping robotic system...") - self._running = False + self._mastermind_loop.stop() async def is_running(self, **kwargs: Any) -> bool: """Whether a service is running.""" - return self._running + return self._mastermind_loop.is_running() async def _run_thread(self) -> None: - # wait a little await asyncio.sleep(1) - # flags - first_late_start_warning = True - - # run until closed while True: - # not running? - if not self._running: - # sleep a little and continue - await asyncio.sleep(1) - continue - - # get now - now = Time.now() - - # find task that we want to run now - task: Optional[Task] = await self._task_schedule.get_task(now) - if task is None or not await self._task_runner.can_run(task): - # no task found - await asyncio.sleep(10) - continue - - # starting too late? - if not task.can_start_late: - late_start = now - task.start - if late_start > self._allowed_late_start * u.second: - # only warn once - if first_late_start_warning: - log.warning( - "Time since start of window (%.1f) too long (>%.1f), skipping task...", - late_start.to_value("second"), - self._allowed_late_start, - ) - first_late_start_warning = False - - # sleep a little and skip - await asyncio.sleep(10) - continue - - # reset warning - first_late_start_warning = True - - # task is definitely not None here - self._task = cast(Task, task) - - # ETA - eta = now + self._task.duration * u.second - - # send event - await self.comm.send_event(TaskStartedEvent(name=self._task.name, id=self._task.id, eta=eta)) - - # run task in thread - log.info("Running task %s...", self._task.name) - try: - await self._task_runner.run_task(self._task, task_schedule=self._task_schedule) - except: - # something went wrong - log.warning("Task %s failed.", self._task.name) - self._task = None - continue - - # send event - await self.comm.send_event(TaskFinishedEvent(name=self._task.name, id=self._task.id)) - - # finish - log.info("Finished task %s.", self._task.name) - self._task = None + await self._loop() + + async def _loop(self): + now = Time.now() + + self._task = await self._task_schedule.get_task(now) + + if self._task is None or not await self._task_runner.can_run(self._task): + await asyncio.sleep(10) + return + + if not self._task.can_start_late and self._check_is_task_late(now): + await asyncio.sleep(10) + return + + await self._execute_task(now) + + self._remove_task() + + def _check_is_task_late(self, now: Time) -> bool: + time_since_planned_start = now - self._task.start + is_late_start = time_since_planned_start > self._allowed_late_start * u.second + + if is_late_start and self._first_late_start_warning: + log.warning( + "Time since start of window (%.1f) too long (>%.1f), skipping task...", + time_since_planned_start.to_value("second"), + self._allowed_late_start, + ) + self._first_late_start_warning = False + else: + self._first_late_start_warning = True + + return is_late_start + + async def _execute_task(self, now: Time) -> None: + eta = now + self._task.duration * u.second + await self.comm.send_event(TaskStartedEvent(name=self._task.name, id=self._task.id, eta=eta)) + + log.info("Running task %s...", self._task.name) + try: + await self._task_runner.run_task(self._task, task_schedule=self._task_schedule) + except: + log.warning("Task %s failed.", self._task.name) + return + + await self.comm.send_event(TaskFinishedEvent(name=self._task.name, id=self._task.id)) + + log.info("Finished task %s.", self._task.name) + + def _remove_task(self) -> None: + self._task = None async def get_fits_header_before( - self, namespaces: Optional[List[str]] = None, **kwargs: Any + self, namespaces: Optional[List[str]] = None, **kwargs: Any ) -> Dict[str, Tuple[Any, str]]: """Returns FITS header for the current status of this module. diff --git a/pyobs/modules/robotic/pointing.py b/pyobs/modules/robotic/pointing.py index 7c8f47100..09eb5381f 100644 --- a/pyobs/modules/robotic/pointing.py +++ b/pyobs/modules/robotic/pointing.py @@ -1,17 +1,13 @@ import logging -import random from typing import Tuple, Any, Optional, List, Dict import numpy as np import pandas as pd -from astropy.coordinates import SkyCoord -import astropy.units as u from pyobs.interfaces import IAcquisition, ITelescope -from pyobs.modules import Module -from pyobs.utils import exceptions as exc from pyobs.interfaces import IAutonomous -from pyobs.utils.time import Time +from pyobs.modules import Module +from pyobs.modules.robotic._pointingseriesiterator import _PointingSeriesIterator log = logging.getLogger(__name__) @@ -22,18 +18,18 @@ class PointingSeries(Module, IAutonomous): __module__ = "pyobs.modules.robotic" def __init__( - self, - alt_range: Tuple[float, float] = (30.0, 85.0), - num_alt: int = 8, - az_range: Tuple[float, float] = (0.0, 360.0), - num_az: int = 24, - dec_range: Tuple[float, float] = (-80.0, 80.0), - min_moon_dist: float = 15.0, - finish: int = 90, - exp_time: float = 1.0, - acquisition: str = "acquisition", - telescope: str = "telescope", - **kwargs: Any, + self, + alt_range: Tuple[float, float] = (30.0, 85.0), + num_alt: int = 8, + az_range: Tuple[float, float] = (0.0, 360.0), + num_az: int = 24, + dec_range: Tuple[float, float] = (-80.0, 80.0), + min_moon_dist: float = 15.0, + finish: int = 90, + exp_time: float = 1.0, + acquisition: str = "acquisition", + telescope: str = "telescope", + **kwargs: Any, ): """Initialize a new auto focus system. @@ -56,13 +52,19 @@ def __init__( self._num_alt = num_alt self._az_range = tuple(az_range) self._num_az = num_az - self._dec_range = dec_range - self._min_moon_dist = min_moon_dist - self._finish = 1.0 - finish / 100.0 - self._exp_time = exp_time + self._acquisition = acquisition self._telescope = telescope + if self.observer is None: + raise ValueError("No observer given.") + + self._pointing_series_iterator = _PointingSeriesIterator( + self.observer, dec_range, + finish_percentage=finish, + min_moon_dist=min_moon_dist + ) + # if Az range is [0, 360], we got north double, so remove one step if self._az_range == (0.0, 360.0): self._az_range = (0.0, 360.0 - 360.0 / self._num_az) @@ -72,19 +74,37 @@ def __init__( async def start(self, **kwargs: Any) -> None: """Starts a service.""" - pass async def stop(self, **kwargs: Any) -> None: """Stops a service.""" - pass async def is_running(self, **kwargs: Any) -> bool: """Whether a service is running.""" return True + async def open(self, **kwargs: Any) -> None: + await Module.open(self) + + acquisition = await self.proxy(self._acquisition, IAcquisition) + telescope = await self.proxy(self._telescope, ITelescope) + + self._pointing_series_iterator.set_acquisition(acquisition) + self._pointing_series_iterator.set_telescope(telescope) + async def _run_thread(self) -> None: """Run a pointing series.""" + pd_grid = self._generate_grid() + self._pointing_series_iterator.set_grid_points(pd_grid) + + async for acquisition_result in self._pointing_series_iterator: + if acquisition_result is not None: + await self._process_acquisition(**acquisition_result) + + # finished + log.info("Pointing series finished.") + + def _generate_grid(self) -> pd.DataFrame: # create grid grid: Dict[str, List[Any]] = {"alt": [], "az": [], "done": []} for az in np.linspace(self._az_range[0], self._az_range[1], self._num_az): @@ -92,92 +112,22 @@ async def _run_thread(self) -> None: grid["alt"] += [alt] grid["az"] += [az] grid["done"] += [False] - # to dataframe pd_grid = pd.DataFrame(grid).set_index(["alt", "az"]) - # get acquisition and telescope units - acquisition = await self.proxy(self._acquisition, IAcquisition) - telescope = await self.proxy(self._telescope, ITelescope) - - # check observer - if self.observer is None: - raise ValueError("No observer given.") - - # loop until finished - while True: - # get all entries without offset measurements - todo = list(pd_grid[~pd_grid["done"]].index) - if len(todo) / len(pd_grid) < self._finish: - log.info("Finished.") - break - log.info("Grid points left to do: %d", len(todo)) - - # get moon - moon = self.observer.moon_altaz(Time.now()) - - # try to find a good point - while True: - # pick a random index and remove from list - alt, az = random.sample(todo, 1)[0] - todo.remove((alt, az)) - altaz = SkyCoord( - alt=alt * u.deg, az=az * u.deg, frame="altaz", obstime=Time.now(), location=self.observer.location - ) - - # get RA/Dec - radec = altaz.icrs - - # moon far enough away? - if altaz.separation(moon).degree >= self._min_moon_dist: - # yep, are we in declination range? - if self._dec_range[0] <= radec.dec.degree < self._dec_range[1]: - # yep, break here, we found our target - break - - # to do list empty? - if len(todo) == 0: - # could not find a grid point - log.info("Could not find a suitable grid point, resetting todo list for next entry...") - todo = list(pd_grid.index) - continue - - # log finding - log.info("Picked grid point at Alt=%.2f, Az=%.2f (%s).", alt, az, radec.to_string("hmsdms")) - - # acquire target and process result - try: - # move telescope - await telescope.move_radec(float(radec.ra.degree), float(radec.dec.degree)) - - # acquire target - acq = await acquisition.acquire_target() - - # process result - if acq is not None: - await self._process_acquisition(**acq) - - except (ValueError, exc.RemoteError): - log.info("Could not acquire target.") - continue - - # finished - pd_grid.loc[alt, az] = True - - # finished - log.info("Pointing series finished.") + return pd_grid async def _process_acquisition( - self, - datetime: str, - ra: float, - dec: float, - alt: float, - az: float, - off_ra: Optional[float] = None, - off_dec: Optional[float] = None, - off_alt: Optional[float] = None, - off_az: Optional[float] = None, + self, + datetime: str, + ra: float, + dec: float, + alt: float, + az: float, + off_ra: Optional[float] = None, + off_dec: Optional[float] = None, + off_alt: Optional[float] = None, + off_az: Optional[float] = None, ) -> None: """Process the result of the acquisition. Either ra_off/dec_off or alt_off/az_off must be given. @@ -192,7 +142,6 @@ async def _process_acquisition( off_alt: Found Alt offset. off_az: Found Az offset. """ - pass __all__ = ["PointingSeries"] diff --git a/pyobs/modules/robotic/scheduler.py b/pyobs/modules/robotic/scheduler.py index 2807d5d8f..9f23f84b9 100644 --- a/pyobs/modules/robotic/scheduler.py +++ b/pyobs/modules/robotic/scheduler.py @@ -3,8 +3,9 @@ import json import logging import multiprocessing as mp -from typing import Union, List, Tuple, Any, Optional, Dict +from typing import Union, List, Tuple, Any, Optional, Dict, cast import astroplan +import astropy from astroplan import ObservingBlock from astropy.time import TimeDelta import astropy.units as u @@ -12,11 +13,12 @@ from pyobs.events.taskfinished import TaskFinishedEvent from pyobs.events.taskstarted import TaskStartedEvent from pyobs.events import GoodWeatherEvent, Event +from pyobs.modules.robotic._taskscheduler import _TaskScheduler +from pyobs.modules.robotic._taskupdater import _TaskUpdater from pyobs.utils.time import Time from pyobs.interfaces import IStartStop, IRunnable from pyobs.modules import Module -from pyobs.robotic import TaskArchive, TaskSchedule - +from pyobs.robotic import TaskArchive, TaskSchedule, Task log = logging.getLogger(__name__) @@ -52,32 +54,35 @@ def __init__( Module.__init__(self, **kwargs) # get scheduler - self._task_archive = self.add_child_object(tasks, TaskArchive) - self._schedule = self.add_child_object(schedule, TaskSchedule) + self._task_archive = self.add_child_object(tasks, TaskArchive) # type: ignore + self._schedule = self.add_child_object(schedule, TaskSchedule) # type: ignore # store - self._schedule_range = schedule_range - self._safety_time = safety_time - self._twilight = twilight - self._running = True - self._initial_update_done = False - self._need_update = False + #self._schedule_range = schedule_range + #self._safety_time = safety_time + #self._twilight = twilight + self._trigger_on_task_started = trigger_on_task_started self._trigger_on_task_finished = trigger_on_task_finished # time to start next schedule from - self._schedule_start: Optional[Time] = None + #self._schedule_start: Optional[Time] = None # ID of currently running task, and current (or last if finished) block - self._current_task_id = None - self._last_task_id = None + #self._current_task_id = None + #self._last_task_id = None # blocks self._blocks: List[ObservingBlock] = [] + self._task_updater = _TaskUpdater(self._task_archive, self._schedule) + self._task_scheduler = _TaskScheduler(self._schedule, self.observer, schedule_range, safety_time, twilight) + # update thread - self.add_background_task(self._schedule_worker) - self.add_background_task(self._update_worker) + self._scheduler_task = self.add_background_task(self._task_scheduler.schedule_task, autostart=False, restart=False) + self._update_task = self.add_background_task(self._update_worker) + + self._last_change: Optional[Time] = None async def open(self) -> None: """Open module.""" @@ -91,300 +96,35 @@ async def open(self) -> None: async def start(self, **kwargs: Any) -> None: """Start scheduler.""" - self._running = True + self._update_task.start() async def stop(self, **kwargs: Any) -> None: """Stop scheduler.""" - self._running = False + self._update_task.stop() async def is_running(self, **kwargs: Any) -> bool: """Whether scheduler is running.""" - return self._running + return self._update_task.is_running() async def _update_worker(self) -> None: - # time of last change in blocks - last_change = None - - # run forever while True: - # not running? - if self._running is False: - await asyncio.sleep(1) - continue - - # got new time of last change? - t = await self._task_archive.last_changed() - if last_change is None or last_change < t: - # get schedulable blocks and sort them - log.info("Found update in schedulable block, downloading them...") - blocks = sorted( - await self._task_archive.get_schedulable_blocks(), - key=lambda x: json.dumps(x.configuration, sort_keys=True), - ) - log.info("Downloaded %d schedulable block(s).", len(blocks)) - - # compare new and old lists - removed, added = self._compare_block_lists(self._blocks, blocks) - - # schedule update - self._need_update = True - - # no changes? - if len(removed) == 0 and len(added) == 0: - # no need to re-schedule - log.info("No change in list of blocks detected.") - self._need_update = False - - # has only the current block been removed? - log.info("Removed: %d, added: %d", len(removed), len(added)) - if len(removed) == 1: - log.info( - "Found 1 removed block with ID %d. Last task ID was %s, current is %s.", - removed[0].target.name, - str(self._last_task_id), - str(self._current_task_id), - ) - if len(removed) == 1 and len(added) == 0 and removed[0].target.name == self._last_task_id: - # no need to re-schedule - log.info("Only one removed block detected, which is the one currently running.") - self._need_update = False - - # check, if one of the removed blocks was actually in schedule - if len(removed) > 0 and self._need_update: - schedule = await self._schedule.get_schedule() - removed_from_schedule = [r for r in removed if r in schedule] - if len(removed_from_schedule) == 0: - log.info(f"Found {len(removed)} blocks, but none of them was scheduled.") - self._need_update = False - - # store blocks - self._blocks = blocks - - # schedule update - if self._need_update: - log.info("Triggering scheduler run...") - - # remember now - last_change = Time.now() - self._initial_update_done = True - - # sleep a little + await self._update_worker_loop() await asyncio.sleep(5) - @staticmethod - def _compare_block_lists( - blocks1: List[ObservingBlock], blocks2: List[ObservingBlock] - ) -> Tuple[List[ObservingBlock], List[ObservingBlock]]: - """Compares two lists of ObservingBlocks and returns two lists, containing those that are missing in list 1 - and list 2, respectively. - - Args: - blocks1: First list of blocks. - blocks2: Second list of blocks. - - Returns: - (tuple): Tuple containing: - unique1: Blocks that exist in blocks1, but not in blocks2. - unique2: Blocks that exist in blocks2, but not in blocks1. - """ - - # get dictionaries with block names - names1 = {b.target.name: b for b in blocks1} - names2 = {b.target.name: b for b in blocks2} + async def _update_worker_loop(self) -> None: + blocks = await self._task_updater.update() - # find elements in names1 that are missing in names2 and vice versa - additional1 = set(names1.keys()).difference(names2.keys()) - additional2 = set(names2.keys()).difference(names1.keys()) - - # get blocks for names and return them - unique1 = [names1[n] for n in additional1] - unique2 = [names2[n] for n in additional2] - return unique1, unique2 - - async def _schedule_worker(self) -> None: - # run forever - while True: - # need update? - if self._need_update and self._initial_update_done: - # reset need for update - self._need_update = False - - try: - # prepare scheduler - blocks, start, end, constraints = await self._prepare_schedule() - - # schedule - scheduled_blocks = await self._schedule_blocks(blocks, start, end, constraints) - - # finish schedule - await self._finish_schedule(scheduled_blocks, start) - - except ValueError as e: - log.warning(str(e)) - - # sleep a little - await asyncio.sleep(1) - - async def _prepare_schedule(self) -> Tuple[List[ObservingBlock], Time, Time, List[Any]]: - """TaskSchedule blocks.""" - - # only global constraint is the night - if self._twilight == "astronomical": - constraints = [astroplan.AtNightConstraint.twilight_astronomical()] - elif self._twilight == "nautical": - constraints = [astroplan.AtNightConstraint.twilight_nautical()] - else: - raise ValueError("Unknown twilight type.") - - # make shallow copies of all blocks and loop them - copied_blocks = [copy.copy(block) for block in self._blocks] - for block in copied_blocks: - # astroplan's PriorityScheduler expects lower priorities to be more important, so calculate - # 1000 - priority - block.priority = 1000.0 - block.priority - if block.priority < 0: - block.priority = 0 - - # it also doesn't match the requested observing windows exactly, so we make them a little smaller. - for constraint in block.constraints: - if isinstance(constraint, astroplan.TimeConstraint): - constraint.min += 30 * u.second - constraint.max -= 30 * u.second - - # get start time for scheduler - start = self._schedule_start - now_plus_safety = Time.now() + self._safety_time * u.second - if start is None or start < now_plus_safety: - # if no ETA exists or is in the past, use safety time - start = now_plus_safety - - # get running scheduled block, if any - if self._current_task_id is None: - log.info("No running block found.") - running_task = None - else: - # get running task from archive - log.info("Trying to find running block in current schedule...") - tasks = await self._schedule.get_schedule() - if self._current_task_id in tasks: - running_task = tasks[self._current_task_id] - else: - log.info("Running block not found in last schedule.") - running_task = None - - # if start is before end time of currently running block, change that - if running_task is not None: - log.info("Found running block that ends at %s.", running_task.end) - - # get block end plus some safety - block_end = running_task.end + 10.0 * u.second - if start < block_end: - start = block_end - log.info("Start time would be within currently running block, shifting to %s.", start.isot) - - # calculate end time - end = start + TimeDelta(self._schedule_range * u.hour) - - # remove currently running block and filter by start time - blocks: List[ObservingBlock] = [] - for b in filter(lambda x: x.configuration["request"]["id"] != self._current_task_id, copied_blocks): - time_constraint_found = False - # loop all constraints - for c in b.constraints: - if isinstance(c, astroplan.TimeConstraint): - # we found a time constraint - time_constraint_found = True - - # does the window start before the end of the scheduling range? - if c.min < end: - # yes, store block and break loop - blocks.append(b) - break - else: - # loop has finished without breaking - # if no time constraint has been found, we still take the block - if time_constraint_found is False: - blocks.append(b) - - # if need new update, skip here - if self._need_update: - raise ValueError("Not running scheduler, since update was requested.") - - # no blocks found? - if len(blocks) == 0: - await self._schedule.set_schedule([], start) - raise ValueError("No blocks left for scheduling.") - - # return all - return blocks, start, end, constraints - - async def _schedule_blocks( - self, blocks: List[ObservingBlock], start: Time, end: Time, constraints: List[Any] - ) -> List[ObservingBlock]: - - # run actual scheduler in separate process and wait for it - qout: mp.Queue = mp.Queue() - p = mp.Process(target=self._schedule_process, args=(blocks, start, end, constraints, qout)) - p.start() - - # wait for process to finish - # note that the process only finishes, when the queue is empty! so we have to poll the queue first - # and then the process. - loop = asyncio.get_running_loop() - scheduled_blocks: List[ObservingBlock] = await loop.run_in_executor(None, qout.get, True) - await loop.run_in_executor(None, p.join) - return scheduled_blocks - - async def _finish_schedule(self, scheduled_blocks: List[ObservingBlock], start: Time) -> None: - # if need new update, skip here - if self._need_update: - log.info("Not using scheduler results, since update was requested.") + if blocks is None: return - # update - await self._schedule.set_schedule(scheduled_blocks, start) - if len(scheduled_blocks) > 0: - log.info("Finished calculating schedule for %d block(s):", len(scheduled_blocks)) - for i, block in enumerate(scheduled_blocks, 1): - log.info( - " #%d: %s to %s (%.1f)", - block.configuration["request"]["id"], - block.start_time.strftime("%H:%M:%S"), - block.end_time.strftime("%H:%M:%S"), - block.priority, - ) - else: - log.info("Finished calculating schedule for 0 blocks.") - - def _schedule_process( - self, - blocks: List[ObservingBlock], - start: Time, - end: Time, - constraints: List[Any], - scheduled_blocks: mp.Queue, - ) -> None: - """Actually do the scheduling, usually run in a separate process.""" - - # log it - log.info("Calculating schedule for %d schedulable block(s) starting at %s...", len(blocks), start) - - # we don't need any transitions - transitioner = astroplan.Transitioner() - - # create scheduler - scheduler = astroplan.PriorityScheduler(constraints, self.observer, transitioner=transitioner) - - # run scheduler - time_range = astroplan.Schedule(start, end) - schedule = scheduler(blocks, time_range) - - # put scheduled blocks in queue - scheduled_blocks.put(schedule.scheduled_blocks) + self._scheduler_task.stop() + self._task_scheduler.set_blocks(blocks) + self._scheduler_task.start() async def run(self, **kwargs: Any) -> None: """Trigger a re-schedule.""" - self._need_update = True + self._scheduler_task.stop() + self._scheduler_task.start() async def _on_task_started(self, event: Event, sender: str) -> bool: """Re-schedule when task has started and we can predict its end. @@ -397,8 +137,9 @@ async def _on_task_started(self, event: Event, sender: str) -> bool: return False # store it - self._current_task_id = event.id - self._last_task_id = event.id + self._task_scheduler.set_current_task_id(event.id) + self._task_updater.set_current_task_id(event.id) + self._task_updater.set_last_task_id(event.id) # trigger? if self._trigger_on_task_started: @@ -406,9 +147,10 @@ async def _on_task_started(self, event: Event, sender: str) -> bool: eta = (event.eta - Time.now()).sec / 60 log.info("Received task started event with ETA of %.0f minutes, triggering new scheduler run...", eta) - # set it - self._need_update = True + self._scheduler_task.stop() + self._scheduler_task.start() self._schedule_start = event.eta + self._task_scheduler.set_schedule_start(event.eta) return True @@ -423,16 +165,18 @@ async def _on_task_finished(self, event: Event, sender: str) -> bool: return False # reset current task - self._current_task_id = None + self._task_scheduler.set_current_task_id(None) + self._task_updater.set_current_task_id(None) # trigger? if self._trigger_on_task_finished: # get ETA in minutes log.info("Received task finished event, triggering new scheduler run...") - # set it - self._need_update = True + self._scheduler_task.stop() + self._scheduler_task.start() self._schedule_start = Time.now() + self._task_scheduler.set_schedule_start(Time.now()) return True @@ -450,9 +194,10 @@ async def _on_good_weather(self, event: Event, sender: str) -> bool: eta = (event.eta - Time.now()).sec / 60 log.info("Received good weather event with ETA of %.0f minutes, triggering new scheduler run...", eta) - # set it - self._need_update = True + self._scheduler_task.stop() + self._scheduler_task.start() self._schedule_start = event.eta + self._task_scheduler.set_schedule_start(event.eta) return True async def abort(self, **kwargs: Any) -> None: diff --git a/tests/modules/robotic/conftest.py b/tests/modules/robotic/conftest.py new file mode 100644 index 000000000..d8ce28d9a --- /dev/null +++ b/tests/modules/robotic/conftest.py @@ -0,0 +1,35 @@ +from typing import Any, Dict, List + +import pytest +from astroplan import Observer, ObservingBlock, FixedTarget +import astropy.units as u +from astropy.coordinates import SkyCoord + + +class MockAcquisition: + async def acquire_target(self, **kwargs: Any) -> Dict[str, Any]: + return {"datetime": "", "ra": 0.0, "dec": 0.0, "az": 0.0, "alt": 0.0} + + +class MockTelescope: + async def move_radec(*args: Any, **kwargs: Any) -> None: + pass + + +@pytest.fixture(scope='module') +def observer() -> Observer: + return Observer(longitude=20.8108 * u.deg, latitude=-32.375823 * u.deg, + elevation=1798.0 * u.m, timezone="UTC") + + +@pytest.fixture(scope='module') +def schedule_blocks() -> List[ObservingBlock]: + blocks = [ + ObservingBlock( + FixedTarget(SkyCoord(0.0 * u.deg, 0.0 * u.deg, frame="icrs"), name=str(i)), 10 * u.minute, 10, + constraints=[], configuration={"request": {"id": str(i)}} + ) + for i in range(10) + ] + + return blocks diff --git a/tests/modules/robotic/test_loopedrandomiterator.py b/tests/modules/robotic/test_loopedrandomiterator.py new file mode 100644 index 000000000..87903bd33 --- /dev/null +++ b/tests/modules/robotic/test_loopedrandomiterator.py @@ -0,0 +1,10 @@ +from pyobs.modules.robotic._pointingseriesiterator import _LoopedRandomIterator + + +def test_iter() -> None: + data = [1, 2] + + iterator = _LoopedRandomIterator(data) + + assert set(data) == {next(iterator), next(iterator)} # Check first cycle + assert set(data) == {next(iterator), next(iterator)} # Check second cycle diff --git a/tests/modules/robotic/test_mastermind.py b/tests/modules/robotic/test_mastermind.py new file mode 100644 index 000000000..5c67ec152 --- /dev/null +++ b/tests/modules/robotic/test_mastermind.py @@ -0,0 +1,234 @@ +import asyncio +import logging +from datetime import timedelta +from typing import Optional, Dict, List, Any, Tuple +from unittest.mock import AsyncMock, Mock + +import pytest +from astroplan import ObservingBlock + +import pyobs +from pyobs.events import TaskStartedEvent, TaskFinishedEvent +from pyobs.modules.robotic import Mastermind +from pyobs.robotic import TaskSchedule, TaskRunner, Task, TaskArchive +from pyobs.robotic.scripts import Script +from pyobs.utils.time import Time + + +class TestTaskScheduler(TaskSchedule): + async def set_schedule(self, blocks: List[ObservingBlock], start_time: Time) -> None: + pass + + async def last_scheduled(self) -> Optional[Time]: + pass + + async def get_schedule(self) -> Dict[str, Task]: + pass + + async def get_task(self, time: Time) -> Optional[Task]: + pass + + +class TestTask(Task): + def __init__(self, start: Time = None, can_start_late: bool = False, **kwargs: Any): + super().__init__(**kwargs) + + if start is None: + self._start = Time.now() + else: + self._start = start + + self._can_start_late = can_start_late + + @property + def id(self) -> Any: + return 0 + + @property + def name(self) -> str: + return "Task" + + @property + def duration(self) -> float: + return 0 + + @property + def start(self) -> Time: + return self._start + + @property + def end(self) -> Time: + return self._start + + async def can_run(self, scripts: Optional[Dict[str, Script]] = None) -> bool: + pass + + @property + def can_start_late(self) -> bool: + return self._can_start_late + + async def run(self, task_runner: TaskRunner, task_schedule: Optional[TaskSchedule] = None, + task_archive: Optional[TaskArchive] = None, scripts: Optional[Dict[str, Script]] = None) -> None: + pass + + def is_finished(self) -> bool: + pass + + def get_fits_headers(self, namespaces: Optional[List[str]] = None) -> Dict[str, Tuple[Any, str]]: + return {"TASK-HDR": (0, "")} + + +@pytest.mark.asyncio +async def test_open(mocker): + mocker.patch("pyobs.modules.Module.open") + master = Mastermind(TestTaskScheduler(), TaskRunner()) + + master.comm.register_event = AsyncMock() + + await master.open() + + pyobs.modules.Module.open.assert_called_once() + assert master.comm.register_event.call_args_list[0][0][0] == TaskStartedEvent + assert master.comm.register_event.call_args_list[1][0][0] == TaskFinishedEvent + + +@pytest.mark.asyncio +async def test_start(): + master = Mastermind(TestTaskScheduler(), TaskRunner()) + master._mastermind_loop.start = Mock() + await master.start() + + master._mastermind_loop.start.assert_called_once() + + +@pytest.mark.asyncio +async def test_stop(): + master = Mastermind(TestTaskScheduler(), TaskRunner()) + master._mastermind_loop.stop = Mock() + await master.stop() + + master._mastermind_loop.stop.assert_called_once() + + +@pytest.mark.asyncio +async def test_is_running(): + master = Mastermind(TestTaskScheduler(), TaskRunner()) + master._mastermind_loop.is_running = Mock(return_value=True) + assert await master.is_running() is True + + +@pytest.mark.asyncio +async def test_loop_not_task(mocker): + mocker.patch("asyncio.sleep") + scheduler = TestTaskScheduler() + scheduler.get_task = AsyncMock(return_value=None) + master = Mastermind(scheduler, TaskRunner()) + + await master._loop() + asyncio.sleep.assert_called_once_with(10) + + +@pytest.mark.asyncio +async def test_loop_not_runnable_task(mocker): + mocker.patch("asyncio.sleep") + scheduler = TestTaskScheduler() + scheduler.get_task = AsyncMock(return_value=TestTask()) + + runner = TaskRunner() + runner.can_run = AsyncMock(return_value=False) + + master = Mastermind(scheduler, runner) + + await master._loop() + asyncio.sleep.assert_called_once_with(10) + + +@pytest.mark.asyncio +async def test_loop_late_start(mocker, caplog): + mocker.patch("asyncio.sleep") + task = TestTask(Time.now() - timedelta(seconds=400), False) + + scheduler = TestTaskScheduler() + scheduler.get_task = AsyncMock(return_value=task) + + runner = TaskRunner() + runner.can_run = AsyncMock(return_value=True) + + master = Mastermind(scheduler, runner) + + with caplog.at_level(logging.WARNING): + await master._loop() + await master._loop() + + assert caplog.messages[0] == "Time since start of window (400.0) too long (>300.0), skipping task..." + + assert len(caplog.messages) == 1 # Test that only the first task throws error msg + + asyncio.sleep.assert_called_with(10) + + +@pytest.mark.asyncio +async def test_loop_valid(mocker): + mocker.patch("asyncio.sleep") + task = TestTask(can_start_late=False) + + scheduler = TestTaskScheduler() + scheduler.get_task = AsyncMock(return_value=task) + + runner = TaskRunner() + runner.can_run = AsyncMock(return_value=True) + runner.run_task = AsyncMock() + + master = Mastermind(scheduler, runner) + + await master._loop() + + asyncio.sleep.assert_not_called() + runner.run_task.assert_awaited_with(task, task_schedule=scheduler) + + assert master._task is None + + +@pytest.mark.asyncio +async def test_loop_failed_task(mocker, caplog): + mocker.patch("asyncio.sleep") + task = TestTask(can_start_late=False) + + scheduler = TestTaskScheduler() + scheduler.get_task = AsyncMock(return_value=task) + + runner = TaskRunner() + runner.can_run = AsyncMock(return_value=True) + runner.run_task = AsyncMock(side_effect=Exception("")) + + master = Mastermind(scheduler, runner) + + with caplog.at_level(logging.WARNING): + await master._loop() + + asyncio.sleep.assert_not_called() + + assert caplog.messages[0] == "Task Task failed." + assert master._task is None + + +@pytest.mark.asyncio +async def test_get_fits_header_before_without_task(): + master = Mastermind(TestTaskScheduler(), TaskRunner()) + + header = await master.get_fits_header_before() + + assert header == {} + + +@pytest.mark.asyncio +async def test_get_fits_header_before_with_task(): + master = Mastermind(TestTaskScheduler(), TaskRunner()) + + master._task = TestTask() + + header = await master.get_fits_header_before() + + assert header["TASK"] == ("Task", "Name of task") + assert header["REQNUM"] == ("0", "Unique ID of task") + assert header["TASK-HDR"] == (0, "") diff --git a/tests/modules/robotic/test_pointingseries.py b/tests/modules/robotic/test_pointingseries.py new file mode 100644 index 000000000..6b49682de --- /dev/null +++ b/tests/modules/robotic/test_pointingseries.py @@ -0,0 +1,87 @@ +from __future__ import annotations +import datetime +from typing import Any, Union, Optional, Dict +from unittest.mock import AsyncMock + +import pandas as pd +import pytest +import pytest_mock +from astroplan import Observer + +import pyobs +from pyobs.modules.robotic import PointingSeries +from tests.modules.robotic.conftest import MockAcquisition, MockTelescope + + +class MockProxy: + def __init__(self, **kwargs: Any) -> None: + self.acquisition = MockAcquisition() + self.telescope = MockTelescope() + + async def __call__(self, name: str, *args: Any, **kwargs: Any) -> Union[MockTelescope, MockAcquisition]: + if name == "acquisition": + return self.acquisition + if name == "telescope": + return self.telescope + + raise ValueError + + +class MockPointingSeriesIterator: + def __init__(self) -> None: + self.grid_points = pd.DataFrame() + self.counter = None + + def set_grid_points(self, grid_points: pd.DataFrame) -> None: + self.grid_points = grid_points + + def __aiter__(self) -> MockPointingSeriesIterator: + self.counter = len(self.grid_points) + return self + + async def __anext__(self) -> Optional[Dict[str, Any]]: + self.counter -= 1 + if self.counter >= 0: + return {"datetime": "", "ra": 0.0, "dec": 0.0, "az": 0.0, "alt": 0.0} + raise StopAsyncIteration + + +@pytest.mark.asyncio +async def test_open(observer: Observer, mocker: pytest_mock.MockerFixture) -> None: + mocker.patch("pyobs.modules.Module.open") + mock_proxy = MockProxy() + mock_proxy.telescope.move_radec = AsyncMock() # type: ignore + + series = PointingSeries(num_alt=1, num_az=1, observer=observer, finish=0) + series.comm.proxy = mock_proxy # type: ignore + + await series.open() + + assert series._pointing_series_iterator._acquisition == mock_proxy.acquisition # type: ignore + assert series._pointing_series_iterator._telescope == mock_proxy.telescope # type: ignore + + +@pytest.mark.asyncio +async def test_run_thread(observer: Observer, mocker: pytest_mock.MockFixture) -> None: + current_time = pyobs.utils.time.Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + + mocker.patch("pyobs.utils.time.Time.now", return_value=current_time) + series = PointingSeries(num_alt=1, num_az=1, observer=observer, finish=0) + series._pointing_series_iterator = MockPointingSeriesIterator() # type: ignore + series._process_acquisition = AsyncMock() # type: ignore + + await series._run_thread() + + series._process_acquisition.assert_called_once_with(**{"datetime": "", "ra": 0.0, "dec": 0.0, "az": 0.0, "alt": 0.0}) + + +@pytest.mark.asyncio +async def test_no_observer() -> None: + with pytest.raises(ValueError): + PointingSeries(num_alt=1, num_az=1, finish=0) + + +@pytest.mark.asyncio +async def test_is_running(observer: Observer) -> None: + series = PointingSeries(num_alt=1, num_az=1, observer=observer, finish=0) + assert await series.is_running() is True diff --git a/tests/modules/robotic/test_pointingseriesiterator.py b/tests/modules/robotic/test_pointingseriesiterator.py new file mode 100644 index 000000000..c91906417 --- /dev/null +++ b/tests/modules/robotic/test_pointingseriesiterator.py @@ -0,0 +1,93 @@ +import datetime +from unittest.mock import AsyncMock + +import astroplan +import pandas as pd +import pytest +import pytest_mock +from astropy.coordinates import SkyCoord + +import pyobs +from pyobs.modules.robotic._pointingseriesiterator import _PointingSeriesIterator +from tests.modules.robotic.conftest import MockTelescope, MockAcquisition + + +@pytest.mark.asyncio +async def test_telescope_not_set(observer: astroplan.Observer) -> None: + iterator = _PointingSeriesIterator(observer, (-80.0, 80.0), 0, 0) + iterator.set_acquisition(MockAcquisition()) # type: ignore + + with pytest.raises(ValueError): + async for _ in iterator: + ... + + +@pytest.mark.asyncio +async def test_acquisition_not_set(observer: astroplan.Observer) -> None: + iterator = _PointingSeriesIterator(observer, (-80.0, 80.0), 0, 0) + iterator.set_telescope(MockTelescope()) # type: ignore + + with pytest.raises(ValueError): + async for _ in iterator: + ... + + +@pytest.mark.asyncio +async def test_empty(observer: astroplan.Observer) -> None: + coords = pd.DataFrame({"alt": [], "az": [], "done": []}) + coords.set_index(["alt", "az"]) + + iterator = _PointingSeriesIterator(observer, (-80.0, 80.0), 0, 0) + + iterator.set_telescope(MockTelescope()) # type: ignore + iterator.set_acquisition(MockAcquisition()) # type: ignore + iterator.set_grid_points(coords) + + with pytest.raises(StopAsyncIteration): + await anext(iterator) # type: ignore + + +@pytest.mark.asyncio +async def test_next(observer: astroplan.Observer, mocker: pytest_mock.MockerFixture) -> None: + current_time = pyobs.utils.time.Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + + mocker.patch("astropy.coordinates.SkyCoord.transform_to", return_value=SkyCoord(0, 0, unit='deg', frame='icrs')) + mocker.patch("astroplan.Observer.moon_altaz", return_value=SkyCoord(-90, 0, unit='deg', frame='altaz', obstime=current_time, location=observer.location)) + + coords = pd.DataFrame({"alt": [60], "az": [0], "done": [False]}) + coords = coords.set_index(["alt", "az"]) + + mocker.patch("pyobs.utils.time.Time.now", return_value=current_time) + + iterator = _PointingSeriesIterator(observer,(-80.0, 80.0), 0, 0) + iterator.set_telescope(MockTelescope()) # type: ignore + iterator.set_acquisition(MockAcquisition()) # type: ignore + iterator.set_grid_points(coords) + + iterator._telescope.move_radec = AsyncMock() # type: ignore + + assert await anext(iterator) == {"datetime": "", "ra": 0.0, "dec": 0.0, "az": 0.0, "alt": 0.0} # type: ignore + + iterator._telescope.move_radec.assert_called_once_with(0, 0) # type: ignore + + +@pytest.mark.asyncio +async def test_acquire_error(observer: astroplan.Observer, mocker: pytest_mock.MockerFixture) -> None: + current_time = pyobs.utils.time.Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + + mocker.patch("astropy.coordinates.SkyCoord.transform_to", return_value=SkyCoord(0, 0, unit='deg', frame='icrs')) + mocker.patch("astroplan.Observer.moon_altaz", return_value=SkyCoord(-90, 0, unit='deg', frame='altaz', obstime=current_time, location=observer.location)) + + coords = pd.DataFrame({"alt": [60], "az": [0], "done": [False]}) + coords = coords.set_index(["alt", "az"]) + + mocker.patch("pyobs.utils.time.Time.now", return_value=current_time) + + iterator = _PointingSeriesIterator(observer, (-80.0, 80.0), 0, 0) + iterator.set_telescope(MockTelescope()) # type: ignore + iterator.set_acquisition(MockAcquisition()) # type: ignore + iterator.set_grid_points(coords) + + iterator._acquisition.acquire_target = AsyncMock(side_effect=ValueError) # type: ignore + + assert await anext(iterator) is None # type: ignore diff --git a/tests/modules/robotic/test_scheduler.py b/tests/modules/robotic/test_scheduler.py index edc6e24bf..6d7bcd33e 100644 --- a/tests/modules/robotic/test_scheduler.py +++ b/tests/modules/robotic/test_scheduler.py @@ -1,67 +1,134 @@ -import time +import datetime +from typing import List, Optional, Dict +from unittest.mock import Mock, AsyncMock -from astroplan import ObservingBlock, FixedTarget -import astropy.units as u -from astropy.coordinates import SkyCoord +import pytest +import pytest_mock +from astroplan import ObservingBlock +import pyobs +from pyobs.events import GoodWeatherEvent, TaskStartedEvent, TaskFinishedEvent, Event from pyobs.modules.robotic import Scheduler +from pyobs.robotic import TaskArchive, TaskSchedule, Task +from pyobs.utils.time import Time -def test_compare_block_lists(): - # create lists of blocks - blocks = [] - for i in range(10): - blocks.append( - ObservingBlock( - FixedTarget(SkyCoord(0.0 * u.deg, 0.0 * u.deg, frame="icrs"), name=str(i)), 10 * u.minute, 10 - ) - ) +class TestTaskArchive(TaskArchive): - # create two lists from these with some overlap - blocks1 = blocks[:7] - blocks2 = blocks[5:] + async def last_changed(self) -> Optional[Time]: + pass - # compare - unique1, unique2 = Scheduler._compare_block_lists(blocks1, blocks2) + async def get_schedulable_blocks(self) -> List[ObservingBlock]: + pass - # get names - names1 = [int(b.target.name) for b in unique1] - names2 = [int(b.target.name) for b in unique2] - # names1 should contain 0, 1, 2, 3, 4 - assert set(names1) == {0, 1, 2, 3, 4} +class TestTaskSchedule(TaskSchedule): - # names2 should contain 7, 8, 9 - assert set(names2) == {7, 8, 9} + async def set_schedule(self, blocks: List[ObservingBlock], start_time: Time) -> None: + pass - # create two lists from these with no overlap - blocks1 = blocks[:5] - blocks2 = blocks[5:] + async def last_scheduled(self) -> Optional[Time]: + pass - # compare - unique1, unique2 = Scheduler._compare_block_lists(blocks1, blocks2) + async def get_schedule(self) -> Dict[str, Task]: + pass - # get names - names1 = [int(b.target.name) for b in unique1] - names2 = [int(b.target.name) for b in unique2] + async def get_task(self, time: Time) -> Optional[Task]: + pass - # names1 should contain 0, 1, 2, 3, 4 - assert set(names1) == {0, 1, 2, 3, 4} - # names2 should contain 5, 6, 7, 8, 9 - assert set(names2) == {5, 6, 7, 8, 9} +@pytest.mark.asyncio +async def test_update_worker_loop_no_update() -> None: + scheduler = Scheduler(TestTaskArchive(), TestTaskSchedule(), trigger_on_task_started=True) + scheduler._task_updater.update = AsyncMock(return_value=None) # type: ignore + scheduler._scheduler_task.start = Mock() # type: ignore - # create two identical lists - blocks1 = blocks - blocks2 = blocks + await scheduler._update_worker_loop() - # compare - unique1, unique2 = Scheduler._compare_block_lists(blocks1, blocks2) + scheduler._scheduler_task.start.assert_not_called() - # get names - names1 = [int(b.target.name) for b in unique1] - names2 = [int(b.target.name) for b in unique2] - # both lists should be empty - assert len(names1) == 0 - assert len(names2) == 0 +@pytest.mark.asyncio +async def test_update_worker_loop_with_update() -> None: + blocks: List[ObservingBlock] = [] + scheduler = Scheduler(TestTaskArchive(), TestTaskSchedule(), trigger_on_task_started=True) + scheduler._task_updater.update = AsyncMock(return_value=blocks) # type: ignore + scheduler._scheduler_task.start = Mock() # type: ignore + + await scheduler._update_worker_loop() + + assert scheduler._task_scheduler._blocks == blocks + scheduler._scheduler_task.start.assert_called_once() + + +@pytest.mark.asyncio +async def test_on_task_started() -> None: + scheduler = Scheduler(TestTaskArchive(), TestTaskSchedule(), trigger_on_task_started=True) + scheduler._scheduler_task.start = Mock() # type: ignore + + time = pyobs.utils.time.Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + event = TaskStartedEvent(id=0, eta=time, name="") + + await scheduler._on_task_started(event, "") + + assert scheduler._task_scheduler._current_task_id == 0 # type: ignore + assert scheduler._task_updater._current_task_id == 0 # type: ignore + assert scheduler._task_updater._last_task_id == 0 # type: ignore + + scheduler._scheduler_task.start.assert_called_once() + assert scheduler._schedule_start == time + + +@pytest.mark.asyncio +async def test_on_task_started_wrong_event() -> None: + scheduler = Scheduler(TestTaskArchive(), TestTaskSchedule(), trigger_on_task_started=True) + event = Event() + + assert await scheduler._on_task_started(event, "") is False + + +@pytest.mark.asyncio +async def test_on_task_finished(mocker: pytest_mock.MockFixture) -> None: + current_time = pyobs.utils.time.Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + mocker.patch("pyobs.utils.time.Time.now", return_value=current_time) + + scheduler = Scheduler(TestTaskArchive(), TestTaskSchedule(), trigger_on_task_finished=True) + scheduler._scheduler_task.start = Mock() # type: ignore + event = TaskFinishedEvent(id=0, name="") + + await scheduler._on_task_finished(event, "") + + assert scheduler._task_updater._current_task_id is None + scheduler._scheduler_task.start.assert_called_once() + + assert scheduler._schedule_start == current_time + + +@pytest.mark.asyncio +async def test_on_task_finished_wrong_event() -> None: + scheduler = Scheduler(TestTaskArchive(), TestTaskSchedule(), trigger_on_task_started=True) + event = Event() + + assert await scheduler._on_task_finished(event, "") is False + + +@pytest.mark.asyncio +async def test_on_good_weather() -> None: + scheduler = Scheduler(TestTaskArchive(), TestTaskSchedule(), trigger_on_task_started=True) + scheduler._scheduler_task.start = Mock() # type: ignore + + time = pyobs.utils.time.Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + event = GoodWeatherEvent(id=0, eta=time, name="") + + await scheduler._on_good_weather(event, "") + + scheduler._scheduler_task.start.assert_called_once() + assert scheduler._schedule_start == time + + +@pytest.mark.asyncio +async def test_on_good_weather_not_weather_event() -> None: + scheduler = Scheduler(TestTaskArchive(), TestTaskSchedule(), trigger_on_task_started=True) + event = Event() + + assert await scheduler._on_good_weather(event, "") is False diff --git a/tests/modules/robotic/test_taskscheduler.py b/tests/modules/robotic/test_taskscheduler.py new file mode 100644 index 000000000..1407e6ab5 --- /dev/null +++ b/tests/modules/robotic/test_taskscheduler.py @@ -0,0 +1,195 @@ +import datetime +import multiprocessing +from typing import List, Any +from unittest.mock import AsyncMock, Mock + +import astroplan +import astropy.units as u +import pytest +import pytest_mock +from astroplan import ObservingBlock, FixedTarget, Observer +from astropy.coordinates import SkyCoord + +from pyobs.modules.robotic._taskscheduler import _TaskScheduler +from pyobs.utils.time import Time +from tests.modules.robotic.test_mastermind import TestTask +from tests.modules.robotic.test_scheduler import TestTaskSchedule + + +@pytest.mark.asyncio +async def test_prepare_schedule_invalid_twilight(observer: Observer) -> None: + with pytest.raises(ValueError): + _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "invalid") + + +@pytest.mark.asyncio +async def test_prepare_schedule_astronomical_twilight(observer: Observer, schedule_blocks: List[ObservingBlock]) -> None: + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "astronomical") + + assert scheduler._scheduler.constraints[0].max_solar_altitude == -18 * u.deg + + +@pytest.mark.asyncio +async def test_prepare_schedule_nautical_twilight(observer: Observer, schedule_blocks: List[ObservingBlock]) -> None: + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "nautical") + assert scheduler._scheduler.constraints[0].max_solar_altitude == -12 * u.deg + + +@pytest.mark.asyncio +async def test_prepare_schedule_no_blocks(observer: Observer) -> None: + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "nautical") + + with pytest.raises(ValueError): + await scheduler._prepare_schedule() + + +@pytest.mark.asyncio +async def test_prepare_schedule_no_start(observer: Observer, schedule_blocks: List[ObservingBlock], mocker: pytest_mock.MockFixture) -> None: + current_time = Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + mocker.patch("pyobs.utils.time.Time.now", return_value=current_time) + + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "nautical") + scheduler._blocks = schedule_blocks + + _, start, _ = await scheduler._prepare_schedule() + + assert start.to_datetime() == datetime.datetime(2024, 4, 1, 20, 1, 0) + + +@pytest.mark.asyncio +async def test_prepare_schedule_start(observer: Observer, schedule_blocks: List[ObservingBlock], mocker: pytest_mock.MockFixture) -> None: + current_time = Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + mocker.patch("pyobs.utils.time.Time.now", return_value=current_time) + + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "nautical") + scheduler._blocks = schedule_blocks + scheduler._schedule_start = Time(datetime.datetime(2024, 4, 1, 20, 1, 0)) + + _, start, _ = await scheduler._prepare_schedule() + + assert start.to_datetime() == datetime.datetime(2024, 4, 1, 20, 1, 0) + + +@pytest.mark.asyncio +async def test_prepare_schedule_end(observer: Observer, schedule_blocks: List[ObservingBlock], mocker: pytest_mock.MockFixture) -> None: + current_time = Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + mocker.patch("pyobs.utils.time.Time.now", return_value=current_time) + + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "nautical") + scheduler._blocks = schedule_blocks + scheduler._schedule_start = Time(datetime.datetime(2024, 4, 1, 20, 1, 0)) + + _, _, end = await scheduler._prepare_schedule() + + assert end.to_datetime() == datetime.datetime(2024, 4, 2, 20, 1, 0) + + +@pytest.mark.asyncio +async def test_prepare_schedule_block_filtering(observer: Observer, schedule_blocks: List[ObservingBlock], mocker: pytest_mock.MockFixture) -> None: + current_time = Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + mocker.patch("pyobs.utils.time.Time.now", return_value=current_time) + + over_time = Time(datetime.datetime(2024, 4, 3, 20, 0, 0)) + in_time = Time(datetime.datetime(2024, 4, 2, 10, 0, 0)) + + schedule_blocks[1].constraints.append(astroplan.TimeConstraint(min=over_time, max=over_time)) + schedule_blocks[2].constraints.append(astroplan.TimeConstraint(min=in_time, max=over_time)) + + blocks = [ + schedule_blocks[0], schedule_blocks[1], schedule_blocks[2], schedule_blocks[3] + ] + + task_scheduler = TestTaskSchedule() + task_scheduler.get_schedule = AsyncMock(return_value={"0": TestTask()}) # type: ignore + + scheduler = _TaskScheduler(task_scheduler, observer, 24, 60, "nautical") + scheduler._schedule_start = Time(datetime.datetime(2024, 4, 1, 20, 1, 0)) + scheduler._current_task_id = "0" + scheduler._blocks = blocks + + res_blocks, _, _ = await scheduler._prepare_schedule() + + assert [block.configuration["request"]["id"] for block in res_blocks] == ["2", "3"] + + +def mock_schedule_process(blocks: List[ObservingBlock], start: Time, end: Time, + scheduled_blocks: multiprocessing.Queue) -> None: # type: ignore + scheduled_blocks.put(blocks) + + +@pytest.mark.asyncio +async def test_schedule_blocks(observer: Observer) -> None: + + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "nautical") + scheduler._schedule_process = mock_schedule_process # type: ignore + + time = Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + block = ObservingBlock( + FixedTarget(SkyCoord(0.0 * u.deg, 0.0 * u.deg, frame="icrs"), name=0), 10 * u.minute, 2000.0, + configuration={"request": {"id": "0"}} + ) + blocks = [block] + scheduled_blocks = await scheduler._schedule_blocks(blocks, time, time) + assert scheduled_blocks[0].configuration["request"]["id"] == block.configuration["request"]["id"] + + +@pytest.mark.asyncio +async def test_finish_schedule(observer: Observer) -> None: + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "nautical") + scheduler._schedule.set_schedule = AsyncMock() # type: ignore + + time = Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + block = ObservingBlock( + FixedTarget(SkyCoord(0.0 * u.deg, 0.0 * u.deg, frame="icrs"), name=0), 10 * u.minute, 2000.0, + configuration={"request": {"id": "0"}}, + constraints=[astroplan.TimeConstraint(min=time, max=time)] + ) + block.start_time = time + block.end_time = time + blocks = [block] + + await scheduler._finish_schedule(blocks, time) + scheduler._schedule.set_schedule.assert_called_with(blocks, time) + + +@pytest.mark.asyncio +async def test_convert_blocks_to_astroplan(observer: Observer) -> None: + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "nautical") + time = Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + block = ObservingBlock( + FixedTarget(SkyCoord(0.0 * u.deg, 0.0 * u.deg, frame="icrs"), name=0), 10 * u.minute, 2000.0, + constraints=[astroplan.TimeConstraint(min=time, max=time)] + ) + scheduler._blocks = [block] + + converted_blocks = await scheduler._convert_blocks_to_astroplan() + + assert converted_blocks[0].priority == 0 + assert converted_blocks[0].constraints[0].min.to_datetime() == datetime.datetime(2024, 4, 1, 20, 0, 30) + assert converted_blocks[0].constraints[0].max.to_datetime() == datetime.datetime(2024, 4, 1, 19, 59, 30) + + +class MockSchedule: + def __init__(self, blocks: List[ObservingBlock]) -> None: + self.scheduled_blocks = blocks + + +@pytest.mark.asyncio +async def test_schedule_process(observer: Observer) -> None: + scheduler = _TaskScheduler(TestTaskSchedule(), observer, 24, 60, "nautical") + + time = Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + block = ObservingBlock( + FixedTarget(SkyCoord(0.0 * u.deg, 0.0 * u.deg, frame="icrs"), name=0), 10 * u.minute, 2000.0, + constraints=[astroplan.TimeConstraint(min=time, max=time)] + ) + blocks = [block] + scheduler._scheduler = Mock(return_value=MockSchedule(blocks)) + + queue = multiprocessing.Queue() # type: ignore + scheduler._schedule_process(blocks, time, time, queue) + converted_blocks = queue.get() + + assert converted_blocks[0].priority == 2000.0 + assert converted_blocks[0].constraints[0].min.to_datetime() == datetime.datetime(2024, 4, 1, 20, 0) + assert converted_blocks[0].constraints[0].max.to_datetime() == datetime.datetime(2024, 4, 1, 20, 0) diff --git a/tests/modules/robotic/test_taskupdater.py b/tests/modules/robotic/test_taskupdater.py new file mode 100644 index 000000000..0cadbf85e --- /dev/null +++ b/tests/modules/robotic/test_taskupdater.py @@ -0,0 +1,128 @@ +import datetime +from typing import List +from unittest.mock import Mock, AsyncMock + +import pytest +import pytest_mock +from astroplan import ObservingBlock + +import pyobs +from pyobs.modules.robotic._taskupdater import _TaskUpdater +from pyobs.utils.time import Time +from tests.modules.robotic.test_scheduler import TestTaskArchive, TestTaskSchedule + + +def test_compare_block_lists_with_overlap(schedule_blocks: List[ObservingBlock]) -> None: + old_blocks = schedule_blocks[:7] + new_blocks = schedule_blocks[5:] + + removed, added = _TaskUpdater._compare_block_lists(old_blocks, new_blocks) + + removed_names = [int(b.target.name) for b in removed] + new_names = [int(b.target.name) for b in added] + + assert set(removed_names) == {0, 1, 2, 3, 4} + assert set(new_names) == {7, 8, 9} + + +def test_compare_block_lists_without_overlap(schedule_blocks: List[ObservingBlock]) -> None: + old_blocks = schedule_blocks[:5] + new_blocks = schedule_blocks[5:] + + removed, added = _TaskUpdater._compare_block_lists(old_blocks, new_blocks) + + removed_names = [int(b.target.name) for b in removed] + new_names = [int(b.target.name) for b in added] + + assert set(removed_names) == {0, 1, 2, 3, 4} + assert set(new_names) == {5, 6, 7, 8, 9} + + +def test_compare_block_lists_identical(schedule_blocks: List[ObservingBlock]) -> None: + old_blocks = schedule_blocks + new_blocks = schedule_blocks + + removed, added = _TaskUpdater._compare_block_lists(old_blocks, new_blocks) + + removed_names = [int(b.target.name) for b in removed] + new_names = [int(b.target.name) for b in added] + + assert len(removed_names) == 0 + assert len(new_names) == 0 + + +@pytest.mark.asyncio +async def test_worker_loop_not_changed() -> None: + scheduler = _TaskUpdater(TestTaskArchive(), TestTaskSchedule()) + scheduler._update_blocks = AsyncMock() # type: ignore + + time = pyobs.utils.time.Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + scheduler._task_archive.last_changed = AsyncMock(return_value=time) # type: ignore + scheduler._last_change = time + + await scheduler.update() + + scheduler._update_blocks.assert_not_called() + + +@pytest.mark.asyncio +async def test_worker_loop_new_changes(mocker: pytest_mock.MockFixture) -> None: + time = pyobs.utils.time.Time(datetime.datetime(2024, 4, 1, 20, 0, 0)) + mocker.patch("pyobs.utils.time.Time.now", return_value=time) + scheduler = _TaskUpdater(TestTaskArchive(), TestTaskSchedule()) + scheduler._update_blocks = AsyncMock() # type: ignore + + scheduler._task_archive.last_changed = AsyncMock(return_value=time) # type: ignore + scheduler._last_change = time - datetime.timedelta(minutes=1) # type: ignore + + await scheduler.update() + + scheduler._update_blocks.assert_called_once() + assert scheduler._last_change == time + + +@pytest.mark.asyncio +async def test_update_blocks_no_changes(schedule_blocks: List[ObservingBlock]) -> None: + scheduler = _TaskUpdater(TestTaskArchive(), TestTaskSchedule()) + scheduler._task_archive.get_schedulable_blocks = AsyncMock(return_value=schedule_blocks) # type: ignore + scheduler._blocks = schedule_blocks + + assert await scheduler._update_blocks() is None + + +@pytest.mark.asyncio +async def test_update_blocks_removed_current(schedule_blocks: List[ObservingBlock]) -> None: + scheduler = _TaskUpdater(TestTaskArchive(), TestTaskSchedule()) + + scheduler._task_archive.get_schedulable_blocks = AsyncMock(return_value=schedule_blocks) # type: ignore + scheduler._blocks = schedule_blocks + scheduler._last_task_id = "0" + + scheduler._compare_block_lists = Mock(return_value=([schedule_blocks[0]], [])) # type: ignore + + assert await scheduler._update_blocks() is None + + +@pytest.mark.asyncio +async def test_update_blocks_removed_not_in_schedule(schedule_blocks: List[ObservingBlock]) -> None: + scheduler = _TaskUpdater(TestTaskArchive(), TestTaskSchedule()) + scheduler._task_archive.get_schedulable_blocks = AsyncMock(return_value=schedule_blocks) # type: ignore + scheduler._schedule.get_schedule = AsyncMock(return_value=[]) # type: ignore + scheduler._blocks = schedule_blocks + + scheduler._compare_block_lists = Mock(return_value=([schedule_blocks[0]], [])) # type: ignore + + assert await scheduler._update_blocks() is None + + +@pytest.mark.asyncio +async def test_update_blocks_need_to_update(schedule_blocks: List[ObservingBlock]) -> None: + scheduler = _TaskUpdater(TestTaskArchive(), TestTaskSchedule()) + + scheduler._task_archive.get_schedulable_blocks = AsyncMock(return_value=schedule_blocks) # type: ignore + scheduler._schedule.get_schedule = AsyncMock(return_value=[]) # type: ignore + scheduler._blocks = [] + + scheduler._compare_block_lists = Mock(return_value=([], [schedule_blocks[0]])) # type: ignore + + assert await scheduler._update_blocks() == schedule_blocks \ No newline at end of file diff --git a/tests/test_background_task.py b/tests/test_background_task.py index f218a762f..bdf67bfa9 100644 --- a/tests/test_background_task.py +++ b/tests/test_background_task.py @@ -74,3 +74,7 @@ async def test_callback_restart(caplog): assert caplog.messages[0] == "Background task for test_function has died, restarting..." bg_task.start.assert_called_once() + +def test_is_running_not_started(): + bg_task = BackgroundTask(AsyncMock(), True) + assert bg_task.is_running() is False