"""Set Telescope Pointing from Observatory Engineering Telemetry.
Calculate and update the pointing-related and world coordinate system-related
keywords. Given a time period, usually defined by an exposure, the engineering
mnemonic database is queried for observatory orientation. The orientation
defines the sky coordinates that a particular point on the observatory is
pointed to. Then, using a set of matrix transformations, the sky coordinates of
the reference pixel of a desired aperture is calculated.
The transformations are defined by the STScI Innerspace (non-public) document
titles "Quaternion Transforms for Coarse Pointing WCS". The code itself follows
a demonstrative jupyter notebook.
**Interface**
The primary usage is through the command line interface
``set_telescope_pointing``. Operating on a list of Roman Level 1 exposures,
this command updates the world coordinate system keywords with the values
necessary to translate from aperture pixel to sky coordinates.
Access to the Roman Engineering Mnemonic database is required. See the
:ref:`Engineering Database Interface <engdb>` for more information.
Programmatically, the command line is implemented by the function
`~roman.orientation.set_telescope_pointing.add_wcs`, which calls the basic function
`~roman.orientation.set_telescope_pointing.calc_wcs`.
There are two data structures used to maintain the state of the transformation.
`~roman.orientation.set_telescope_pointing.TransformParameters` contains the parameters
needed to perform the transformations.
`~roman.orientation.set_telescope_pointing.Transforms` contains the calculated
transformation matrices.
**Transformation Matrices**
All the transformation matrices, as defined by
`~roman.orientation.set_telescope_pointing.Transforms`, are Direction Cosine Matrices
(DCM). A DCM contains the Euler rotation angles that represent the sky
coordinates for a particular frame-of-reference. The initial DCM is provided
through the engineering telemetry and represents how the observatory is orientated.
**META Affected**
The following meta values are populated:
- meta.pointing.dec_v1
- meta.pointing.pa_aperture
- meta.pointing.pa_v3
- meta.pointing.ra_v1
- meta.pointing.target_dec
- meta.pointing.target_ra
- meta.wcsinfo.dec_ref
- meta.wcsinfo.ra_ref
- meta.wcsinfo.roll_ref
- meta.wcsinfo.s_region
"""
import dataclasses
import logging
import sys
from collections import defaultdict, namedtuple
from collections.abc import Callable
from copy import copy
from math import cos, sin
from typing import Any
import asdf
import numpy as np
import roman_datamodels as rdm
from astropy.table import Table
from astropy.time import Time, TimeDelta
from stcal.alignment.util import compute_s_region_keyword
from stcal.velocity_aberration import compute_va_effects_vector
from ..lib.engdb.engdb_lib import EngDB_Value
from ..lib.engdb.engdb_tools import engdb_service
__all__ = [
"TransformParameters",
"Transforms",
"WCSRef",
"add_wcs",
"calc_transforms",
"calc_wcs",
"calc_wcs_over_time",
"update_wcs",
]
# Setup logging
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
DEBUG_FULL = logging.DEBUG - 1
LOGLEVELS = [logging.INFO, logging.DEBUG, DEBUG_FULL]
# Datamodels that can be updated, normally
EXPECTED_MODELS = rdm.datamodels.ScienceRawModel
# Exposure types that can be updated, normally
TYPES_TO_UPDATE = set()
# Mnemonics needed.
COARSE_MNEMONICS_QUATERNION_ECI = [f"SCF_AC_SDR_QBJ_{idx + 1}" for idx in range(4)]
COARSE_MNEMONICS_B2FGS_EST = [f"SCF_AC_EST_FGS_qbr{idx + 1}" for idx in range(4)]
COARSE_MNEMONICS_B2FGS_PRELOAD = [f"SCF_AC_FGS_TBL_Qb{idx + 1}" for idx in range(4)]
COARSE_MNEMONICS = (
COARSE_MNEMONICS_QUATERNION_ECI
+ COARSE_MNEMONICS_B2FGS_EST
+ COARSE_MNEMONICS_B2FGS_PRELOAD
)
# Default and pre-defined matricies.
# Conversion of the FCS reference point from the V-Frame.
# This is the pre-launch value, later to be refined and provided
# in the SIAF
M_V2FCS0 = np.array(
[
[-0.0000001, 0.5000141, 0.8660173],
[0.0086567, -0.8659848, 0.4999953],
[0.9999625, 0.0074969, -0.0043284],
],
)
# Default B-frame to FCS frame, M_b_to_fcs
# Pre-launch this is the same as M_v_to_fcs.
M_B2FCS0 = M_V2FCS0
# Maximum absolute speed of the observatory. Used for sanity check is defined
# as the sum of the absolute components of the velocity.
MAX_OBSERVATORY_SPEED = 150
# Define the transformation matrices to move between the Idealized Coordinate System (ICS)
# and the Idealized Coordinate System (Idl). ICS is the spacecraft-centric system used by
# all frames up through the V-frame. Idl is used by the instruments.
# Reference: Innerspace document
M_idl2ics = MX2Z = np.array([[0, 1, 0], [0, 0, 1], [1, 0, 0]])
M_ics2idl = MZ2X = np.array([[0, 0, 1], [1, 0, 0], [0, 1, 0]])
# Common conversions.
# Yes, these are available in a number of packages. However, they are explicitly defined
# to provide consistency and, if any were to be changed, an explicit history of that change.
# Conversion from seconds to MJD
SECONDS2MJD = 1 / 24 / 60 / 60
# Degree, radian, angle transformations
R2D = 180.0 / np.pi
D2R = np.pi / 180.0
A2R = D2R / 3600.0
R2A = 3600.0 * R2D
PI2 = np.pi * 2.0
# Potential errors generated by mnemonic retrival
EXPECTED_ERRORS = (OSError, RuntimeError, ValueError)
# Pointing container
# Attributes are as follows. Except for the observation time, all values
# are retrieved from the engineering data.
# fgs_q : Quaternion representing orientation of the FGS frame relative to the Observatory frame
# obstime : Time the pointing information refers to.
# q : Quaternion of the FGS.
Pointing = namedtuple("Pointing", ["fgs_q", "obstime", "q"])
Pointing.__new__.__defaults__ = (None,) * 3
# Transforms
# WCS reference container
WCSRef = namedtuple("WCSRef", ["ra", "dec", "pa", "s_region"])
WCSRef.__new__.__defaults__ = (None, None, None, "")
[docs]
def add_wcs(
filename,
dry_run=False,
save_transforms=None,
**transform_kwargs,
):
"""Add WCS information to a Roman DataModel.
Telescope orientation is attempted to be obtained from
the engineering database. Failing that, a default pointing
is used based on input default information.
The file is updated in-place.
Parameters
----------
filename : Path-like
The path to a data file.
dry_run : bool
Run through the calculations but do not modify the file.
save_transforms : Path-like or None
File to save the calculated transforms to.
transform_kwargs : dict
dict to use to initialize the `TransformParameters` object.
See `TransformParameters` for more information.`
Notes
-----
This function adds absolute pointing information to the Roman datamodels
provided. By default, only Level 1 exposures are allowed to be updated.
These have the suffixes of "uncal" representing datamodel ScienceRawModel.
Any higher level product, from Level 2 and beyond, that has had the
`assign_wcs` step applied, have improved WCS information. Running this task
on such files will potentially corrupt the WCS.
"""
logger.info("Updating WCS info for file %s", filename)
with rdm.open(filename) as model:
if not isinstance(model, EXPECTED_MODELS):
logger.warning("Input %s is not of an expected type (uncal)", model)
logger.warning(
" Updating pointing may have no effect or detrimental effects on the "
"WCS information,"
)
logger.warning(
" especially if the input is the result of Level2b or higher calibration."
)
raise TypeError(
f"Input model {model} is not one of {EXPECTED_MODELS}."
"\n\tFailing WCS processing."
)
t_pars, transforms = update_wcs(
model,
**transform_kwargs,
)
if dry_run:
logger.info("Dry run requested; results are not saved.")
else:
logger.info("Saving updated model %s", filename)
model.save(filename)
if transforms and save_transforms:
logger.info("Saving transform matrices to %s", save_transforms)
transforms.write_to_asdf(save_transforms)
logger.info("...update completed")
[docs]
def update_wcs(
model,
**transform_kwargs,
):
"""
Update WCS pointing information.
Given a `roman.datamodels.DataModel`, determine the simple WCS parameters
from the SIAF keywords in the model and the engineering parameters
that contain information about the telescope orientation.
It presumes all the accessed keywords are present.
Parameters
----------
model : `~roman.datamodels.DataModel`
The model to update.
**transform_kwargs : dict
dict to use to initialize the `TransformParameters` object.
See `TransformParameters` for more information.`
Returns
-------
t_pars, transforms : TransformParameters, Transforms
The parameters and transforms calculated. May be
None for either if telemetry calculations were not
performed.
"""
# Configure transformation parameters.
t_pars = TransformParameters(**transform_kwargs)
t_pars_from_model(model, t_pars)
# Calculate WCS.
transforms = update_wcs_from_telem(model, t_pars)
return t_pars, transforms
def update_wcs_from_telem(model, t_pars: TransformParameters):
"""
Update WCS pointing information.
Given a `roman.datamodels.DataModel`, determine the simple WCS parameters
from the SIAF keywords in the model and the engineering parameters
that contain information about the telescope pointing.
It presumes all the accessed keywords are present.
Parameters
----------
model : `~roman.datamodels.DataModel`
The model to update. The update is done in-place.
t_pars : `TransformParameters`
The transformation parameters. Parameters are updated during processing.
Returns
-------
transforms : Transforms or None
If available, the transformation matrices.
"""
logger.info("Updating wcs from telemetry.")
# Initialization. If provided, provide a default Pointing.
transforms = None # Assume no transforms are calculated.
quality = None # Unknown pointing quality.
# Get the pointing information
try:
t_pars.update_from_engdb()
except ValueError as exception:
logger.error("Cannot retrieve valid engineering orientation data")
if t_pars.default_quaternion is None or not t_pars.allow_default:
logger.error("Use of default orientation has been disabled. Aborting.")
raise
else:
logger.warning("Exception is %s", exception)
obstime = Time(
(t_pars.obsstart.mjd + t_pars.obsend.mjd) / 2.0, format="mjd"
)
logger.warning(
"Using provided default quaternion: %s", t_pars.default_quaternion
)
logger.warning(" at time %s", obstime.iso)
logger.info("Setting pointing quality to PLANNED")
t_pars.pointing = Pointing(q=t_pars.default_quaternion, obstime=obstime)
quality = "PLANNED"
else:
logger.info("Successful read of engineering quaternions:")
logger.info("\tPointing: %s", t_pars.pointing)
quality = "CALCULATED"
# Attempt to calculate WCS information
try:
wcsinfo, vinfo, transforms = calc_wcs(t_pars)
logger.info("Setting pointing quality to %s", quality)
except EXPECTED_ERRORS:
logger.error("WCS calculation has failed")
raise
# Update model meta.
logger.info("Aperture WCS info: %s", wcsinfo)
logger.info("V1 WCS info: %s", vinfo)
update_meta(model, wcsinfo, vinfo, quality)
return transforms
[docs]
def calc_wcs_over_time(obsstart, obsend, t_pars: TransformParameters):
"""
Calculate V1 and aperture WCS over a time period.
Parameters
----------
obsstart, obsend : float
MJD observation start/end times
t_pars : `TransformParameters`
The transformation parameters. Parameters are updated during processing.
Returns
-------
obstimes, wcsinfos, vinfos : [astropy.time.Time[,...]], [WCSRef[,...]], [WCSRef[,...]]
A 3-tuple is returned with the WCS pointings for
the aperture and the V1 axis.
"""
# Setup structures
obstimes = []
wcsinfos = []
vinfos = []
# Calculate WCS
try:
pointings = get_pointing(
obsstart,
obsend,
service_kwargs=t_pars.service_kwargs,
tolerance=t_pars.tolerance,
reduce_func=t_pars.reduce_func,
)
except ValueError:
logger.warning(
"Cannot get valid engineering mnemonics from engineering database"
)
raise
if not isinstance(pointings, list):
pointings = [pointings]
for pointing in pointings:
t_pars.pointing = pointing
wcsinfo, vinfo, transforms = calc_wcs(t_pars)
obstimes.append(pointing.obstime)
wcsinfos.append(wcsinfo)
vinfos.append(vinfo)
return obstimes, wcsinfos, vinfos
[docs]
def calc_wcs(t_pars: TransformParameters):
"""
Calculate WCS.
Given observatory orientation and target aperture,
calculate V1 and Reference Pixel sky coordinates.
Parameters
----------
t_pars : `TransformParameters`
The transformation parameters. Parameters are updated during processing.
Returns
-------
wcsinfo, vinfo, transforms : WCSRef, WCSRef, Transforms
A 3-tuple is returned with the WCS pointing for
the aperture and the V1 axis, and the transformation matrices.
"""
# Calculate transforms
transforms = calc_transforms(t_pars)
# Calculate the V1 WCS information
vinfo = calc_wcs_from_matrix(transforms.m_eci2v.T)
# Calculate the Aperture WCS
wcsinfo = wcsinfo_from_siaf(t_pars.aperture, vinfo)
# That's all folks
return wcsinfo, vinfo, transforms
def wcsinfo_from_siaf(aperture, vinfo):
"""Calculate aperture reference point WCS from V-frame WCS and SIAF
Parameters
----------
aperture : str
The aperture in use
vinfo : WCSRef
The V-frame WCS
Returns
-------
wcsinfo : WCSRef
The WCS for the aperture's reference point, as defined by its SIAF.
"""
from pysiaf import Siaf
from pysiaf.utils.rotations import sky_posangle
siaf = Siaf("roman")
wfi = siaf[aperture.upper()]
# For transformations between the telescope frame and all other frames,
# an attitude matrix is created using the V-frame WCS information.
attitude = attitude_from_v1(vinfo)
wfi.set_attitude_matrix(attitude)
skycoord = wfi.reference_point(to_frame="sky")
pa_v3 = sky_posangle(attitude, *skycoord)
# Compute S_REGION keyword
corners = wfi.corners("sky")
footprint = np.array([corners[0], corners[1]]).T
s_region_keyword = compute_s_region_keyword(footprint)
wcsinfo = WCSRef(
ra=skycoord[0], dec=skycoord[1], pa=pa_v3, s_region=s_region_keyword
)
return wcsinfo
def calc_attitude_matrix(wcs, yangle, position):
"""
Calculate the DCM attitude from known positions and roll angles.
Parameters
----------
wcs : WCSRef
The sky position.
yangle : float
The IdlYangle of the point in question.
position : numpy.array(2)
The position in Ideal frame.
Returns
-------
m : np.array(3,3)
The transformation matrix
"""
# Convert to radians
ra = wcs.ra * D2R
dec = wcs.dec * D2R
yangle_ra = yangle * D2R
pos_rads = position * A2R
v2 = pos_rads[0]
v3 = pos_rads[1]
# Create the matrices
r1 = dcm(ra, dec, yangle_ra)
r2 = np.array(
[
[cos(v2) * cos(v3), -sin(v2), -cos(v2) * sin(v3)],
[sin(v2) * cos(v3), cos(v2), -sin(v2) * sin(v3)],
[sin(v3), 0.0, cos(v3)],
]
)
# Final transformation
m = np.dot(r2, r1)
logger.debug("attitude DCM: %s", m)
return m
def calc_wcs_from_matrix(m):
"""
Calculate the WCS information from a DCM.
Parameters
----------
m : np.array((3, 3))
The DCM matrix to extract WCS information from.
Returns
-------
wcs : WCSRef
The WCS.
"""
# V1 RA/Dec is the first row of the transform
v1_ra, v1_dec = vector_to_angle(m[0])
wcs = WCSRef(v1_ra, v1_dec, None)
# V3 is the third row of the transformation
v3_ra, v3_dec = vector_to_angle(m[2])
v3wcs = WCSRef(v3_ra, v3_dec, None)
# Calculate the V3 position angle
v1_pa = calc_position_angle(wcs, v3wcs)
# Convert to degrees
wcs = WCSRef(ra=wcs.ra * R2D, dec=wcs.dec * R2D, pa=v1_pa * R2D)
logger.debug("wcs: %s", wcs)
return wcs
def calc_quat2matrix(q):
"""
Create a Direction Cosine Matrix (DCM) from a quaternion.
Parameters
----------
q : np.array(q1, q2, q3, q4)
Array of quaternions from the engineering database.
Returns
-------
transform : np.array((3, 3))
The transform matrix representing the transformation
from observatory orientation to J-Frame.
"""
q1, q2, q3, q4 = q
transform = np.array(
[
[
1.0 - 2.0 * q2 * q2 - 2.0 * q3 * q3,
2.0 * (q1 * q2 + q3 * q4),
2.0 * (q3 * q1 - q2 * q4),
],
[
2.0 * (q1 * q2 - q3 * q4),
1.0 - 2.0 * q3 * q3 - 2.0 * q1 * q1,
2.0 * (q2 * q3 + q1 * q4),
],
[
2.0 * (q3 * q1 + q2 * q4),
2.0 * (q2 * q3 - q1 * q4),
1.0 - 2.0 * q1 * q1 - 2.0 * q2 * q2,
],
],
dtype=float,
)
logger.debug("quaternion: %s", transform)
return transform
def calc_position_angle(point, ref):
"""
Calculate position angle from reference to point.
tan(pa) = cos(dec_r) * sin(ra_r - ra_p) / (sin(dec_r)cos(dec_p) - cos(dec_r)sin(dec_p)cos(ra_r-ra_p))
where::
pa : position angle
*_r : reference
*_p : point
Typically the reference is the V3 RA/DEC and point is the object RA/DEC.
Parameters
----------
point : WCSRef
The POINT wcs parameters, in radians.
ref : WCSRef
The TARGET wcs parameters, in radians.
Returns
-------
point_pa : float
The POINT position angle, in radians
"""
y = cos(ref.dec) * sin(ref.ra - point.ra)
x = sin(ref.dec) * cos(point.dec) - cos(ref.dec) * sin(point.dec) * cos(
ref.ra - point.ra
)
point_pa = np.arctan2(y, x)
if point_pa < 0:
point_pa += PI2
if point_pa >= PI2:
point_pa -= PI2
logger.debug("Given reference: %s, point: %s, then PA: %s", ref, point, point_pa)
return point_pa
def get_pointing(
obsstart,
obsend,
mnemonics_to_read=COARSE_MNEMONICS,
service_kwargs=None,
tolerance=60,
reduce_func=None,
):
"""
Get telescope pointing engineering data.
Parameters
----------
obsstart, obsend : float
MJD observation start/end times
mnemonics_to_read : (str,[...])
mnemonics to read.
service_kwargs : dict or None
Keyword arguments passed to `engdb_service` defining what
engineering database service to use.
tolerance : int
If no telemetry can be found during the observation,
the time, in seconds, beyond the observation time to
search for telemetry.
reduce_func : func or None
Reduction function to use on values.
If None, the average pointing is returned.
Returns
-------
pointing : Pointing or [Pointing(, ...)]
The engineering pointing parameters.
If the `result_type` is `all`, a list
of pointings will be returned.
Raises
------
ValueError
Cannot retrieve engineering information.
Notes
-----
For the moment, the first found values will be used.
This will need be re-examined when more information is
available.
"""
if reduce_func is None:
reduce_func = pointing_from_average
logger.info("Determining pointing between observations times (mjd):")
logger.info("obsstart: %s obsend: %s", obsstart, obsend)
logger.info("Telemetry search tolerance: %s", tolerance)
logger.info("Reduction function: %s", reduce_func)
mnemonics = get_mnemonics(
obsstart,
obsend,
mnemonics_to_read=mnemonics_to_read,
tolerance=tolerance,
service_kwargs=service_kwargs,
)
reduced = reduce_func(mnemonics)
logger.log(DEBUG_FULL, "Mnemonics found:")
logger.log(DEBUG_FULL, "%s", mnemonics)
logger.info("Reduced set of pointings:")
logger.info("%s", reduced)
return reduced
def vector_to_angle(v):
"""
Return tuple of spherical angles from unit direction Vector.
The Direction Cosine Matrix (DCM) that provides the transformation of a
unit pointing vector defined in inertial frame (ECI J2000) coordinates to a
unit vector defined in the science aperture Ideal frame coordinates is
defined as.
Parameters
----------
v : [v0, v1, v2]
Direction vector.
Returns
-------
alpha, delta : float, float
The spherical angles, in radians.
"""
alpha = np.arctan2(v[1], v[0])
delta = np.arcsin(v[2])
if alpha < 0.0:
alpha += 2.0 * np.pi
return alpha, delta
def angle_to_vector(alpha, delta):
"""
Convert spherical angles to unit vector.
Parameters
----------
alpha, delta : float
Spherical angles in radians.
Returns
-------
v : [float, float, float]
Unit vector.
"""
v0 = cos(delta) * cos(alpha)
v1 = cos(delta) * sin(alpha)
v2 = sin(delta)
return [v0, v1, v2]
def get_mnemonics(
obsstart, obsend, tolerance, mnemonics_to_read=COARSE_MNEMONICS, service_kwargs=None
):
"""
Retrieve pointing mnemonics from the engineering database.
Parameters
----------
obsstart, obsend : float
astropy.Time observation start/end times.
tolerance : int
If no telemetry can be found during the observation,
the time, in seconds, beyond the observation time to
search for telemetry.
mnemonics_to_read : (str[,...])
The mnemonics to fetch.
service_kwargs : dict or None
Keyword arguments passed to `engdb_service` defining what
engineering database service to use.
Returns
-------
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic.
Raises
------
ValueError
Cannot retrieve engineering information.
"""
if service_kwargs is None:
service_kwargs = dict()
try:
engdb = engdb_service(**service_kwargs)
except EXPECTED_ERRORS as exception:
raise ValueError(
f"Cannot open engineering DB connection\nException: {exception}"
) from None
logger.info("Querying engineering DB: %s", engdb)
# Construct the mnemonic values structure.
mnemonics = {mnemonic: None for mnemonic in mnemonics_to_read}
# Retrieve the mnemonics from the engineering database.
# Check for whether the bracket values are used and
# within tolerance.
for mnemonic in mnemonics:
try:
mnemonics[mnemonic] = engdb.get_values(
mnemonic,
obsstart,
obsend,
time_format="mjd",
include_obstime=True,
include_bracket_values=False,
)
except EXPECTED_ERRORS as exception:
logger.warning("Cannot retrieve %s from engineering.", mnemonic)
logger.debug("Exception %s", exception)
continue
# If more than two points exist, throw off the bracket values.
# Else, ensure the bracket values are within the allowed time.
if len(mnemonics[mnemonic]) < 2:
logger.warning(
"Mnemonic %s has no telemetry within the observation time.", mnemonic
)
logger.warning(
"Attempting to use bracket values within %s seconds", tolerance
)
mnemonics[mnemonic] = engdb.get_values(
mnemonic,
obsstart,
obsend,
time_format="mjd",
include_obstime=True,
include_bracket_values=True,
)
tolerance_mjd = TimeDelta(tolerance, format="sec")
allowed_start = obsstart - tolerance_mjd
allowed_end = obsend + tolerance_mjd
allowed = [
value
for value in mnemonics[mnemonic]
if allowed_start <= value.obstime <= allowed_end
]
if not len(allowed):
raise ValueError(
"No telemetry exists for mnemonic {} within {} and {}".format(
mnemonic,
Time(allowed_start, format="mjd").isot,
Time(allowed_end, format="mjd").isot,
)
)
mnemonics[mnemonic] = allowed
return mnemonics
def all_pointings(mnemonics):
"""
V1 of making pointings.
Parameters
----------
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic.
Returns
-------
pointings : [Pointing[,...]]
List of pointings.
"""
ordered = mnemonics_chronologically(mnemonics)
pointings = mnemonics_to_pointings(ordered)
return pointings
def mnemonics_to_pointings(ordered_mnemonics):
"""From a list of mnemonic values, create a list of Pointings
Parameters
----------
ordered_mnemonics : [(Time, {mnemonic: [value[,...]][,...]})[,...]]
List of 2-tuples consisting of (Time, dict) where the dict are
the mnemonics values at the associated time.
Returns
-------
pointings : [Pointing[,...]]
List of pointings
"""
pointings = []
for obstime, mnemonics_at_time in ordered_mnemonics:
# Observatory orientation, required
try:
q = np.array(
[mnemonics_at_time[m].value for m in COARSE_MNEMONICS_QUATERNION_ECI]
)
except KeyError as exception:
raise ValueError(
f"One or more quaternion mnemonics not in the telemetry {COARSE_MNEMONICS_QUATERNION_ECI}"
) from exception
# B-frame to FGS-frame quaternion. Not required and very oddly has so many backups...
fgs_q = None
try:
fgs_q = np.array(
[mnemonics_at_time[m].value for m in COARSE_MNEMONICS_B2FGS_EST]
)
except KeyError:
logger.warning(
"One or more of the B-to-FGS quaternion mnemonics are not in the telementry %s",
COARSE_MNEMONICS_B2FGS_EST,
)
if fgs_q is None:
try:
fgs_q = np.array(
[mnemonics_at_time[m].value for m in COARSE_MNEMONICS_B2FGS_PRELOAD]
)
except KeyError:
logger.warning(
"One or more of the B-to-FGS quaternion mnemonics are not in the telementry %s",
COARSE_MNEMONICS_B2FGS_PRELOAD,
)
pointing = Pointing(fgs_q=fgs_q, obstime=obstime, q=q)
pointings.append(pointing)
return pointings
def first_pointing(mnemonics):
"""
Return first pointing.
Parameters
----------
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic.
Returns
-------
pointing : Pointing
First pointing.
"""
pointings = all_pointings(mnemonics)
return pointings[0]
def pointing_from_average(mnemonics):
"""
Determine single pointing from average of available pointings.
Parameters
----------
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic.
Returns
-------
pointing : Pointing
Pointing from average.
"""
# weed-out empty mnemonics
valid_mnemonics = [
mnemonic for mnemonic in mnemonics if mnemonics[mnemonic] is not None
]
# Get average observation time.
times = [
eng_param.obstime.unix
for mnemonic in valid_mnemonics
for eng_param in mnemonics[mnemonic]
if eng_param.obstime.unix != 0.0
]
if len(times) > 0:
obstime = Time(np.average(times), format="unix")
else:
raise ValueError("No valid mnemonics found.")
# Get averages for all the mnemonics.
mnemonic_averages = {}
for mnemonic in valid_mnemonics:
values = [eng_param.value for eng_param in mnemonics[mnemonic]]
if np.allclose(values, 0.0):
logger.warning(
"Mnemonics %s is only zeros. Treating as undefined.", mnemonic
)
else:
mnemonic_averages[mnemonic] = EngDB_Value(
obstime=obstime, value=np.average(values)
)
pointing = mnemonics_to_pointings([(obstime, mnemonic_averages)])[0]
# That's all folks
return pointing
def mnemonics_chronologically(mnemonics):
"""
Return time-ordered mnemonic list with progressive values.
The different set of mnemonics used for observatory orientation
appear at different cadences. This routine creates a time-ordered dictionary
with all the mnemonics for each time found in the engineering. For mnemonics
missing for a particular time, the last previous value is used.
Parameters
----------
mnemonics : {mnemonic: [value[,...]]}
Dictionary mapping mnemonics to their respective values.
Returns
-------
ordered : [(obstime, {mnemonic: value[,...]}[,...]]
Time-ordered list of 2-tuple consisting of `(time, mnemonics)`.
"""
# Collect all information by observation time and sort.
by_obstime = defaultdict(dict)
for mnemonic, values in mnemonics.items():
if values is not None:
for value in values:
by_obstime[value.obstime][mnemonic] = value
by_obstime = sorted(by_obstime.items())
# Created the ordered matrix
ordered = []
last_obstime = {}
for obstime, mnemonics_at_time in by_obstime:
last_obstime.update(mnemonics_at_time)
# Engineering data may be present, but all zeros.
# Filter out this situation.
values = [value.value for value in last_obstime.values()]
if not any(values):
continue
ordered.append((obstime, copy(last_obstime)))
return ordered
def mnemonics_chronologically_table(mnemonics):
"""
Return time-ordered mnemonic list with progressive values.
The different set of mnemonics used for observatory orientation
appear at different cadences. This routine creates a time-ordered dictionary
with all the mnemonics for each time found in the engineering. For mnemonics
missing for a particular time, the last previous value is used.
Parameters
----------
mnemonics : {mnemonic: [value[,...]]}
Dictionary mapping mnemonics to their respective values.
Returns
-------
ordered_by_time : `astropy.table.Table`
Time-ordered mnemonic list with progressive values.
"""
ordered = mnemonics_chronologically(mnemonics)
names = list(mnemonics.keys())
names = ["time", *names]
time_idx = 0
values = [[] for _ in names]
for time, mnemonics_at_time in ordered:
values[time_idx].append(time)
for mnemonic in mnemonics_at_time:
idx = names.index(mnemonic)
values[idx].append(ordered[time][mnemonic].value)
t = Table(values, names=names)
return t
def t_pars_from_model(model, t_pars):
"""
Initialize TransformParameters from a DataModel.
Parameters
----------
model : DataModel
Data model to initialize from.
t_par : TransformParameters
Transformation parameters updated with model information.
Updating is performed in-place.
"""
# Instrument details
t_pars.aperture = model.meta.wcsinfo.aperture_name
try:
exp_type = model.meta.exposure.type.lower()
except AttributeError:
exp_type = None
t_pars.exp_type = exp_type
# observation parameters
t_pars.obsstart = model.meta.exposure.start_time
t_pars.obsend = model.meta.exposure.end_time
ephem = model.meta.ephemeris
t_pars.velocity = (ephem.velocity_x, ephem.velocity_y, ephem.velocity_z)
def dcm(alpha, delta, angle):
"""
Construct the Direction Cosine Matrix (DCM).
Typical usage is passing of (RA, DEC, PositionAngle).
All values must be in radians.
Parameters
----------
alpha : float
First coordinate in radians.
delta : float
Second coordinate in radians.
angle : float
Position angle in radians.
Returns
-------
dcm : np.array((3, 3))
The 3x3 direction cosine matrix.
"""
dcm = np.array(
[
[cos(delta) * cos(alpha), cos(delta) * sin(alpha), sin(delta)],
[
-cos(angle) * sin(alpha) + sin(angle) * sin(delta) * cos(alpha),
cos(angle) * cos(alpha) + sin(angle) * sin(delta) * sin(alpha),
-sin(angle) * cos(delta),
],
[
-sin(angle) * sin(alpha) - cos(angle) * sin(delta) * cos(alpha),
sin(angle) * cos(alpha) - cos(angle) * sin(delta) * sin(alpha),
cos(angle) * cos(delta),
],
]
)
return dcm
def update_meta(model, wcsinfo, vinfo, quality):
"""Update model's meta info with the given pointing.
The following meta are update:
- meta.pointing.dec_v1
- meta.pointing.pa_aperture
- meta.pointing.pa_v3
- meta.pointing.ra_v1
- meta.pointing.target_dec
- meta.pointing.target_ra
- meta.wcsinfo.dec_ref
- meta.wcsinfo.ra_ref
- meta.wcsinfo.roll_ref
- meta.wcsinfo.s_region
Parameters
----------
model : `~roman.datamodels.DataModel`
The model to update. Updates are done in-place.
wcsinfo : `WCSRef``
The aperture-specific pointing.
vinfo : ``WCSRef`
The V1-specific pointing
quality : str
Indicator of the success of the pointing determination.
"""
# Shortcuts to the meta blocks
wm = model.meta.wcsinfo
pm = model.meta.pointing
# Setup SIAF info.
from pysiaf import Siaf
siaf = Siaf("roman")
aper = siaf[wm.aperture_name.upper()]
# Set the quality
model.meta.pointing.pointing_engineering_source = quality
# Update SIAF-related meta
wm.v2_ref = aper.V2Ref
wm.v3_ref = aper.V3Ref
wm.v3yangle = aper.V3IdlYAngle
wm.vparity = aper.VIdlParity
# Update aperture-centric pointing, represented by `wcsinfo`
wm.dec_ref = wcsinfo.dec
wm.ra_ref = wcsinfo.ra
wm.roll_ref = wcsinfo.pa
wm.s_region = wcsinfo.s_region
# Update general pointing information, including V1.
pm.dec_v1 = vinfo.dec
pm.pa_aperture = wcsinfo.pa
pm.pa_v3 = vinfo.pa
pm.ra_v1 = vinfo.ra
# Update target's sky location.
attitude = attitude_from_v1(vinfo)
targ_app = siaf[model.meta.pointing.target_aperture]
targ_app.set_attitude_matrix(attitude)
skycoord = targ_app.reference_point(to_frame="sky")
pm.target_ra = skycoord[0]
pm.target_dec = skycoord[1]
# If not present, stub-out keywords that are expected
# in L1 products. 26Q2B21 will deal with this bad situation
gs = model.meta.guide_star
gs.corrected_ra = getattr(gs, "corrected_ra", pm.target_ra)
gs.corrected_dec = getattr(gs, "corrected_dec", pm.target_dec)
gs.h = getattr(gs, "h", wm.v2_ref)
gs.v = getattr(gs, "v", wm.v3_ref)
def attitude_from_v1(vinfo):
"""Calculate observatory attitude matrix based on V1 pointing
Return the attitude matrix used by `pysiaf.Aperture.set_attitude_matrix`
to enable transformations to/from the 'sky' frame.
Parameters
----------
vinfo : `WCSRef`
The WCS information for V1, the boresight.
Returns
-------
attitude : np.array((3, 3), dtype=float)
The attitude matrix.
"""
from pysiaf import Siaf
from pysiaf.utils.rotations import attitude_matrix
siaf = Siaf("roman")
boresight = siaf["BORESIGHT"]
v_refpoint = boresight.reference_point(to_frame="tel")
attitude = attitude_matrix(*v_refpoint, vinfo.ra, vinfo.dec, vinfo.pa)
return attitude
def calc_m_fgs2gs(x, y):
"""Calculate DCM that converts FGS reference to guide star location
Parameters
----------
x, y : float, float
Guidestar location (radians)
"""
m = np.array(
[
[cos(x), 0.0, sin(x)],
[-sin(x) * sin(y), cos(y), cos(x) * sin(y)],
[-sin(x) * cos(y), -sin(y), cos(x) * cos(y)],
]
)
return m
def calc_gsapp2gs(m_eci2gsapp, velocity):
"""
Calculate the Velocity Aberration correction.
This implements Eq. 40 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3.2.5:
The velocity aberration correction is applied in the direction of the guide
star. The matrix that translates from ECI to the apparent guide star ICS
frame is M_(ECI→GSAppICS), where the GS Apparent position vector is along
the z-axis in the guide star ICS frame.
Note that the convention for Roman is in the opposite direction. The algorithm is the same,
just different input/output.
Parameters
----------
m_eci2gsapp : numpy.array(3, 3)
The the ECI to Guide Star transformation matrix, in the ICS frame.
velocity : numpy.array([dx, dy, dz])
The barycentric velocity.
Returns
-------
m_gsapp2gs : numpy.array(3, 3)
The velocity aberration correction matrix.
"""
# Check velocity. If present, negate the velocity since
# the desire is to remove the correction.
velocity = np.array(velocity)
if not velocity.all() or np.sum(np.abs(velocity) > MAX_OBSERVATORY_SPEED):
logger.warning(
"Velocity: %s is either unspecified or contains unreasonable values. Cannot calculate aberration. Returning identity matrix",
velocity,
)
return np.identity(3)
velocity = -1 * velocity
# Eq. 35: Guide star position vector
uz = np.array([0.0, 0.0, 1.0])
u_gseci = np.dot(np.transpose(m_eci2gsapp), uz)
# Eq. 36: Compute the apparent shift due to velocity aberration.
try:
scale_factor, u_gseci_app = compute_va_effects_vector(*velocity, u_gseci)
except TypeError:
logger.warning(
"Failure in computing velocity aberration. Returning identity matrix."
)
logger.warning("Exception: %s", sys.exc_info())
return np.identity(3)
# Eq. 39: Rotate from ICS into the guide star frame.
u_gs_app = np.dot(m_eci2gsapp, u_gseci_app)
# Eq. 40: Compute the M_gsapp2gs matrix
u_prod = np.cross(uz, u_gs_app)
u_prod_mag = np.linalg.norm(u_prod)
a_hat = u_prod / u_prod_mag
m_a_hat = np.array(
[
[0.0, -a_hat[2], a_hat[1]],
[a_hat[2], 0.0, -a_hat[0]],
[-a_hat[1], a_hat[0], 0.0],
]
)
theta = np.arcsin(u_prod_mag)
m_gsapp2gs = (
np.identity(3)
- (m_a_hat * np.sin(theta))
+ (2 * m_a_hat**2 * np.sin(theta / 2.0) ** 2)
)
logger.debug("m_gsapp2gs: %s", m_gsapp2gs)
return m_gsapp2gs
def hv_to_fgs(aperture_name, h, v):
"""Convert HV frame to FGS frame
Source document: Innerspace Confluence: "Quaternion Transforms for Coarse Pointing WCS"
from the section "Derive M_fgstogsapp". The coordinates systems are based on
Eq. 7 of "Roman-STScI-000416 (Roman SOC FGS Algorithms)"
Parameters
----------
aperture_name : str
The aperture in which the guide star is located.
h, v : float, float
The commanded coordinates in the HV frame.
Units are in pixels.
Returns
-------
fgs_x, fgs_y : float, float
The coordinates in the FGS reference frame in arcsec.
"""
from pysiaf import Siaf
siaf = Siaf("roman")
aper = siaf[aperture_name]
aper_wfi_cen = siaf["WFI_CEN"]
# Location of GS. This is from Eqn. 7 of Roman-STScI-000416 (Roman SOC FGS Algorithms)
x, y = h + aper.XSciRef, aper.YSciRef - v
v2, v3 = aper.sci_to_tel(x, y) # Convert sci to tel coords using pysiaf
# Convert tel to WFI_CEN idl coords using pysiaf.
# The only difference between FGS and WFI_CEN coordinates is that the Y-axis is flipped
x_wc, y_wc = aper_wfi_cen.tel_to_idl(v2, v3)
x_fgs, y_fgs = x_wc, -y_wc
return x_fgs, y_fgs
def calc_m_b2fgs(fgs_q=None):
"""Calculate the B-to-FGS frame DCM
Parameters
----------
fgs_q : [float, float, float, float] or None
The quaterion representing the B to FGS transformation.
If no B-to-FGS quaternion is given, use a pre-launch defined matrix.
"""
if fgs_q is None:
logger.warning("No B-to-FGS information is given. Using pre-launch values.")
return M_B2FCS0
# Calculate the DCM from the quaterion
return calc_quat2matrix(fgs_q)
def calc_m_fgs2gsapp(x, y):
"""Calculate the FGS to Guide star apparent transformation
Source document: Innerspace Confluence: "Quaternion Transforms for Coarse Pointing WCS"
from the section "Derive M_fgstogsapp".
Parameters
----------
x, y : float, float
Position in arcseconds in the FGS frame
Returns
-------
m_fgs2gsapp : np.array(3,3)
The DCM representing the transformation.
"""
x_gs, y_gs = A2R * x, A2R * y
cx, sx = cos(x_gs), sin(x_gs)
cy, sy = cos(y_gs), sin(y_gs)
m_x = np.array([[cx, 0, -sx], [0, 1, 0], [sx, 0, cx]])
m_y = np.array([[1, 0, 0], [0, cy, sy], [0, -sy, cy]])
m_gsapp2fgs = np.dot(m_y, m_x)
m_fgs2gsapp = m_gsapp2fgs.T
return m_fgs2gsapp