Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f5cf28f
Added unit tests to mastermind
GermanHydrogen Mar 20, 2024
45a771f
Cleaned up main loopmastermind
GermanHydrogen Mar 20, 2024
ccee05d
Removed depricated code from mastermind
GermanHydrogen Mar 20, 2024
1d7ec08
Merge branch 'develop' into robotic-test
GermanHydrogen Mar 27, 2024
f86557d
Added is running method to background task
GermanHydrogen Mar 27, 2024
29a2973
Updated background task in mastermind
GermanHydrogen Mar 27, 2024
f0a3f5f
Refactoreesisting g schedular unit test
GermanHydrogen Mar 27, 2024
09958cf
Added unit tests to worker lof the scheduler
GermanHydrogen Mar 27, 2024
768c903
Added basic tests to prepare schuedule
GermanHydrogen Apr 9, 2024
94ad64b
Added unit tests to the event handlers of scheduler
GermanHydrogen Apr 9, 2024
a8e6549
Refactored prepare sechdule of schedule
GermanHydrogen Apr 10, 2024
f7d93d9
Fixed typing
GermanHydrogen Apr 27, 2024
89f25f9
Added missing test cases
GermanHydrogen Apr 29, 2024
1b87cec
Added type annotations to tescases
GermanHydrogen Apr 29, 2024
7139acb
Utilized the new background task api
GermanHydrogen Apr 29, 2024
5345700
Seperated update worker loop
GermanHydrogen Apr 30, 2024
9b3b555
Fixed worker loop tests
GermanHydrogen Apr 30, 2024
ad20208
Added broad test for pointing series
GermanHydrogen Apr 30, 2024
f911566
Seperated pointing series running into a seperate class
GermanHydrogen May 5, 2024
e04f18c
Added looped random iterator
GermanHydrogen May 5, 2024
79158dd
Added tests to pointing series iterator
GermanHydrogen May 6, 2024
9e733f2
Changed api of pointing series iterator to facilitate better testing …
GermanHydrogen May 6, 2024
7da04b0
Added missing tests to pointing series
GermanHydrogen May 6, 2024
14c24b2
Extracted task scheduler methods
GermanHydrogen May 6, 2024
6fb41a9
Seperated task update from scheduler
GermanHydrogen May 6, 2024
578cf05
Added missing unit test to task scheduler
GermanHydrogen May 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pyobs/background_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,11 @@ def _callback_function(self, args=None) -> None:
log.error("Background task for %s has died, quitting...", self._func.__name__)

def stop(self) -> None:
if self._task is not None:
if self.is_running():
self._task.cancel()

def is_running(self) -> bool:
if self._task is None:
return False

return self._task.done()
126 changes: 126 additions & 0 deletions pyobs/modules/robotic/_pointingseriesiterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from __future__ import annotations

import logging
import random
from collections.abc import Iterator
from copy import copy
from typing import Tuple, Any, Optional, List, Dict, cast

import astropy
import astropy.units as u
import pandas as pd
from astroplan import Observer
from astropy.coordinates import SkyCoord

from pyobs.interfaces import IAcquisition, ITelescope
from pyobs.utils import exceptions as exc
from pyobs.utils.time import Time

log = logging.getLogger(__name__)


class _LoopedRandomIterator(Iterator[Any]):
def __init__(self, data: List[Any]) -> None:
self._data = copy(data)
self._todo = copy(data)

def __iter__(self) -> _LoopedRandomIterator:
return self

def __next__(self) -> Any:
if len(self._todo) == 0:
self._todo = copy(self._data)

item = random.sample(self._todo, 1)[0]
self._todo.remove(item)

return item


class _PointingSeriesIterator:
def __init__(self,
observer: Observer,
dec_range: Tuple[float, float],
finish_percentage: float,
min_moon_dist: float) -> None:

self._observer = observer
self._telescope: Optional[ITelescope] = None
self._acquisition: Optional[IAcquisition] = None

self._dec_range = dec_range
self._finish_fraction = 1.0 - finish_percentage / 100.0
self._min_moon_dist = min_moon_dist

self._grid_points: pd.DataFrame = pd.DataFrame({"alt": [], "az": [], "done": []})

def set_grid_points(self, grid_points: pd.DataFrame) -> None:
self._grid_points = grid_points

def set_telescope(self, telescope: ITelescope) -> None:
self._telescope = telescope

def set_acquisition(self, acquisition: IAcquisition) -> None:
self._acquisition = acquisition

def __aiter__(self) -> _PointingSeriesIterator:
if self._acquisition is None:
raise ValueError("Acquisition is not set.")

if self._telescope is None:
raise ValueError("Telescope is not set.")

return self

async def __anext__(self) -> Optional[Dict[str, Any]]:
if self._is_finished():
raise StopAsyncIteration()

todo_coords = self._get_todo_coords()
log.info("Grid points left to do: %d", len(todo_coords))

alt, az, radec = self._find_next_point(todo_coords)

try:
acquisition_result = await self._acquire_target(radec)
except (ValueError, exc.RemoteError):
log.info("Could not acquire target.")
return None

self._grid_points.loc[alt, az] = True
return acquisition_result

def _is_finished(self) -> bool:
num_finished_coords: int = sum(self._grid_points["done"].values)
total_num_coords: int = len(self._grid_points)

return num_finished_coords >= self._finish_fraction * total_num_coords

def _get_todo_coords(self) -> List[Tuple[float, float]]:
return list(self._grid_points[~self._grid_points["done"]].index)

def _find_next_point(self, todo_coords: List[Tuple[float, float]]) -> Tuple[float, float, astropy.coordinates.SkyCoord]: # type: ignore
moon = self._observer.moon_altaz(Time.now())

for alt, az in _LoopedRandomIterator(todo_coords):
altaz = SkyCoord(
alt=alt * u.deg, az=az * u.deg, frame="altaz", obstime=Time.now(), location=self._observer.location
)
radec = altaz.transform_to("icrs")

if self._is_valid_target(altaz, radec, moon):
log.info("Picked grid point at Alt=%.2f, Az=%.2f (%s).", alt, az, radec.to_string("hmsdms"))
return alt, az, radec

def _is_valid_target(self, altaz_coords: SkyCoord, radec_coords: SkyCoord, moon: SkyCoord) -> bool:
moon_separation_condition: bool = altaz_coords.separation(moon).degree >= self._min_moon_dist
dec_range_condition: bool = self._dec_range[0] <= radec_coords.dec.degree < self._dec_range[1]

return moon_separation_condition and dec_range_condition

async def _acquire_target(self, target: SkyCoord) -> dict[str, Any]:
telescope = cast(ITelescope, self._telescope) # Telescope has to be set in order to enter the iteration
await telescope.move_radec(float(target.ra.degree), float(target.dec.degree))

acquisition = cast(IAcquisition, self._acquisition)
return await acquisition.acquire_target() # Acquisition has to be set in order to enter the iteration
210 changes: 210 additions & 0 deletions pyobs/modules/robotic/_taskscheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import asyncio
import copy
import logging
import multiprocessing as mp
from typing import List, Tuple, Any, Optional, cast

import astroplan
import astropy
import astropy.units as u
from astroplan import ObservingBlock, Observer
from astropy.time import TimeDelta

from pyobs.robotic import TaskSchedule, Task
from pyobs.utils.time import Time

log = logging.getLogger(__name__)


class _TaskScheduler:
def __init__(self, schedule: TaskSchedule, observer: Observer, schedule_range: int, safety_time: int, twilight: str) -> None:
self._schedule = schedule
self._observer = observer

self._schedule_range = schedule_range
self._safety_time = safety_time

twilight_constraint = self._get_twilight_constraint(twilight)
self._scheduler = astroplan.PriorityScheduler(twilight_constraint, self._observer, transitioner=astroplan.Transitioner())

self._blocks: List[ObservingBlock] = []

self._current_task_id: Optional[str] = None
self._schedule_start: Optional[Time] = None

@staticmethod
def _get_twilight_constraint(twilight: str) -> List[astroplan.AtNightConstraint]:
if twilight == "astronomical":
return [astroplan.AtNightConstraint.twilight_astronomical()]
elif twilight == "nautical":
return [astroplan.AtNightConstraint.twilight_nautical()]
else:
raise ValueError("Unknown twilight type.")

def set_current_task_id(self, task_id: Optional[str]) -> None:
self._current_task_id = task_id

def set_schedule_start(self, time: Optional[Time]) -> None:
self._schedule_start = time

def set_blocks(self, blocks: List[ObservingBlock]) -> None:
self._blocks = blocks

async def schedule_task(self) -> None:
try:
# prepare scheduler
blocks, start, end = await self._prepare_schedule()

# schedule
scheduled_blocks = await self._schedule_blocks(blocks, start, end)

# finish schedule
await self._finish_schedule(scheduled_blocks, start)

except ValueError as e:
log.warning(str(e))

async def _prepare_schedule(self) -> Tuple[List[ObservingBlock], Time, Time]:
"""TaskSchedule blocks."""

converted_blocks = await self._convert_blocks_to_astroplan()

start, end = await self._get_time_range()

blocks = self._filter_blocks(converted_blocks, end)

# no blocks found?
if len(blocks) == 0:
await self._schedule.set_schedule([], start)
raise ValueError("No blocks left for scheduling.")

# return all
return blocks, start, end

async def _convert_blocks_to_astroplan(self) -> List[astroplan.ObservingBlock]:
copied_blocks = [copy.copy(block) for block in self._blocks]

for block in copied_blocks:
self._invert_block_priority(block)
self._tighten_block_time_constraints(block)

return copied_blocks

@staticmethod
def _invert_block_priority(block: astroplan.ObservingBlock) -> None:
"""
astroplan's PriorityScheduler expects lower priorities to be more important, so calculate
1000 - priority
"""
block.priority = max(1000.0 - block.priority, 0.0)

@staticmethod
def _tighten_block_time_constraints(block: astroplan.ObservingBlock) -> None:
"""
astroplan's PriorityScheduler doesn't match the requested observing windows exactly,
so we make them a little smaller.
"""
time_constraints = filter(lambda c: isinstance(c, astroplan.TimeConstraint), block.constraints)
for constraint in time_constraints:
constraint.min += 30 * u.second
constraint.max -= 30 * u.second

async def _get_time_range(self) -> Tuple[astropy.time.Time, astropy.time.Time]:
# get start time for scheduler
start = self._schedule_start
now_plus_safety = Time.now() + self._safety_time * u.second
if start is None or start < now_plus_safety:
# if no ETA exists or is in the past, use safety time
start = now_plus_safety

if (running_task := await self._get_current_task()) is not None:
log.info("Found running block that ends at %s.", running_task.end)

# get block end plus some safety
block_end = running_task.end + 10.0 * u.second
if start < block_end:
start = block_end
log.info("Start time would be within currently running block, shifting to %s.", cast(Time, start).isot)

# calculate end time
end = start + TimeDelta(self._schedule_range * u.hour)

return start, end

async def _get_current_task(self) -> Optional[Task]:
if self._current_task_id is None:
log.info("No running block found.")
return None

log.info("Trying to find running block in current schedule...")
tasks = await self._schedule.get_schedule()
if self._current_task_id in tasks:
return tasks[self._current_task_id]
else:
log.info("Running block not found in last schedule.")
return None

def _filter_blocks(self, blocks: List[astroplan.ObservingBlock], end: astropy.time.Time) -> List[astroplan.ObservingBlock]:
blocks_without_current = filter(lambda x: x.configuration["request"]["id"] != self._current_task_id, blocks)
blocks_in_schedule_range = filter(lambda b: self._is_block_starting_in_schedule(b, end), blocks_without_current)

return list(blocks_in_schedule_range)

@staticmethod
def _is_block_starting_in_schedule(block: astroplan.ObservingBlock, end: astropy.time.Time) -> bool:
time_constraints = [c for c in block.constraints if isinstance(c, astroplan.TimeConstraint)]

# does constraint start before the end of the scheduling range?
before_end = [c for c in time_constraints if c.min < end]

return len(time_constraints) == 0 or len(before_end) > 0

async def _schedule_blocks(self, blocks: List[ObservingBlock], start: Time, end: Time) -> List[ObservingBlock]:

# run actual scheduler in separate process and wait for it
qout: mp.Queue[List[ObservingBlock]] = mp.Queue()
p = mp.Process(target=self._schedule_process, args=(blocks, start, end, qout))
p.start()

# wait for process to finish
# note that the process only finishes, when the queue is empty! so we have to poll the queue first
# and then the process.
loop = asyncio.get_running_loop()
scheduled_blocks: List[ObservingBlock] = await loop.run_in_executor(None, qout.get, True)
await loop.run_in_executor(None, p.join)
return scheduled_blocks

async def _finish_schedule(self, scheduled_blocks: List[ObservingBlock], start: Time) -> None:
# update
await self._schedule.set_schedule(scheduled_blocks, start)
if len(scheduled_blocks) > 0:
log.info("Finished calculating schedule for %d block(s):", len(scheduled_blocks))
for i, block in enumerate(scheduled_blocks, 1):
log.info(
" #%d: %s to %s (%.1f)",
block.configuration["request"]["id"],
block.start_time.strftime("%H:%M:%S"),
block.end_time.strftime("%H:%M:%S"),
block.priority,
)
else:
log.info("Finished calculating schedule for 0 blocks.")

def _schedule_process(
self,
blocks: List[ObservingBlock],
start: Time,
end: Time,
scheduled_blocks: mp.Queue # type: ignore
) -> None:
"""Actually do the scheduling, usually run in a separate process."""

# log it
log.info("Calculating schedule for %d schedulable block(s) starting at %s...", len(blocks), start)

# run scheduler
time_range = astroplan.Schedule(start, end)
schedule = self._scheduler(blocks, time_range)

# put scheduled blocks in queue
scheduled_blocks.put(schedule.scheduled_blocks)
Loading