Source code for romancal.orientation.set_telescope_pointing

"""Set Telescope Pointing from Observatory Engineering Telemetry.

Calculate and update the pointing-related and world coordinate system-related
keywords. Given a time period, usually defined by an exposure, the engineering
mnemonic database is queried for observatory orientation. The orientation
defines the sky coordinates that a particular point on the observatory is
pointed to. Then, using a set of matrix transformations, the sky coordinates of
the reference pixel of a desired aperture is calculated.

The transformations are defined by the STScI Innerspace (non-public) document
titles "Quaternion Transforms for Coarse Pointing WCS". The code itself follows
a demonstrative jupyter notebook.

**Interface**

The primary usage is through the command line interface
``set_telescope_pointing``. Operating on a list of Roman Level 1 exposures,
this command updates the world coordinate system keywords with the values
necessary to translate from aperture pixel to sky coordinates.

Access to the Roman Engineering Mnemonic database is required. See the
:ref:`Engineering Database Interface <engdb>` for more information.

Programmatically, the command line is implemented by the function
`~roman.orientation.set_telescope_pointing.add_wcs`, which calls the basic function
`~roman.orientation.set_telescope_pointing.calc_wcs`.

There are two data structures used to maintain the state of the transformation.
`~roman.orientation.set_telescope_pointing.TransformParameters` contains the parameters
needed to perform the transformations.
`~roman.orientation.set_telescope_pointing.Transforms` contains the calculated
transformation matrices.

**Transformation Matrices**

All the transformation matrices, as defined by
`~roman.orientation.set_telescope_pointing.Transforms`, are Direction Cosine Matrices
(DCM). A DCM contains the Euler rotation angles that represent the sky
coordinates for a particular frame-of-reference. The initial DCM is provided
through the engineering telemetry and represents how the observatory is orientated.

**META Affected**

The following meta values are populated:

    - meta.pointing.dec_v1
    - meta.pointing.pa_aperture
    - meta.pointing.pa_v3
    - meta.pointing.ra_v1
    - meta.pointing.target_dec
    - meta.pointing.target_ra
    - meta.wcsinfo.dec_ref
    - meta.wcsinfo.ra_ref
    - meta.wcsinfo.roll_ref
    - meta.wcsinfo.s_region

"""

import dataclasses
import logging
import sys
from collections import defaultdict, namedtuple
from collections.abc import Callable
from copy import copy
from math import cos, sin
from typing import Any

import asdf
import numpy as np
import roman_datamodels as rdm
from astropy.table import Table
from astropy.time import Time, TimeDelta
from stcal.alignment.util import compute_s_region_keyword
from stcal.velocity_aberration import compute_va_effects_vector

from ..lib.engdb.engdb_lib import EngDB_Value
from ..lib.engdb.engdb_tools import engdb_service

__all__ = [
    "TransformParameters",
    "Transforms",
    "WCSRef",
    "add_wcs",
    "calc_transforms",
    "calc_wcs",
    "calc_wcs_over_time",
    "update_wcs",
]

# Setup logging
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
DEBUG_FULL = logging.DEBUG - 1
LOGLEVELS = [logging.INFO, logging.DEBUG, DEBUG_FULL]

# Datamodels that can be updated, normally
EXPECTED_MODELS = rdm.datamodels.ScienceRawModel

# Exposure types that can be updated, normally
TYPES_TO_UPDATE = set()

# Mnemonics needed.
COARSE_MNEMONICS_QUATERNION_ECI = [f"SCF_AC_SDR_QBJ_{idx + 1}" for idx in range(4)]
COARSE_MNEMONICS_B2FGS_EST = [f"SCF_AC_EST_FGS_qbr{idx + 1}" for idx in range(4)]
COARSE_MNEMONICS_B2FGS_PRELOAD = [f"SCF_AC_FGS_TBL_Qb{idx + 1}" for idx in range(4)]
COARSE_MNEMONICS = (
    COARSE_MNEMONICS_QUATERNION_ECI
    + COARSE_MNEMONICS_B2FGS_EST
    + COARSE_MNEMONICS_B2FGS_PRELOAD
)

# Default and pre-defined matricies.
# Conversion of the FCS reference point from the V-Frame.
# This is the pre-launch value, later to be refined and provided
# in the SIAF
M_V2FCS0 = np.array(
    [
        [-0.0000001, 0.5000141, 0.8660173],
        [0.0086567, -0.8659848, 0.4999953],
        [0.9999625, 0.0074969, -0.0043284],
    ],
)

# Default B-frame to FCS frame, M_b_to_fcs
# Pre-launch this is the same as M_v_to_fcs.
M_B2FCS0 = M_V2FCS0

# Maximum absolute speed of the observatory. Used for sanity check is defined
# as the sum of the absolute components of the velocity.
MAX_OBSERVATORY_SPEED = 150

# Define the transformation matrices to move between the Idealized Coordinate System (ICS)
# and the Idealized Coordinate System (Idl). ICS is the spacecraft-centric system used by
# all frames up through the V-frame. Idl is used by the instruments.
# Reference: Innerspace document
M_idl2ics = MX2Z = np.array([[0, 1, 0], [0, 0, 1], [1, 0, 0]])
M_ics2idl = MZ2X = np.array([[0, 0, 1], [1, 0, 0], [0, 1, 0]])

# Common conversions.
# Yes, these are available in a number of packages. However, they are explicitly defined
# to provide consistency and, if any were to be changed, an explicit history of that change.

# Conversion from seconds to MJD
SECONDS2MJD = 1 / 24 / 60 / 60

# Degree, radian, angle transformations
R2D = 180.0 / np.pi
D2R = np.pi / 180.0
A2R = D2R / 3600.0
R2A = 3600.0 * R2D
PI2 = np.pi * 2.0

# Potential errors generated by mnemonic retrival
EXPECTED_ERRORS = (OSError, RuntimeError, ValueError)

# Pointing container
# Attributes are as follows. Except for the observation time, all values
# are retrieved from the engineering data.
#    fgs_q        : Quaternion representing orientation of the FGS frame relative to the Observatory frame
#    obstime      : Time the pointing information refers to.
#    q            : Quaternion of the FGS.
Pointing = namedtuple("Pointing", ["fgs_q", "obstime", "q"])
Pointing.__new__.__defaults__ = (None,) * 3


# Transforms
[docs] @dataclasses.dataclass class Transforms: """The matrices used in calculation of the M_eci2siaf transformation.""" #: B-frame to FGS m_b2fgs: np.ndarray | None = None #: ECI to B-frame m_eci2b: np.ndarray | None = None #: ECI to FCS m_eci2fcs: np.ndarray | None = None #: ECI to GS m_eci2gs: np.ndarray | None = None #: ECI to GS apparent m_eci2gsapp: np.ndarray | None = None #: ECI to V m_eci2v: np.ndarray | Any = None #: ECI to Guide star apparent position m_fgs2gsapp: np.ndarray | None = None #: GS apparent to GS corrected m_gsapp2gsics: np.ndarray | None = None
[docs] @classmethod def from_asdf(cls, asdf_file): """ Create Transforms from AsdfFile. Parameters ---------- asdf_file : Stream-like or `asdf.AsdfFile` The asdf to create from. Returns ------- transforms : Transforms The Transforms instance. """ if isinstance(asdf_file, asdf.AsdfFile): transforms = asdf_file.tree["transforms"] else: with asdf.open(asdf_file, memmap=False, lazy_load=False) as af: transforms = af.tree["transforms"] return cls(**transforms)
[docs] def to_asdf(self): """ Serialize to AsdfFile. Returns ------- asdf_file : asdf.AsdfFile The ASDF serialization. """ self_dict = dataclasses.asdict(self) asdf_file = asdf.AsdfFile({"transforms": self_dict}) return asdf_file
[docs] def write_to_asdf(self, path): """ Serialize to a file path. Parameters ---------- path : Stream-like Output file path. """ asdf_file = self.to_asdf() asdf_file.write_to(path, all_array_storage="inline")
# WCS reference container WCSRef = namedtuple("WCSRef", ["ra", "dec", "pa", "s_region"]) WCSRef.__new__.__defaults__ = (None, None, None, "")
[docs] @dataclasses.dataclass class TransformParameters: """Parameters required for the calculations.""" #: If telemetry cannot be determined, use existing information in the observation's header. allow_default: bool = False #: Aperture in use aperture: str = "" #: Default quaternion to use if engineering is not available. default_quaternion: tuple | None = None #: Commanded position of the guide star in (H, V) space. gscommanded: tuple | None = None #: Observation end time obsend: float | None = None #: Observation start time obsstart: float | None = None #: The observatory orientation, represented by the ECI quaternion, # and other engineering mnemonics pointing: Pointing | Any = None #: Reduction function to use on values. reduce_func: Callable | None = None #: Engineering database information service_kwargs: dict | None = None #: If no telemetry can be found during the observation, #: the time, in seconds, beyond the observation time to search for telemetry. tolerance: float = 60.0 # Observatory velocity velocity: tuple | None = None
[docs] def as_reprdict(self): """Return a dict where all values are REPR of their values.""" # numpydoc ignore=RT01 r = { field.name: repr(getattr(self, field.name)) for field in dataclasses.fields(self) } return r
[docs] def update_from_engdb(self): """Update pointing information.""" self.pointing = get_pointing( self.obsstart, self.obsend, mnemonics_to_read=COARSE_MNEMONICS, service_kwargs=self.service_kwargs, tolerance=self.tolerance, reduce_func=self.reduce_func, )
def __post_init__(self): # Setup the default reduction function. if not self.reduce_func: self.reduce_func = pointing_from_average
[docs] def add_wcs( filename, dry_run=False, save_transforms=None, **transform_kwargs, ): """Add WCS information to a Roman DataModel. Telescope orientation is attempted to be obtained from the engineering database. Failing that, a default pointing is used based on input default information. The file is updated in-place. Parameters ---------- filename : Path-like The path to a data file. dry_run : bool Run through the calculations but do not modify the file. save_transforms : Path-like or None File to save the calculated transforms to. transform_kwargs : dict dict to use to initialize the `TransformParameters` object. See `TransformParameters` for more information.` Notes ----- This function adds absolute pointing information to the Roman datamodels provided. By default, only Level 1 exposures are allowed to be updated. These have the suffixes of "uncal" representing datamodel ScienceRawModel. Any higher level product, from Level 2 and beyond, that has had the `assign_wcs` step applied, have improved WCS information. Running this task on such files will potentially corrupt the WCS. """ logger.info("Updating WCS info for file %s", filename) with rdm.open(filename) as model: if not isinstance(model, EXPECTED_MODELS): logger.warning("Input %s is not of an expected type (uncal)", model) logger.warning( " Updating pointing may have no effect or detrimental effects on the " "WCS information," ) logger.warning( " especially if the input is the result of Level2b or higher calibration." ) raise TypeError( f"Input model {model} is not one of {EXPECTED_MODELS}." "\n\tFailing WCS processing." ) t_pars, transforms = update_wcs( model, **transform_kwargs, ) if dry_run: logger.info("Dry run requested; results are not saved.") else: logger.info("Saving updated model %s", filename) model.save(filename) if transforms and save_transforms: logger.info("Saving transform matrices to %s", save_transforms) transforms.write_to_asdf(save_transforms) logger.info("...update completed")
[docs] def update_wcs( model, **transform_kwargs, ): """ Update WCS pointing information. Given a `roman.datamodels.DataModel`, determine the simple WCS parameters from the SIAF keywords in the model and the engineering parameters that contain information about the telescope orientation. It presumes all the accessed keywords are present. Parameters ---------- model : `~roman.datamodels.DataModel` The model to update. **transform_kwargs : dict dict to use to initialize the `TransformParameters` object. See `TransformParameters` for more information.` Returns ------- t_pars, transforms : TransformParameters, Transforms The parameters and transforms calculated. May be None for either if telemetry calculations were not performed. """ # Configure transformation parameters. t_pars = TransformParameters(**transform_kwargs) t_pars_from_model(model, t_pars) # Calculate WCS. transforms = update_wcs_from_telem(model, t_pars) return t_pars, transforms
def update_wcs_from_telem(model, t_pars: TransformParameters): """ Update WCS pointing information. Given a `roman.datamodels.DataModel`, determine the simple WCS parameters from the SIAF keywords in the model and the engineering parameters that contain information about the telescope pointing. It presumes all the accessed keywords are present. Parameters ---------- model : `~roman.datamodels.DataModel` The model to update. The update is done in-place. t_pars : `TransformParameters` The transformation parameters. Parameters are updated during processing. Returns ------- transforms : Transforms or None If available, the transformation matrices. """ logger.info("Updating wcs from telemetry.") # Initialization. If provided, provide a default Pointing. transforms = None # Assume no transforms are calculated. quality = None # Unknown pointing quality. # Get the pointing information try: t_pars.update_from_engdb() except ValueError as exception: logger.error("Cannot retrieve valid engineering orientation data") if t_pars.default_quaternion is None or not t_pars.allow_default: logger.error("Use of default orientation has been disabled. Aborting.") raise else: logger.warning("Exception is %s", exception) obstime = Time( (t_pars.obsstart.mjd + t_pars.obsend.mjd) / 2.0, format="mjd" ) logger.warning( "Using provided default quaternion: %s", t_pars.default_quaternion ) logger.warning(" at time %s", obstime.iso) logger.info("Setting pointing quality to PLANNED") t_pars.pointing = Pointing(q=t_pars.default_quaternion, obstime=obstime) quality = "PLANNED" else: logger.info("Successful read of engineering quaternions:") logger.info("\tPointing: %s", t_pars.pointing) quality = "CALCULATED" # Attempt to calculate WCS information try: wcsinfo, vinfo, transforms = calc_wcs(t_pars) logger.info("Setting pointing quality to %s", quality) except EXPECTED_ERRORS: logger.error("WCS calculation has failed") raise # Update model meta. logger.info("Aperture WCS info: %s", wcsinfo) logger.info("V1 WCS info: %s", vinfo) update_meta(model, wcsinfo, vinfo, quality) return transforms
[docs] def calc_wcs_over_time(obsstart, obsend, t_pars: TransformParameters): """ Calculate V1 and aperture WCS over a time period. Parameters ---------- obsstart, obsend : float MJD observation start/end times t_pars : `TransformParameters` The transformation parameters. Parameters are updated during processing. Returns ------- obstimes, wcsinfos, vinfos : [astropy.time.Time[,...]], [WCSRef[,...]], [WCSRef[,...]] A 3-tuple is returned with the WCS pointings for the aperture and the V1 axis. """ # Setup structures obstimes = [] wcsinfos = [] vinfos = [] # Calculate WCS try: pointings = get_pointing( obsstart, obsend, service_kwargs=t_pars.service_kwargs, tolerance=t_pars.tolerance, reduce_func=t_pars.reduce_func, ) except ValueError: logger.warning( "Cannot get valid engineering mnemonics from engineering database" ) raise if not isinstance(pointings, list): pointings = [pointings] for pointing in pointings: t_pars.pointing = pointing wcsinfo, vinfo, transforms = calc_wcs(t_pars) obstimes.append(pointing.obstime) wcsinfos.append(wcsinfo) vinfos.append(vinfo) return obstimes, wcsinfos, vinfos
[docs] def calc_wcs(t_pars: TransformParameters): """ Calculate WCS. Given observatory orientation and target aperture, calculate V1 and Reference Pixel sky coordinates. Parameters ---------- t_pars : `TransformParameters` The transformation parameters. Parameters are updated during processing. Returns ------- wcsinfo, vinfo, transforms : WCSRef, WCSRef, Transforms A 3-tuple is returned with the WCS pointing for the aperture and the V1 axis, and the transformation matrices. """ # Calculate transforms transforms = calc_transforms(t_pars) # Calculate the V1 WCS information vinfo = calc_wcs_from_matrix(transforms.m_eci2v.T) # Calculate the Aperture WCS wcsinfo = wcsinfo_from_siaf(t_pars.aperture, vinfo) # That's all folks return wcsinfo, vinfo, transforms
def wcsinfo_from_siaf(aperture, vinfo): """Calculate aperture reference point WCS from V-frame WCS and SIAF Parameters ---------- aperture : str The aperture in use vinfo : WCSRef The V-frame WCS Returns ------- wcsinfo : WCSRef The WCS for the aperture's reference point, as defined by its SIAF. """ from pysiaf import Siaf from pysiaf.utils.rotations import sky_posangle siaf = Siaf("roman") wfi = siaf[aperture.upper()] # For transformations between the telescope frame and all other frames, # an attitude matrix is created using the V-frame WCS information. attitude = attitude_from_v1(vinfo) wfi.set_attitude_matrix(attitude) skycoord = wfi.reference_point(to_frame="sky") pa_v3 = sky_posangle(attitude, *skycoord) # Compute S_REGION keyword corners = wfi.corners("sky") footprint = np.array([corners[0], corners[1]]).T s_region_keyword = compute_s_region_keyword(footprint) wcsinfo = WCSRef( ra=skycoord[0], dec=skycoord[1], pa=pa_v3, s_region=s_region_keyword ) return wcsinfo
[docs] def calc_transforms(t_pars: TransformParameters): """ COARSE calculation. This implements the overall coarse-mode transfomations as defined by the innerspace document "Quaternion Transforms for Coarse Pointing WCS" 2026-01-06. Parameters ---------- t_pars : TransformParameters The transformation parameters. Parameters are updated during processing. Returns ------- transforms : Transforms The list of coordinate matrix transformations Notes ----- """ logger.info("Calculating transforms...") t = Transforms() # Quaternion to M_eci2b t.m_eci2b = calc_quat2matrix(t_pars.pointing.q) # ECI to FCS t.m_b2fgs = calc_m_b2fgs(t_pars.pointing.fgs_q) t.m_eci2fcs = np.dot(t.m_b2fgs, t.m_eci2b) # FGS to Guide star apparent. if t_pars.gscommanded is None: logger.warning( "No command guide star position provided. Assuming guide star is at the aperture reference position." ) hv = (0.0, 0.0) else: hv = t_pars.gscommanded fgs_x, fgs_y = hv_to_fgs(t_pars.aperture, *hv) t.m_fgs2gsapp = calc_m_fgs2gsapp(fgs_x, fgs_y) # ECI to GS apparent t.m_eci2gsapp = np.linalg.multi_dot([t.m_fgs2gsapp, t.m_b2fgs, t.m_eci2b]) # Use calc_gs2gsapp to convert m_eci2gsapp to VA-applied (or "aberrated") m_eci2gs # Note that calc_gs2gsapp should be renamed to calc_gsapp2gs t.m_gsapp2gsics = calc_gsapp2gs(t.m_eci2gsapp, t_pars.velocity) # ECI to GS t.m_eci2gs = np.linalg.multi_dot([M_ics2idl, t.m_gsapp2gsics, t.m_eci2gsapp]) # ECI to V t.m_eci2v = np.linalg.multi_dot( [M_V2FCS0.T, t.m_fgs2gsapp.T, M_idl2ics, t.m_eci2gs] ) return t
def calc_attitude_matrix(wcs, yangle, position): """ Calculate the DCM attitude from known positions and roll angles. Parameters ---------- wcs : WCSRef The sky position. yangle : float The IdlYangle of the point in question. position : numpy.array(2) The position in Ideal frame. Returns ------- m : np.array(3,3) The transformation matrix """ # Convert to radians ra = wcs.ra * D2R dec = wcs.dec * D2R yangle_ra = yangle * D2R pos_rads = position * A2R v2 = pos_rads[0] v3 = pos_rads[1] # Create the matrices r1 = dcm(ra, dec, yangle_ra) r2 = np.array( [ [cos(v2) * cos(v3), -sin(v2), -cos(v2) * sin(v3)], [sin(v2) * cos(v3), cos(v2), -sin(v2) * sin(v3)], [sin(v3), 0.0, cos(v3)], ] ) # Final transformation m = np.dot(r2, r1) logger.debug("attitude DCM: %s", m) return m def calc_wcs_from_matrix(m): """ Calculate the WCS information from a DCM. Parameters ---------- m : np.array((3, 3)) The DCM matrix to extract WCS information from. Returns ------- wcs : WCSRef The WCS. """ # V1 RA/Dec is the first row of the transform v1_ra, v1_dec = vector_to_angle(m[0]) wcs = WCSRef(v1_ra, v1_dec, None) # V3 is the third row of the transformation v3_ra, v3_dec = vector_to_angle(m[2]) v3wcs = WCSRef(v3_ra, v3_dec, None) # Calculate the V3 position angle v1_pa = calc_position_angle(wcs, v3wcs) # Convert to degrees wcs = WCSRef(ra=wcs.ra * R2D, dec=wcs.dec * R2D, pa=v1_pa * R2D) logger.debug("wcs: %s", wcs) return wcs def calc_quat2matrix(q): """ Create a Direction Cosine Matrix (DCM) from a quaternion. Parameters ---------- q : np.array(q1, q2, q3, q4) Array of quaternions from the engineering database. Returns ------- transform : np.array((3, 3)) The transform matrix representing the transformation from observatory orientation to J-Frame. """ q1, q2, q3, q4 = q transform = np.array( [ [ 1.0 - 2.0 * q2 * q2 - 2.0 * q3 * q3, 2.0 * (q1 * q2 + q3 * q4), 2.0 * (q3 * q1 - q2 * q4), ], [ 2.0 * (q1 * q2 - q3 * q4), 1.0 - 2.0 * q3 * q3 - 2.0 * q1 * q1, 2.0 * (q2 * q3 + q1 * q4), ], [ 2.0 * (q3 * q1 + q2 * q4), 2.0 * (q2 * q3 - q1 * q4), 1.0 - 2.0 * q1 * q1 - 2.0 * q2 * q2, ], ], dtype=float, ) logger.debug("quaternion: %s", transform) return transform def calc_position_angle(point, ref): """ Calculate position angle from reference to point. tan(pa) = cos(dec_r) * sin(ra_r - ra_p) / (sin(dec_r)cos(dec_p) - cos(dec_r)sin(dec_p)cos(ra_r-ra_p)) where:: pa : position angle *_r : reference *_p : point Typically the reference is the V3 RA/DEC and point is the object RA/DEC. Parameters ---------- point : WCSRef The POINT wcs parameters, in radians. ref : WCSRef The TARGET wcs parameters, in radians. Returns ------- point_pa : float The POINT position angle, in radians """ y = cos(ref.dec) * sin(ref.ra - point.ra) x = sin(ref.dec) * cos(point.dec) - cos(ref.dec) * sin(point.dec) * cos( ref.ra - point.ra ) point_pa = np.arctan2(y, x) if point_pa < 0: point_pa += PI2 if point_pa >= PI2: point_pa -= PI2 logger.debug("Given reference: %s, point: %s, then PA: %s", ref, point, point_pa) return point_pa def get_pointing( obsstart, obsend, mnemonics_to_read=COARSE_MNEMONICS, service_kwargs=None, tolerance=60, reduce_func=None, ): """ Get telescope pointing engineering data. Parameters ---------- obsstart, obsend : float MJD observation start/end times mnemonics_to_read : (str,[...]) mnemonics to read. service_kwargs : dict or None Keyword arguments passed to `engdb_service` defining what engineering database service to use. tolerance : int If no telemetry can be found during the observation, the time, in seconds, beyond the observation time to search for telemetry. reduce_func : func or None Reduction function to use on values. If None, the average pointing is returned. Returns ------- pointing : Pointing or [Pointing(, ...)] The engineering pointing parameters. If the `result_type` is `all`, a list of pointings will be returned. Raises ------ ValueError Cannot retrieve engineering information. Notes ----- For the moment, the first found values will be used. This will need be re-examined when more information is available. """ if reduce_func is None: reduce_func = pointing_from_average logger.info("Determining pointing between observations times (mjd):") logger.info("obsstart: %s obsend: %s", obsstart, obsend) logger.info("Telemetry search tolerance: %s", tolerance) logger.info("Reduction function: %s", reduce_func) mnemonics = get_mnemonics( obsstart, obsend, mnemonics_to_read=mnemonics_to_read, tolerance=tolerance, service_kwargs=service_kwargs, ) reduced = reduce_func(mnemonics) logger.log(DEBUG_FULL, "Mnemonics found:") logger.log(DEBUG_FULL, "%s", mnemonics) logger.info("Reduced set of pointings:") logger.info("%s", reduced) return reduced def vector_to_angle(v): """ Return tuple of spherical angles from unit direction Vector. The Direction Cosine Matrix (DCM) that provides the transformation of a unit pointing vector defined in inertial frame (ECI J2000) coordinates to a unit vector defined in the science aperture Ideal frame coordinates is defined as. Parameters ---------- v : [v0, v1, v2] Direction vector. Returns ------- alpha, delta : float, float The spherical angles, in radians. """ alpha = np.arctan2(v[1], v[0]) delta = np.arcsin(v[2]) if alpha < 0.0: alpha += 2.0 * np.pi return alpha, delta def angle_to_vector(alpha, delta): """ Convert spherical angles to unit vector. Parameters ---------- alpha, delta : float Spherical angles in radians. Returns ------- v : [float, float, float] Unit vector. """ v0 = cos(delta) * cos(alpha) v1 = cos(delta) * sin(alpha) v2 = sin(delta) return [v0, v1, v2] def get_mnemonics( obsstart, obsend, tolerance, mnemonics_to_read=COARSE_MNEMONICS, service_kwargs=None ): """ Retrieve pointing mnemonics from the engineering database. Parameters ---------- obsstart, obsend : float astropy.Time observation start/end times. tolerance : int If no telemetry can be found during the observation, the time, in seconds, beyond the observation time to search for telemetry. mnemonics_to_read : (str[,...]) The mnemonics to fetch. service_kwargs : dict or None Keyword arguments passed to `engdb_service` defining what engineering database service to use. Returns ------- mnemonics : {mnemonic: [value[,...]][,...]} The values for each pointing mnemonic. Raises ------ ValueError Cannot retrieve engineering information. """ if service_kwargs is None: service_kwargs = dict() try: engdb = engdb_service(**service_kwargs) except EXPECTED_ERRORS as exception: raise ValueError( f"Cannot open engineering DB connection\nException: {exception}" ) from None logger.info("Querying engineering DB: %s", engdb) # Construct the mnemonic values structure. mnemonics = {mnemonic: None for mnemonic in mnemonics_to_read} # Retrieve the mnemonics from the engineering database. # Check for whether the bracket values are used and # within tolerance. for mnemonic in mnemonics: try: mnemonics[mnemonic] = engdb.get_values( mnemonic, obsstart, obsend, time_format="mjd", include_obstime=True, include_bracket_values=False, ) except EXPECTED_ERRORS as exception: logger.warning("Cannot retrieve %s from engineering.", mnemonic) logger.debug("Exception %s", exception) continue # If more than two points exist, throw off the bracket values. # Else, ensure the bracket values are within the allowed time. if len(mnemonics[mnemonic]) < 2: logger.warning( "Mnemonic %s has no telemetry within the observation time.", mnemonic ) logger.warning( "Attempting to use bracket values within %s seconds", tolerance ) mnemonics[mnemonic] = engdb.get_values( mnemonic, obsstart, obsend, time_format="mjd", include_obstime=True, include_bracket_values=True, ) tolerance_mjd = TimeDelta(tolerance, format="sec") allowed_start = obsstart - tolerance_mjd allowed_end = obsend + tolerance_mjd allowed = [ value for value in mnemonics[mnemonic] if allowed_start <= value.obstime <= allowed_end ] if not len(allowed): raise ValueError( "No telemetry exists for mnemonic {} within {} and {}".format( mnemonic, Time(allowed_start, format="mjd").isot, Time(allowed_end, format="mjd").isot, ) ) mnemonics[mnemonic] = allowed return mnemonics def all_pointings(mnemonics): """ V1 of making pointings. Parameters ---------- mnemonics : {mnemonic: [value[,...]][,...]} The values for each pointing mnemonic. Returns ------- pointings : [Pointing[,...]] List of pointings. """ ordered = mnemonics_chronologically(mnemonics) pointings = mnemonics_to_pointings(ordered) return pointings def mnemonics_to_pointings(ordered_mnemonics): """From a list of mnemonic values, create a list of Pointings Parameters ---------- ordered_mnemonics : [(Time, {mnemonic: [value[,...]][,...]})[,...]] List of 2-tuples consisting of (Time, dict) where the dict are the mnemonics values at the associated time. Returns ------- pointings : [Pointing[,...]] List of pointings """ pointings = [] for obstime, mnemonics_at_time in ordered_mnemonics: # Observatory orientation, required try: q = np.array( [mnemonics_at_time[m].value for m in COARSE_MNEMONICS_QUATERNION_ECI] ) except KeyError as exception: raise ValueError( f"One or more quaternion mnemonics not in the telemetry {COARSE_MNEMONICS_QUATERNION_ECI}" ) from exception # B-frame to FGS-frame quaternion. Not required and very oddly has so many backups... fgs_q = None try: fgs_q = np.array( [mnemonics_at_time[m].value for m in COARSE_MNEMONICS_B2FGS_EST] ) except KeyError: logger.warning( "One or more of the B-to-FGS quaternion mnemonics are not in the telementry %s", COARSE_MNEMONICS_B2FGS_EST, ) if fgs_q is None: try: fgs_q = np.array( [mnemonics_at_time[m].value for m in COARSE_MNEMONICS_B2FGS_PRELOAD] ) except KeyError: logger.warning( "One or more of the B-to-FGS quaternion mnemonics are not in the telementry %s", COARSE_MNEMONICS_B2FGS_PRELOAD, ) pointing = Pointing(fgs_q=fgs_q, obstime=obstime, q=q) pointings.append(pointing) return pointings def first_pointing(mnemonics): """ Return first pointing. Parameters ---------- mnemonics : {mnemonic: [value[,...]][,...]} The values for each pointing mnemonic. Returns ------- pointing : Pointing First pointing. """ pointings = all_pointings(mnemonics) return pointings[0] def pointing_from_average(mnemonics): """ Determine single pointing from average of available pointings. Parameters ---------- mnemonics : {mnemonic: [value[,...]][,...]} The values for each pointing mnemonic. Returns ------- pointing : Pointing Pointing from average. """ # weed-out empty mnemonics valid_mnemonics = [ mnemonic for mnemonic in mnemonics if mnemonics[mnemonic] is not None ] # Get average observation time. times = [ eng_param.obstime.unix for mnemonic in valid_mnemonics for eng_param in mnemonics[mnemonic] if eng_param.obstime.unix != 0.0 ] if len(times) > 0: obstime = Time(np.average(times), format="unix") else: raise ValueError("No valid mnemonics found.") # Get averages for all the mnemonics. mnemonic_averages = {} for mnemonic in valid_mnemonics: values = [eng_param.value for eng_param in mnemonics[mnemonic]] if np.allclose(values, 0.0): logger.warning( "Mnemonics %s is only zeros. Treating as undefined.", mnemonic ) else: mnemonic_averages[mnemonic] = EngDB_Value( obstime=obstime, value=np.average(values) ) pointing = mnemonics_to_pointings([(obstime, mnemonic_averages)])[0] # That's all folks return pointing def mnemonics_chronologically(mnemonics): """ Return time-ordered mnemonic list with progressive values. The different set of mnemonics used for observatory orientation appear at different cadences. This routine creates a time-ordered dictionary with all the mnemonics for each time found in the engineering. For mnemonics missing for a particular time, the last previous value is used. Parameters ---------- mnemonics : {mnemonic: [value[,...]]} Dictionary mapping mnemonics to their respective values. Returns ------- ordered : [(obstime, {mnemonic: value[,...]}[,...]] Time-ordered list of 2-tuple consisting of `(time, mnemonics)`. """ # Collect all information by observation time and sort. by_obstime = defaultdict(dict) for mnemonic, values in mnemonics.items(): if values is not None: for value in values: by_obstime[value.obstime][mnemonic] = value by_obstime = sorted(by_obstime.items()) # Created the ordered matrix ordered = [] last_obstime = {} for obstime, mnemonics_at_time in by_obstime: last_obstime.update(mnemonics_at_time) # Engineering data may be present, but all zeros. # Filter out this situation. values = [value.value for value in last_obstime.values()] if not any(values): continue ordered.append((obstime, copy(last_obstime))) return ordered def mnemonics_chronologically_table(mnemonics): """ Return time-ordered mnemonic list with progressive values. The different set of mnemonics used for observatory orientation appear at different cadences. This routine creates a time-ordered dictionary with all the mnemonics for each time found in the engineering. For mnemonics missing for a particular time, the last previous value is used. Parameters ---------- mnemonics : {mnemonic: [value[,...]]} Dictionary mapping mnemonics to their respective values. Returns ------- ordered_by_time : `astropy.table.Table` Time-ordered mnemonic list with progressive values. """ ordered = mnemonics_chronologically(mnemonics) names = list(mnemonics.keys()) names = ["time", *names] time_idx = 0 values = [[] for _ in names] for time, mnemonics_at_time in ordered: values[time_idx].append(time) for mnemonic in mnemonics_at_time: idx = names.index(mnemonic) values[idx].append(ordered[time][mnemonic].value) t = Table(values, names=names) return t def t_pars_from_model(model, t_pars): """ Initialize TransformParameters from a DataModel. Parameters ---------- model : DataModel Data model to initialize from. t_par : TransformParameters Transformation parameters updated with model information. Updating is performed in-place. """ # Instrument details t_pars.aperture = model.meta.wcsinfo.aperture_name try: exp_type = model.meta.exposure.type.lower() except AttributeError: exp_type = None t_pars.exp_type = exp_type # observation parameters t_pars.obsstart = model.meta.exposure.start_time t_pars.obsend = model.meta.exposure.end_time ephem = model.meta.ephemeris t_pars.velocity = (ephem.velocity_x, ephem.velocity_y, ephem.velocity_z) def dcm(alpha, delta, angle): """ Construct the Direction Cosine Matrix (DCM). Typical usage is passing of (RA, DEC, PositionAngle). All values must be in radians. Parameters ---------- alpha : float First coordinate in radians. delta : float Second coordinate in radians. angle : float Position angle in radians. Returns ------- dcm : np.array((3, 3)) The 3x3 direction cosine matrix. """ dcm = np.array( [ [cos(delta) * cos(alpha), cos(delta) * sin(alpha), sin(delta)], [ -cos(angle) * sin(alpha) + sin(angle) * sin(delta) * cos(alpha), cos(angle) * cos(alpha) + sin(angle) * sin(delta) * sin(alpha), -sin(angle) * cos(delta), ], [ -sin(angle) * sin(alpha) - cos(angle) * sin(delta) * cos(alpha), sin(angle) * cos(alpha) - cos(angle) * sin(delta) * sin(alpha), cos(angle) * cos(delta), ], ] ) return dcm def update_meta(model, wcsinfo, vinfo, quality): """Update model's meta info with the given pointing. The following meta are update: - meta.pointing.dec_v1 - meta.pointing.pa_aperture - meta.pointing.pa_v3 - meta.pointing.ra_v1 - meta.pointing.target_dec - meta.pointing.target_ra - meta.wcsinfo.dec_ref - meta.wcsinfo.ra_ref - meta.wcsinfo.roll_ref - meta.wcsinfo.s_region Parameters ---------- model : `~roman.datamodels.DataModel` The model to update. Updates are done in-place. wcsinfo : `WCSRef`` The aperture-specific pointing. vinfo : ``WCSRef` The V1-specific pointing quality : str Indicator of the success of the pointing determination. """ # Shortcuts to the meta blocks wm = model.meta.wcsinfo pm = model.meta.pointing # Setup SIAF info. from pysiaf import Siaf siaf = Siaf("roman") aper = siaf[wm.aperture_name.upper()] # Set the quality model.meta.pointing.pointing_engineering_source = quality # Update SIAF-related meta wm.v2_ref = aper.V2Ref wm.v3_ref = aper.V3Ref wm.v3yangle = aper.V3IdlYAngle wm.vparity = aper.VIdlParity # Update aperture-centric pointing, represented by `wcsinfo` wm.dec_ref = wcsinfo.dec wm.ra_ref = wcsinfo.ra wm.roll_ref = wcsinfo.pa wm.s_region = wcsinfo.s_region # Update general pointing information, including V1. pm.dec_v1 = vinfo.dec pm.pa_aperture = wcsinfo.pa pm.pa_v3 = vinfo.pa pm.ra_v1 = vinfo.ra # Update target's sky location. attitude = attitude_from_v1(vinfo) targ_app = siaf[model.meta.pointing.target_aperture] targ_app.set_attitude_matrix(attitude) skycoord = targ_app.reference_point(to_frame="sky") pm.target_ra = skycoord[0] pm.target_dec = skycoord[1] # If not present, stub-out keywords that are expected # in L1 products. 26Q2B21 will deal with this bad situation gs = model.meta.guide_star gs.corrected_ra = getattr(gs, "corrected_ra", pm.target_ra) gs.corrected_dec = getattr(gs, "corrected_dec", pm.target_dec) gs.h = getattr(gs, "h", wm.v2_ref) gs.v = getattr(gs, "v", wm.v3_ref) def attitude_from_v1(vinfo): """Calculate observatory attitude matrix based on V1 pointing Return the attitude matrix used by `pysiaf.Aperture.set_attitude_matrix` to enable transformations to/from the 'sky' frame. Parameters ---------- vinfo : `WCSRef` The WCS information for V1, the boresight. Returns ------- attitude : np.array((3, 3), dtype=float) The attitude matrix. """ from pysiaf import Siaf from pysiaf.utils.rotations import attitude_matrix siaf = Siaf("roman") boresight = siaf["BORESIGHT"] v_refpoint = boresight.reference_point(to_frame="tel") attitude = attitude_matrix(*v_refpoint, vinfo.ra, vinfo.dec, vinfo.pa) return attitude def calc_m_fgs2gs(x, y): """Calculate DCM that converts FGS reference to guide star location Parameters ---------- x, y : float, float Guidestar location (radians) """ m = np.array( [ [cos(x), 0.0, sin(x)], [-sin(x) * sin(y), cos(y), cos(x) * sin(y)], [-sin(x) * cos(y), -sin(y), cos(x) * cos(y)], ] ) return m def calc_gsapp2gs(m_eci2gsapp, velocity): """ Calculate the Velocity Aberration correction. This implements Eq. 40 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11 From Section 3.2.5: The velocity aberration correction is applied in the direction of the guide star. The matrix that translates from ECI to the apparent guide star ICS frame is M_(ECI→GSAppICS), where the GS Apparent position vector is along the z-axis in the guide star ICS frame. Note that the convention for Roman is in the opposite direction. The algorithm is the same, just different input/output. Parameters ---------- m_eci2gsapp : numpy.array(3, 3) The the ECI to Guide Star transformation matrix, in the ICS frame. velocity : numpy.array([dx, dy, dz]) The barycentric velocity. Returns ------- m_gsapp2gs : numpy.array(3, 3) The velocity aberration correction matrix. """ # Check velocity. If present, negate the velocity since # the desire is to remove the correction. velocity = np.array(velocity) if not velocity.all() or np.sum(np.abs(velocity) > MAX_OBSERVATORY_SPEED): logger.warning( "Velocity: %s is either unspecified or contains unreasonable values. Cannot calculate aberration. Returning identity matrix", velocity, ) return np.identity(3) velocity = -1 * velocity # Eq. 35: Guide star position vector uz = np.array([0.0, 0.0, 1.0]) u_gseci = np.dot(np.transpose(m_eci2gsapp), uz) # Eq. 36: Compute the apparent shift due to velocity aberration. try: scale_factor, u_gseci_app = compute_va_effects_vector(*velocity, u_gseci) except TypeError: logger.warning( "Failure in computing velocity aberration. Returning identity matrix." ) logger.warning("Exception: %s", sys.exc_info()) return np.identity(3) # Eq. 39: Rotate from ICS into the guide star frame. u_gs_app = np.dot(m_eci2gsapp, u_gseci_app) # Eq. 40: Compute the M_gsapp2gs matrix u_prod = np.cross(uz, u_gs_app) u_prod_mag = np.linalg.norm(u_prod) a_hat = u_prod / u_prod_mag m_a_hat = np.array( [ [0.0, -a_hat[2], a_hat[1]], [a_hat[2], 0.0, -a_hat[0]], [-a_hat[1], a_hat[0], 0.0], ] ) theta = np.arcsin(u_prod_mag) m_gsapp2gs = ( np.identity(3) - (m_a_hat * np.sin(theta)) + (2 * m_a_hat**2 * np.sin(theta / 2.0) ** 2) ) logger.debug("m_gsapp2gs: %s", m_gsapp2gs) return m_gsapp2gs def hv_to_fgs(aperture_name, h, v): """Convert HV frame to FGS frame Source document: Innerspace Confluence: "Quaternion Transforms for Coarse Pointing WCS" from the section "Derive M_fgstogsapp". The coordinates systems are based on Eq. 7 of "Roman-STScI-000416 (Roman SOC FGS Algorithms)" Parameters ---------- aperture_name : str The aperture in which the guide star is located. h, v : float, float The commanded coordinates in the HV frame. Units are in pixels. Returns ------- fgs_x, fgs_y : float, float The coordinates in the FGS reference frame in arcsec. """ from pysiaf import Siaf siaf = Siaf("roman") aper = siaf[aperture_name] aper_wfi_cen = siaf["WFI_CEN"] # Location of GS. This is from Eqn. 7 of Roman-STScI-000416 (Roman SOC FGS Algorithms) x, y = h + aper.XSciRef, aper.YSciRef - v v2, v3 = aper.sci_to_tel(x, y) # Convert sci to tel coords using pysiaf # Convert tel to WFI_CEN idl coords using pysiaf. # The only difference between FGS and WFI_CEN coordinates is that the Y-axis is flipped x_wc, y_wc = aper_wfi_cen.tel_to_idl(v2, v3) x_fgs, y_fgs = x_wc, -y_wc return x_fgs, y_fgs def calc_m_b2fgs(fgs_q=None): """Calculate the B-to-FGS frame DCM Parameters ---------- fgs_q : [float, float, float, float] or None The quaterion representing the B to FGS transformation. If no B-to-FGS quaternion is given, use a pre-launch defined matrix. """ if fgs_q is None: logger.warning("No B-to-FGS information is given. Using pre-launch values.") return M_B2FCS0 # Calculate the DCM from the quaterion return calc_quat2matrix(fgs_q) def calc_m_fgs2gsapp(x, y): """Calculate the FGS to Guide star apparent transformation Source document: Innerspace Confluence: "Quaternion Transforms for Coarse Pointing WCS" from the section "Derive M_fgstogsapp". Parameters ---------- x, y : float, float Position in arcseconds in the FGS frame Returns ------- m_fgs2gsapp : np.array(3,3) The DCM representing the transformation. """ x_gs, y_gs = A2R * x, A2R * y cx, sx = cos(x_gs), sin(x_gs) cy, sy = cos(y_gs), sin(y_gs) m_x = np.array([[cx, 0, -sx], [0, 1, 0], [sx, 0, cx]]) m_y = np.array([[1, 0, 0], [0, cy, sy], [0, -sy, cy]]) m_gsapp2fgs = np.dot(m_y, m_x) m_fgs2gsapp = m_gsapp2fgs.T return m_fgs2gsapp