diff --git a/astroplan/constraints.py b/astroplan/constraints.py index 17b74f84..34604174 100644 --- a/astroplan/constraints.py +++ b/astroplan/constraints.py @@ -15,7 +15,7 @@ import numpy as np from astropy import table from astropy.time import Time -from astropy.coordinates import get_body, get_sun, Galactic, SkyCoord +from astropy.coordinates import get_body, get_sun, Galactic from numpy.lib.stride_tricks import as_strided # Package @@ -223,48 +223,45 @@ def __call__(self, observer, targets, times=None, time_range=None, time_grid_resolution=0.5*u.hour, grid_times_targets=False): """ - Compute the constraint for this class + Compute the constraint for this class. Parameters ---------- observer : `~astroplan.Observer` - the observation location from which to apply the constraints - targets : sequence of `~astroplan.Target` + The observation location from which to apply the constraints. + + targets : sequence of `~astroplan.Target` or `~astropy.coordinates.SkyCoord` The targets on which to apply the constraints. - times : `~astropy.time.Time` + times : `~astropy.time.Time` (optional) The times to compute the constraint. - WHAT HAPPENS WHEN BOTH TIMES AND TIME_RANGE ARE SET? - time_range : `~astropy.time.Time` (length = 2) + time_range : `~astropy.time.Time` (length = 2) (optional) Lower and upper bounds on time sequence. + Only used when ``times`` is not provided. time_grid_resolution : `~astropy.units.Quantity` - Time-grid spacing + Time-grid spacing. grid_times_targets : bool - if True, grids the constraint result with targets along the first + If True, grids the constraint result with targets along the first index and times along the second. Otherwise, we rely on broadcasting the shapes together using standard numpy rules. + Returns ------- constraint_result : 1D or 2D array of float or bool - The constraints. If 2D with targets along the first index and times along + The constraint values. If 2D with targets along the first index and times along the second. """ - if times is None and time_range is not None: times = time_grid_from_range(time_range, time_resolution=time_grid_resolution) - if grid_times_targets: - targets = get_skycoord(targets) - # TODO: these broadcasting operations are relatively slow - # but there is potential for huge speedup if the end user - # disables gridding and re-shapes the coords themselves - # prior to evaluating multiple constraints. - if targets.isscalar: - # ensure we have a (1, 1) shape coord - targets = SkyCoord(np.tile(targets, 1))[:, np.newaxis] - else: - targets = targets[..., np.newaxis] - times, targets = observer._preprocess_inputs(times, targets, grid_times_targets=False) + # TODO: broadcasting operations in _preprocess_inputs are relatively slow + # but there is potential for huge speedup if the end user + # disables gridding and re-shapes the coords themselves + # prior to evaluating multiple constraints. + times, targets = observer._preprocess_inputs( + times, targets, grid_times_targets=grid_times_targets + ) + result = self.compute_constraint(times, observer, targets) # make sure the output has the same shape as would result from diff --git a/astroplan/exceptions.py b/astroplan/exceptions.py index 5221b367..18642cce 100644 --- a/astroplan/exceptions.py +++ b/astroplan/exceptions.py @@ -40,3 +40,8 @@ class PlotBelowHorizonWarning(PlotWarning): class MissingConstraintWarning(AstroplanWarning): """Triggered when a constraint is expected but not supplied""" pass + + +class InvalidTLEDataWarning(AstroplanWarning): + """TLE data invalid for the requested time""" + pass diff --git a/astroplan/observer.py b/astroplan/observer.py index a86b4793..30ba6de5 100644 --- a/astroplan/observer.py +++ b/astroplan/observer.py @@ -507,22 +507,44 @@ def _preprocess_inputs(self, time, target=None, grid_times_targets=False): the shapes together using standard numpy rules. Useful for grid searches for rise/set times etc. """ - # make sure we have a non-scalar time if not isinstance(time, Time): time = Time(time) + # In grid mode, scalar time should still behave like a length-1 time axis + if grid_times_targets and time.isscalar: + time = time[None] # shape (1,) + if target is None: return time, None + # Remember whether target is a single time-dependent target + is_multiple_targets = isinstance(target, (list, tuple)) + is_target_time_dependent = ( + callable(getattr(target, "get_skycoord", None)) and not + hasattr(target, "coord") + ) + is_single_time_dependent_target = (not is_multiple_targets) and is_target_time_dependent + # convert any kind of target argument to non-scalar SkyCoord - target = get_skycoord(target) + target = get_skycoord(target, times=time) + if grid_times_targets: + # Only ambiguous case: a single time-dependent target produces shape == time.shape + # but grid mode requires a leading target axis (1, ...). + if ( + is_single_time_dependent_target + and (not target.isscalar) + and (target.shape == time.shape) + ): + target = target[np.newaxis, ...] + + # Ensure at least one targets axis for scalar targets if target.isscalar: - # ensure we have a (1, 1) shape coord - target = SkyCoord(np.tile(target, 1))[:, np.newaxis] - else: - while target.ndim <= time.ndim: - target = target[:, np.newaxis] + target = SkyCoord(np.tile(target, 1)) # shape (1,) + + # Make target have one more dim than time (first targets axis, then time axes) + while target.ndim < 1 + time.ndim: + target = target[..., np.newaxis] elif not self._is_broadcastable(target.shape, time.shape): raise ValueError('Time and Target arguments cannot be broadcast ' diff --git a/astroplan/scheduling.py b/astroplan/scheduling.py index 8fda4a2b..d9b52796 100644 --- a/astroplan/scheduling.py +++ b/astroplan/scheduling.py @@ -11,6 +11,7 @@ from astropy import units as u from astropy.time import Time from astropy.table import Table +from astropy.coordinates import ConvertError from .utils import time_grid_from_range, stride_array from .constraints import AltitudeConstraint @@ -118,7 +119,7 @@ def __init__(self, blocks, observer, schedule, global_constraints=[]): self.observer = observer self.schedule = schedule self.global_constraints = global_constraints - self.targets = get_skycoord([block.target for block in self.blocks]) + self.targets = [block.target for block in self.blocks] def create_score_array(self, time_resolution=1*u.minute): """ @@ -147,8 +148,9 @@ def create_score_array(self, time_resolution=1*u.minute): applied_score = constraint(self.observer, block.target, times=times) score_array[i] *= applied_score + targets = get_skycoord(self.targets, times=times) for constraint in self.global_constraints: - score_array *= constraint(self.observer, self.targets, times, + score_array *= constraint(self.observer, targets, times, grid_times_targets=True) return score_array @@ -265,45 +267,108 @@ def open_slots(self): return [slot for slot in self.slots if not slot.occupied] def to_table(self, show_transitions=True, show_unused=False): - # TODO: allow different coordinate types + def _format_target_info(target): + if hasattr(target, "coord"): + try: + return target.coord.icrs.to_string("hmsdms") + except ConvertError: + return repr(target.coord) + if hasattr(target, "alt") and hasattr(target, "az"): + parts = [ + "alt={:.6f} deg".format(u.Quantity(target.alt).to_value(u.deg)), + "az={:.6f} deg".format(u.Quantity(target.az).to_value(u.deg)), + ] + if hasattr(target, "pressure"): + try: + p_hpa = u.Quantity(target.pressure).to_value(u.hPa) + except (TypeError, ValueError, u.UnitConversionError): + pass + else: + if p_hpa != 0.0: + parts.append("pressure={:.3f} hPa".format(p_hpa)) + return ", ".join(parts) + if hasattr(target, "satellite"): + return ( + f"#{target.satellite.model.satnum} " + f"epoch {target.satellite.epoch.utc_strftime(format='%Y-%m-%d %H:%M:%S')}" + ) + return "" + target_names = [] start_times = [] end_times = [] durations = [] + target_types = [] + target_info = [] ra = [] dec = [] config = [] + for slot in self.slots: - if hasattr(slot.block, 'target'): + if hasattr(slot.block, "target"): start_times.append(slot.start.iso) end_times.append(slot.end.iso) durations.append(slot.duration.to(u.minute).value) target_names.append(slot.block.target.name) - ra.append(u.Quantity(slot.block.target.ra)) - dec.append(u.Quantity(slot.block.target.dec)) + target_types.append(slot.block.target.__class__.__name__) + try: + ra.append(u.Quantity(slot.block.target.ra)) + except (AttributeError, NotImplementedError): + ra.append("") + try: + dec.append(u.Quantity(slot.block.target.dec)) + except (AttributeError, NotImplementedError): + dec.append("") + target_info.append(_format_target_info(slot.block.target)) config.append(slot.block.configuration) elif show_transitions and slot.block: start_times.append(slot.start.iso) end_times.append(slot.end.iso) durations.append(slot.duration.to(u.minute).value) - target_names.append('TransitionBlock') - ra.append('') - dec.append('') + target_names.append("TransitionBlock") + target_types.append("TransitionBlock") + ra.append("") + dec.append("") + target_info.append("") changes = list(slot.block.components.keys()) - if 'slew_time' in changes: - changes.remove('slew_time') + if "slew_time" in changes: + changes.remove("slew_time") config.append(changes) elif slot.block is None and show_unused: start_times.append(slot.start.iso) end_times.append(slot.end.iso) durations.append(slot.duration.to(u.minute).value) - target_names.append('Unused Time') - ra.append('') - dec.append('') - config.append('') - return Table([target_names, start_times, end_times, durations, ra, dec, config], - names=('target', 'start time (UTC)', 'end time (UTC)', - 'duration (minutes)', 'ra', 'dec', 'configuration')) + target_names.append("Unused Time") + target_types.append("") + ra.append("") + dec.append("") + target_info.append("") + config.append("") + + return Table( + [ + target_names, + start_times, + end_times, + durations, + ra, + dec, + target_types, + target_info, + config, + ], + names=( + "target", + "start time (UTC)", + "end time (UTC)", + "duration (minutes)", + "ra", + "dec", + "target type", + "target info", + "configuration", + ), + ) def new_slots(self, slot_index, start_time, end_time): """ @@ -996,9 +1061,8 @@ def __call__(self, oldblock, newblock, start_time, observer): # use the constraints cache for now, but should move that machinery # to observer from .constraints import _get_altaz - from .target import get_skycoord if oldblock.target != newblock.target: - targets = get_skycoord([oldblock.target, newblock.target]) + targets = get_skycoord([oldblock.target, newblock.target], times=start_time) aaz = _get_altaz(start_time, observer, targets)['altaz'] sep = aaz[0].separation(aaz[1]) if sep/self.slew_rate > 1 * u.second: diff --git a/astroplan/target.py b/astroplan/target.py index e82c7e5e..ba402510 100644 --- a/astroplan/target.py +++ b/astroplan/target.py @@ -2,12 +2,27 @@ # Standard library from abc import ABCMeta +import warnings # Third-party +import numpy as np import astropy.units as u -from astropy.coordinates import SkyCoord, ICRS, UnitSphericalRepresentation +from astropy.time import Time +from astropy.coordinates import SkyCoord, ICRS, UnitSphericalRepresentation, AltAz +try: + from sgp4.io import twoline2rv + from sgp4.earth_gravity import wgs84 as sgp4_wgs84 + from skyfield.api import load, wgs84 + from skyfield.sgp4lib import EarthSatellite + skyfield_available = True +except ImportError: + skyfield_available = False -__all__ = ["Target", "FixedTarget", "NonFixedTarget"] +# Package +from .exceptions import InvalidTLEDataWarning + + +__all__ = ["Target", "FixedTarget", "TLETarget", "NonFixedTarget"] # Docstring code examples include printed SkyCoords, but the format changed # in astropy 1.3. Thus the doctest needs astropy >=1.3 and this is the @@ -45,6 +60,19 @@ def __init__(self, name=None, ra=None, dec=None, marker=None): """ raise NotImplementedError() + @property + def is_time_dependent(self): + """ + Whether this target requires evaluation at a specific time. + + Returns + ------- + is_time_dependent : bool + `True` for targets whose coordinates depend on ``obstime``, + otherwise `False`. + """ + return False + @property def ra(self): """ @@ -188,35 +216,255 @@ class NonFixedTarget(Target): """ -def get_skycoord(targets): +class TLETarget(Target): + """ + A target defined by TLE (Two-Line Element set) for satellites. + """ + + @property + def is_time_dependent(self): + """ + Whether this target requires evaluation at a specific time. + """ + return True + + def __init__(self, line1, line2, name=None, observer=None, skip_tle_check=False): + """ + Parameters + ---------- + line1 : str + The first line of the TLE set + + line2 : str + The second line of the TLE set + + name : str (optional) + Name of the target, used for plotting and representing the target + as a string + + observer : `~astroplan.Observer` (optional) + The location of observer. + If `None`, the observer is assumed to be at sea level at the equator. + + skip_tle_check : bool (optional) + Whether to skip TLE validation + """ + if not skyfield_available: + raise ImportError("Please install the skyfield package to use the TLETarget class.") + + if not skip_tle_check: + twoline2rv(line1, line2, sgp4_wgs84) # Raises ValueError if TLE is invalid + + self.name = name + self.satellite = EarthSatellite(line1, line2, name, load.timescale()) + + if observer is None: + # Prevent circular import and usually not used + from .observer import Observer + self.observer = Observer(latitude=0*u.deg, longitude=0*u.deg, elevation=0*u.m) + else: + self.observer = observer + + longitude, latitude, height = self.observer.location.to_geodetic() + self.geographic_position = wgs84.latlon(latitude.to(u.deg).value, + longitude.to(u.deg).value, + height.to(u.m).value) + + @classmethod + def from_string(cls, tle_string, name=None, *args, **kwargs): + """ + Creates a TLETarget instance from a complete TLE string. + + Parameters + ---------- + tle_string : str + String to be parsed, expected to contain 2 or 3 newline-separated lines. + + name : str (optional) + Name of the target. If not provided and the tle_string contains 3 lines, + the first line will be used as the name. + + args, kwargs : tuple, dict (optional) + Additional arguments and keyword arguments to be passed to the TLETarget constructor. + """ + lines = tle_string.strip().splitlines() + + if len(lines) not in (2, 3): + raise ValueError(f"Expected TLE string to contain 2 or 3 lines, got {len(lines)}") + + if len(lines) == 3: + line1, line2, name = lines[1], lines[2], name or lines[0] + else: # len(lines) == 2 + line1, line2 = lines + return cls(line1, line2, name, *args, **kwargs) + + def _compute_topocentric(self, times=None): + """ + Compute the topocentric coordinates (relative to observer) at a particular time. + + Parameters + ---------- + times : `~astropy.time.Time` (optional) + The time(s) to use in the calculation. + + Returns + ------- + topocentric : `skyfield.positionlib.ICRF` + The topocentric object representing the relative coordinates of the target. + """ + if times is None: + times = Time.now() + ts = load.timescale() + t = ts.from_astropy(times) + + topocentric = (self.satellite - self.geographic_position).at(t) + + # Check for invalid TLE data. A non-None usually message means the computation went beyond + # the physically sensible point. Details: + # https://rhodesmill.org/skyfield/earth-satellites.html#detecting-propagation-errors + message = topocentric.message + if ( + (message is not None and not isinstance(message, list)) or + (isinstance(message, list) and not all(x is None for x in message)) + ): + warnings.warn(f"Invalid TLE Data: {message}", InvalidTLEDataWarning) + + return topocentric + + def get_skycoord(self, times=None): + """ + Get the coordinates of the target at a particular time. + + Parameters + ---------- + times : `~astropy.time.Time` (optional) + The time(s) to use in the calculation. + + Returns + ------- + coord : `~astropy.coordinates.SkyCoord` + A single SkyCoord object, which may be non-scalar, representing the target's + RA/Dec coordinates at the specified time(s). Might return np.nan and output a + warning for times where the elements stop making physical sense. + """ + topocentric = self._compute_topocentric(times) + ra, dec, distance = topocentric.radec() + # No distance, in SkyCoord, distance is from frame origin, but here, it's from observer. + return SkyCoord( + ra.hours*u.hourangle, + dec.degrees*u.deg, + obstime=times, + frame='icrs', + location=self.observer.location, + ) + + def altaz(self, times=None): + """ + Get the altitude and azimuth of the target at a particular time. + + Parameters + ---------- + times : `~astropy.time.Time` (optional) + The time(s) to use in the calculation. + + Returns + ------- + altaz_coord : `~astropy.coordinates.SkyCoord` + SkyCoord object representing the target's altitude and azimuth at the specified time(s) + """ + topocentric = self._compute_topocentric(times) + + temperature_C = None + pressure_mbar = None + if self.observer.temperature is not None: + temperature_C = self.observer.temperature.to_value(u.deg_C) + if self.observer.pressure is not None: + pressure_mbar = self.observer.pressure.to_value(u.mbar) + + alt, az, distance = topocentric.altaz( + temperature_C=temperature_C, + pressure_mbar=pressure_mbar, + ) + + # 'relative_humidity' and 'obswl' were not used in coordinate calculation + altaz_frame = AltAz( + location=self.observer.location, + obstime=times, + pressure=self.observer.pressure, + temperature=self.observer.temperature, + ) + + return SkyCoord(alt=alt.degrees*u.deg, az=az.degrees*u.deg, frame=altaz_frame) + + def __repr__(self): + return f'<{self.__class__.__name__} "{self.name}">' + + def __str__(self): + return self.name + + +def get_skycoord(targets, times=None): """ Return an `~astropy.coordinates.SkyCoord` object. When performing calculations it is usually most efficient to have a single `~astropy.coordinates.SkyCoord` object, rather than a - list of `FixedTarget` or `~astropy.coordinates.SkyCoord` objects. + list of `Target` or `~astropy.coordinates.SkyCoord` objects. - This is a convenience routine to do that. + This is a convenience routine to do that, and it also supports targets + that require evaluation at specific times (e.g., AltAz-defined targets). Parameters - ----------- - targets : list, `~astropy.coordinates.SkyCoord`, `Fixedtarget` - either a single target or a list of targets + ---------- + targets : list, `~astropy.coordinates.SkyCoord`, `~astroplan.Target` + Either a single target or a list of targets. + + times : `~astropy.time.Time` or time-like (optional) + Times at which to evaluate time-dependent targets. Required if any + target in ``targets`` needs evaluation at a time. Returns - -------- + ------- coord : `~astropy.coordinates.SkyCoord` - a single SkyCoord object, which may be non-scalar + A single SkyCoord object, which may be non-scalar. If ``times`` + is provided and any target is time-dependent, coordinates are broadcast + or evaluated across time along subsequent axes. """ - if not isinstance(targets, list): - return getattr(targets, 'coord', targets) - - # get the SkyCoord object itself - coords = [getattr(target, 'coord', target) for target in targets] - - # are all SkyCoordinate's in equivalent frames? If not, convert to ICRS + if times is not None and not isinstance(times, Time): + times = Time(times) + + def _is_time_dependent(obj): + return isinstance(obj, Target) and obj.is_time_dependent + + def _as_coord(obj): + if hasattr(obj, "coord"): + return obj.coord + if callable(getattr(obj, "get_skycoord", None)): + return obj.get_skycoord(times) + return obj + + # Ignore non-scalar SkyCoords targets here + # e.g. from get_body/get_sun, because they represent a single target + is_multiple_targets = isinstance(targets, (list, tuple)) + if not is_multiple_targets: + return _as_coord(targets) + + coords = [_as_coord(t) for t in targets] + + # If any target is time dependent, broadcast fixed coords to match times.shape + time_dependent = (times is not None) and any(_is_time_dependent(t) for t in targets) + times_shape = times.shape if time_dependent else None + + def _broadcast_quantity(q, shape): + """Broadcast quantity to target shape if needed.""" + if shape is None or q.shape == shape: + return q + return u.Quantity(np.broadcast_to(q.to_value(q.unit), shape), q.unit) + + # Are all SkyCoord's in equivalent frames? If not, convert to ICRS convert_to_icrs = not all( - [coord.frame.is_equivalent_frame(coords[0].frame) for coord in coords[1:]]) + [coord.frame.is_equivalent_frame(coords[0].frame) for coord in coords[1:]] + ) # we also need to be careful about handling mixtures of # UnitSphericalRepresentations and others @@ -231,10 +479,18 @@ def get_skycoord(targets): # mixture of frames for coordinate in coords: icrs_coordinate = coordinate.icrs - longitudes.append(icrs_coordinate.ra) - latitudes.append(icrs_coordinate.dec) + lon = icrs_coordinate.ra + lat = icrs_coordinate.dec + if times_shape is not None: + lon = _broadcast_quantity(lon, times_shape) + lat = _broadcast_quantity(lat, times_shape) + longitudes.append(lon) + latitudes.append(lat) if get_distances: - distances.append(icrs_coordinate.distance) + dist = icrs_coordinate.distance + if times_shape is not None: + dist = _broadcast_quantity(dist, times_shape) + distances.append(dist) frame = ICRS() else: # all the same frame, get the longitude and latitude names @@ -249,27 +505,53 @@ def get_skycoord(targets): frame = coords[0].frame for coordinate in coords: - longitudes.append(getattr(coordinate, lon_name)) - latitudes.append(getattr(coordinate, lat_name)) + lon = getattr(coordinate, lon_name) + lat = getattr(coordinate, lat_name) + if times_shape is not None: + lon = _broadcast_quantity(lon, times_shape) + lat = _broadcast_quantity(lat, times_shape) + longitudes.append(lon) + latitudes.append(lat) if get_distances: - distances.append(coordinate.distance) + dist = coordinate.distance + if times_shape is not None: + dist = _broadcast_quantity(dist, times_shape) + distances.append(dist) + + # Convert all longitude/latitude quantities to a common unit + # and plain ndarrays before stacking (robust across units/Quantity subclasses). + lon_unit = longitudes[0].unit + lat_unit = latitudes[0].unit + lon_vals = np.stack([lon.to_value(lon_unit) for lon in longitudes], axis=0) + lat_vals = np.stack([lat.to_value(lat_unit) for lat in latitudes], axis=0) + lon_q = u.Quantity(lon_vals, unit=lon_unit) + lat_q = u.Quantity(lat_vals, unit=lat_unit) # now let's deal with the fact that we may have a mixture of coords with distances and # coords with UnitSphericalRepresentations if all(targets_is_unitsphericalrep): - return SkyCoord(longitudes, latitudes, frame=frame) - elif not any(targets_is_unitsphericalrep): - return SkyCoord(longitudes, latitudes, distances, frame=frame) - else: - """ - We have a mixture of coords with distances and without. - Since we don't know in advance the origin of the frame where further transformation - will take place, it's not safe to drop the distances from those coords with them set. + return SkyCoord(lon_q, lat_q, frame=frame) + + if not any(targets_is_unitsphericalrep): + dist_unit = distances[0].unit + dist_vals = np.stack([d.to_value(dist_unit) for d in distances], axis=0) + dist_q = u.Quantity(dist_vals, unit=dist_unit) + return SkyCoord(lon_q, lat_q, dist_q, frame=frame) + + # Mixture of coords with distances and without. + # Assign large distances to UnitSphericalRepresentation objects. + filled_distances = [] + for dist, is_unitspherical in zip(distances, targets_is_unitsphericalrep): + if is_unitspherical: + fill_vals = np.broadcast_to(100.0, dist.shape if dist.shape else ()) + filled_distances.append(u.Quantity(fill_vals, u.kpc)) + else: + filled_distances.append(dist) - Instead, let's assign large distances to those objects with none. - """ - distances = [distance if distance != 1 else 100*u.kpc for distance in distances] - return SkyCoord(longitudes, latitudes, distances, frame=frame) + dist_unit = filled_distances[0].unit + dist_vals = np.stack([d.to_value(dist_unit) for d in filled_distances], axis=0) + dist_q = u.Quantity(dist_vals, unit=dist_unit) + return SkyCoord(lon_q, lat_q, dist_q, frame=frame) class SpecialObjectFlag: diff --git a/astroplan/tests/test_scheduling.py b/astroplan/tests/test_scheduling.py index 5bfcd874..8a65a0e1 100644 --- a/astroplan/tests/test_scheduling.py +++ b/astroplan/tests/test_scheduling.py @@ -5,10 +5,15 @@ import pytest from astropy.coordinates import SkyCoord, EarthLocation from astropy.time import Time +try: + import skyfield # noqa + HAS_SKYFIELD = True +except ImportError: + HAS_SKYFIELD = False from astroplan.utils import time_grid_from_range from astroplan.observer import Observer -from astroplan.target import FixedTarget, get_skycoord +from astroplan.target import FixedTarget, TLETarget, get_skycoord from astroplan.constraints import (AirmassConstraint, AtNightConstraint, _get_altaz, MoonIlluminationConstraint, PhaseConstraint) from astroplan.periodic import EclipsingSystem @@ -329,3 +334,46 @@ def test_scorer(): scores = scorer.create_score_array(time_resolution=20 * u.minute) # the ``global_constraint``: constraint2 should have applied to the blocks assert np.array_equal(c2, scores) + + +@pytest.mark.skipif('not HAS_SKYFIELD') +def test_priority_scheduler_TLETarget(): + line1 = "1 25544U 98067A 23215.27256123 .00041610 00000-0 73103-3 0 9990" + line2 = "2 25544 51.6403 95.2411 0000623 157.9606 345.0624 15.50085581409092" + iss = TLETarget(name="ISS (ZARYA)", line1=line1, line2=line2, observer=apo) + constraints = [AirmassConstraint(3, boolean_constraint=False)] + blocks = [ObservingBlock(t, 5*u.minute, i) for i, t in enumerate(targets)] + blocks.append(ObservingBlock(iss, 0.5*u.minute, 4)) + start_time = Time('2016-02-06 03:00:00') + end_time = start_time + 1*u.hour + scheduler = PriorityScheduler(transitioner=default_transitioner, + constraints=constraints, observer=apo, + time_resolution=0.5*u.minute) + schedule = Schedule(start_time, end_time) + scheduler(blocks, schedule) + assert len(schedule.observing_blocks) == 3 + assert all(np.abs(block.end_time - block.start_time - block.duration) < + 1*u.second for block in schedule.scheduled_blocks) + assert all([schedule.observing_blocks[0].target == polaris, + schedule.observing_blocks[1].target == rigel, + schedule.observing_blocks[2].target == iss]) + + # test that the scheduler does not error when called with a partially + # filled schedule + scheduler(blocks, schedule) + scheduler(blocks, schedule) + + # Time too far in the future where elements stop making physical sense + with pytest.warns(): + start_time = Time('2035-08-02 10:00:00') + end_time = start_time + 1*u.hour + schedule = Schedule(start_time, end_time) + + # InvalidTLEDataWarning/AstropyWarning and + # ErfaWarning: ERFA function "utctai" yielded 121 of "dubious year (Note 3)" + with pytest.warns(): + scheduler(blocks, schedule) + assert len(schedule.observing_blocks) == 3 + assert all([schedule.observing_blocks[0].target == vega, + schedule.observing_blocks[1].target == rigel, + schedule.observing_blocks[2].target == polaris]) diff --git a/astroplan/tests/test_target.py b/astroplan/tests/test_target.py index ba2dde47..2a9adbc4 100644 --- a/astroplan/tests/test_target.py +++ b/astroplan/tests/test_target.py @@ -5,10 +5,17 @@ import pytest from astropy.coordinates import SkyCoord, GCRS, ICRS from astropy.time import Time +import numpy as np +try: + import skyfield # noqa + HAS_SKYFIELD = True +except ImportError: + HAS_SKYFIELD = False # Package -from astroplan.target import FixedTarget, get_skycoord +from astroplan.target import FixedTarget, TLETarget, get_skycoord from astroplan.observer import Observer +from astroplan.utils import time_grid_from_range @pytest.mark.remote_data @@ -45,6 +52,139 @@ def test_FixedTarget_ra_dec(): 'SkyCoord') +@pytest.mark.remote_data +@pytest.mark.skipif('not HAS_SKYFIELD') +def test_TLETarget(): + tle_string = ("ISS (ZARYA)\n" + "1 25544U 98067A 23215.27256123 .00041610 00000-0 73103-3 0 9990\n" + "2 25544 51.6403 95.2411 0000623 157.9606 345.0624 15.50085581409092") + line1 = "1 25544U 98067A 23215.27256123 .00041610 00000-0 73103-3 0 9990" + line2 = "2 25544 51.6403 95.2411 0000623 157.9606 345.0624 15.50085581409092" + subaru = Observer.at_site('subaru') # (lon, lat, el)=(-155.476111 deg, 19.825555 deg, 4139.0 m) + subaru_temp_pressure = Observer.at_site('subaru', temperature=-10*u.Celsius, pressure=0.9*u.bar) + time = Time("2023-08-02 10:00", scale='utc') + times = time_grid_from_range([time, time + 3.1*u.hour], + time_resolution=1 * u.hour) + + tle_target1 = TLETarget(name="ISS (ZARYA)", line1=line1, line2=line2, observer=subaru) + tle_target2 = TLETarget.from_string(tle_string=tle_string, observer=subaru) + tle_target_no_observer = TLETarget(name="ISS (ZARYA)", line1=line1, line2=line2, observer=None) + tle_target_temp_pressure = TLETarget( + name="ISS (ZARYA)", line1=line1, line2=line2, observer=subaru_temp_pressure + ) + + assert tle_target1.name == "ISS (ZARYA)" + assert tle_target2.name == "ISS (ZARYA)" + assert repr(tle_target1) == repr(tle_target2) + assert str(tle_target1) == str(tle_target2) + + assert isinstance(tle_target_no_observer.observer, Observer) + assert abs(tle_target_no_observer.observer.location.lat) < 0.001*u.deg + tle_target_no_observer.get_skycoord(time) # Just needs to work + + # Single time (Below Horizon) + ra_dec1 = tle_target1.get_skycoord(time) # '08h29m26.00003243s +07d31m36.65950907s' + ra_dec2 = tle_target2.get_skycoord(time) + assert ra_dec1.to_string('hmsdms') == ra_dec2.to_string('hmsdms') + + # Comparison with the JPL Horizons System + ra_dec_horizon_icrf = SkyCoord("08h29m27.029117s +07d31m28.35610s") + # ICRF: Compensated for the down-leg light-time delay aberration + assert ra_dec1.separation(ra_dec_horizon_icrf) < 20*u.arcsec # 17.41″ + # Distance estimation: ~ 2 * tan(17,41/2/3600) * 11801,56 = 57 km + + ra_dec_horizon_ref_apparent = SkyCoord("08h30m54.567398s +08d05m32.72764s") + # Refracted Apparent: In an equatorial coordinate system with all compensations + assert ra_dec1.separation(ra_dec_horizon_ref_apparent) > 2000*u.arcsec # 2424.44″ + + ra_dec_horizon_icrf_ref_apparent = SkyCoord("08h29m37.373866s +08d10m14.78811s") + # ICRF Refracted Apparent: In the ICRF reference frame with all compensations + assert ra_dec1.separation(ra_dec_horizon_icrf_ref_apparent) > 2000*u.arcsec # 2324.28″ + + # Skyfield appears to use no compensations. According to this, it's not even recommended to + # compensate for light travel time. Compensating changes the difference to ra_dec_horizon_icrf + # to 20.05 arcsec. + # https://rhodesmill.org/skyfield/earth-satellites.html#avoid-calling-the-observe-method + + # Single time (Above Horizon) + time_ah = Time("2023-08-02 07:20", scale='utc') + ra_dec_ah = tle_target1.get_skycoord(time_ah) # '11h19m48.53631001s +44d49m45.22194611s' + + ra_dec_ah_horizon_icrf = SkyCoord("11h19m49.660349s +44d49m34.65875s") + assert ra_dec_ah.separation(ra_dec_ah_horizon_icrf) < 20*u.arcsec # 15.95″ + ra_dec_ah_horizon_ref_apparent = SkyCoord("11h21m34.102381s +44d43m40.06899s") + assert ra_dec_ah.separation(ra_dec_ah_horizon_ref_apparent) > 1000*u.arcsec # 1181.84″ + ra_dec_ah_horizon_icrf_ref_apparent = SkyCoord("11h20m16.627261s +44d51m24.25337s") + assert ra_dec_ah.separation(ra_dec_ah_horizon_icrf_ref_apparent) > 300*u.arcsec # 314.75″ + + # Default is WGS72 for Skyfield. Coordinates with WGS84 gravity model that Horizon uses: + # '11h19m48.28084569s +44d49m46.33649241s' - 18.75″ + # See 'Build a satellite with a specific gravity model' in Skyfield's Earth Satellites docu + + # There are many potential sources of inaccuracies, and it's not all super precise. + # Should the accuracy be better than < 25*u.arcsec when compared to the JPL Horizons System? + + # Multiple times + ra_dec1 = tle_target1.get_skycoord(times) + ra_dec2 = tle_target2.get_skycoord(times) + ra_dec_from_horizon = SkyCoord(["08h29m27.029117s +07d31m28.35610s", + "06h25m46.672661s -54d32m16.77533s", + "13h52m08.854291s +04d26m49.56432s", + "09h20m04.872215s -00d51m21.17432s"]) + # 17.41″, 22.05″, 3.20″, 17.55″ + assert all(list(ra_dec1.separation(ra_dec_from_horizon) < 25*u.arcsec)) + assert ra_dec1.to_string('hmsdms') == ra_dec2.to_string('hmsdms') + + # TLE Check + line1_invalid = "1 25544U 98067A .00041610 00000-0 73103-3 0 9990" + with pytest.raises(ValueError): + TLETarget(name="ISS (ZARYA)", line1=line1_invalid, line2=line2, observer=subaru) + TLETarget(name="ISS (ZARYA)", line1=line1_invalid, line2=line2, observer=subaru, + skip_tle_check=True) + + # from_string + tle_string = ("1 25544U 98067A 23215.27256123 .00041610 00000-0 73103-3 0 9990\n" + "2 25544 51.6403 95.2411 0000623 157.9606 345.0624 15.50085581409092") + tle_target3 = TLETarget.from_string(tle_string=tle_string, observer=subaru, name="ISS") + assert tle_target3.name == "ISS" + + tle_string = "1 25544U 98067A 23215.27256123 .00041610 00000-0 73103-3 0 9990" + with pytest.raises(ValueError): + TLETarget.from_string(tle_string=tle_string, observer=subaru) + + # AltAz (This is slow) + altaz_observer = subaru.altaz(time_ah, tle_target1) + altaz_skyfield = tle_target1.altaz(time_ah) + + assert altaz_observer.separation(altaz_skyfield) < 21*u.arcsec + + # AltAz with atmospheric refraction (temperature and pressure) + altaz_observer = subaru_temp_pressure.altaz(time_ah, tle_target_temp_pressure) + altaz_skyfield = tle_target_temp_pressure.altaz(time_ah) + + assert altaz_observer.separation(altaz_skyfield) < 26*u.arcsec + + # AltAz with multiple times + # subaru.altaz(times, tle_target1) + altaz_multiple = tle_target1.altaz(times) + assert len(altaz_multiple.obstime) == len(altaz_multiple) == len(times) + + # Time too far in the future where elements stop making physical sense + with pytest.warns(): # ErfaWarning: ERFA function "dtf2d" yielded 1 of "dubious year (Note 6) + time_invalid = Time("2035-08-02 10:00", scale='utc') + times_list = list(times) + times_list[2] = Time("2035-08-02 10:00", scale='utc') + times_invalid = Time(times_list) + # InvalidTLEDataWarning and + # ErfaWarning: ERFA function "utctai" yielded 1 of "dubious year (Note 3)" + with pytest.warns(): + assert np.isnan(tle_target1.get_skycoord(time_invalid).ra) + # InvalidTLEDataWarning and + # ErfaWarning: ERFA function "utctai" yielded 1 of "dubious year (Note 3)" + with pytest.warns(): + assert np.isnan(tle_target1.get_skycoord(times_invalid)[2].ra) + + @pytest.mark.remote_data def test_get_skycoord(): m31 = SkyCoord(10.6847083*u.deg, 41.26875*u.deg) @@ -80,3 +220,32 @@ def test_get_skycoord(): coo = get_skycoord([m31_gcrs, m31_gcrs_with_distance]) assert coo.is_equivalent_frame(m31_gcrs.frame) assert len(coo) == 2 + + +@pytest.mark.remote_data +@pytest.mark.skipif('not HAS_SKYFIELD') +def test_get_skycoord_with_TLETarget(): + skycoord_targed = SkyCoord(10.6847083*u.deg, 41.26875*u.deg) + fixed_target1 = FixedTarget(name="fixed1", coord=SkyCoord(279.23458, 38.78369, unit='deg')) + line1 = "1 25544U 98067A 23215.27256123 .00041610 00000-0 73103-3 0 9990" + line2 = "2 25544 51.6403 95.2411 0000623 157.9606 345.0624 15.50085581409092" + subaru = Observer.at_site('subaru') + tle_target1 = TLETarget(name="ISS (ZARYA)", line1=line1, line2=line2, observer=subaru) + + time = Time("2023-08-02 10:00", scale='utc') + times = time_grid_from_range([time, time + 3.1*u.hour], + time_resolution=1 * u.hour) + + tle_output = get_skycoord(tle_target1, time) + assert tle_output.size == 1 + tle_output = get_skycoord(tle_target1, times) + assert tle_output.shape == (4,) + tle_output = get_skycoord([tle_target1, tle_target1], time) + assert tle_output.shape == (2,) + tle_output = get_skycoord([tle_target1, tle_target1], times) + assert tle_output.shape == (2, 4) + + mixed_output = get_skycoord([skycoord_targed, fixed_target1, tle_target1], time) + assert mixed_output.shape == (3,) + mixed_output = get_skycoord([skycoord_targed, fixed_target1, tle_target1], times) + assert mixed_output.shape == (3, 4) diff --git a/pyproject.toml b/pyproject.toml index c8ad7c76..e4110c49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,8 @@ dependencies = [ [project.optional-dependencies] all = [ "matplotlib>=3.6.0", - "astroquery" + "astroquery", + "skyfield" ] test = [ "pytest-astropy", @@ -48,6 +49,9 @@ docs = [ plotting = [ "astroplan[all]" ] +tle = [ + "skyfield" +] [project.readme] file = "README.rst"