from pathlib import Path
from typing import NamedTuple
import astropy.units as u
import numpy as np
[docs]
class SpxGeometry(NamedTuple):
nav: int
lat: float
lon: float
phase: float
emission: float
azimuth: float
wgeom: float
wavelengths: np.ndarray
spectrum: np.ndarray
error: np.ndarray
[docs]
class SpxFileData(NamedTuple):
fwhm: float
lat: float
lon: float
geometries: tuple[SpxGeometry, ...]
[docs]
def read(path: str) -> SpxFileData:
"""
Read in a .spx file from disk.
The returned data is a SpxFileData object containing the data from the .spx file.
For example, you can access the latitude of the observation with `data.lat` or the
spectrum of the first geometry with `data.geometries[0].spectrum`.
Args:
path: The path to the .spx file.
Returns:
A SpxFileData object of the form `(fwhm, lat, lon, geometries)` containing the
data from the .spx file.
"""
with open(path, 'r') as f:
fwhm, hdr_lat, hdr_lon, ngeom = (float(x) for x in f.readline().split())
ngeom = int(ngeom)
geometries: list[SpxGeometry] = []
for _ in range(ngeom):
nconv = int(float(f.readline()))
nav = int(float(f.readline()))
lat, lon, phase, emission, azimuth, wgeom = (
float(x) for x in f.readline().split()
)
wavelengths = np.full(nconv, np.nan)
spectrum = np.full(nconv, np.nan)
error = np.full(nconv, np.nan)
for i in range(nconv):
w, s, e = (float(x) for x in f.readline().split())
wavelengths[i] = w
spectrum[i] = s
error[i] = e
geometries.append(
SpxGeometry(
nav=nav,
lat=lat,
lon=lon,
phase=phase,
emission=emission,
azimuth=azimuth,
wgeom=wgeom,
wavelengths=wavelengths,
spectrum=spectrum,
error=error,
)
)
return SpxFileData(
fwhm=fwhm,
lat=hdr_lat,
lon=hdr_lon,
geometries=tuple(geometries),
)
[docs]
def write(
path: str | Path,
data: SpxFileData | None = None,
convert: bool = True,
**kwargs,
) -> None:
"""
Write an SPX file to disk, for example:
spx.write(
'spectrum.spx',
wavelengths=wavelengths,
spectrum=spectrum, # In MJy/sr if convert is False, else W/cm2/sr/um
error=error, # In MJy/sr if convert is False, else W/cm2/sr/um
fwhm=0,
lat=20.0,
lon=30.0,
phase=0.0,
emission=0.0,
azimuth=0.0,
)
Args:
path: The path to write the SPX file to.
data: An SpxFileData object to write to disk. If `None`, the data will be
constructed from the keyword arguments.
kwargs: Keyword arguments to construct the SpxFileData object if `data` is
`None`. These are passed to `construct_spx`.
"""
if data is None:
data = construct_spx(convert=convert, **kwargs)
lines: list[str] = []
ngeom = len(data.geometries)
lines.append(f'{data.fwhm}\t{data.lat}\t{data.lon}\t{ngeom}')
for g in data.geometries:
mask = ~(np.isnan(g.spectrum) | np.isnan(g.error))
nconv = len(g.wavelengths[mask])
lines.append(f'{nconv}')
lines.append(f'{g.nav}')
lines.append(
f'{g.lat}\t{g.lon}\t{g.phase}\t{g.emission}\t{g.azimuth}\t{g.wgeom}'
)
for w, s, e in zip(g.wavelengths[mask], g.spectrum[mask], g.error[mask]):
lines.append(f'{w}\t{s}\t{e}')
lines.append('') # end file with a newline
with open(path, 'w') as f:
f.write('\n'.join(lines))
[docs]
def construct_spx(
wavelengths: np.ndarray,
spectrum: np.ndarray,
error: np.ndarray,
convert: bool = True,
*,
fwhm: float,
lat: float,
lon: float,
phase: float,
emission: float,
azimuth: float,
) -> SpxFileData:
"""
Construct an SpxFileData object from the given data.
The units should be MJy/sr if convert is True, else W/cm2/sr/um.
"""
spectrum = convert_MJysr_to_Wcm2srum(wavelengths, spectrum) if convert else spectrum
error = convert_MJysr_to_Wcm2srum(wavelengths, error) if convert else error
return SpxFileData(
fwhm=fwhm,
lat=lat,
lon=lon,
geometries=(
SpxGeometry(
nav=1,
lat=lat,
lon=lon,
phase=phase,
emission=emission,
azimuth=azimuth,
wgeom=1,
wavelengths=wavelengths,
spectrum=spectrum,
error=error),
),
),
[docs]
def convert_MJysr_to_Wcm2srum(
wavelengths: np.ndarray,
spectrum: np.ndarray,
) -> np.ndarray:
"""
Convert a spectrum or cube from MJy/sr to W/cm2/sr/micron.
"""
while len(wavelengths.shape) < len(spectrum.shape):
# work for cubes passed to sp
wavelengths = np.expand_dims(wavelengths, -1)
spx_MJysr = spectrum * u.MJy / u.sr
spx_Wcm2srum = spx_MJysr.to(
u.W / (u.cm * u.cm) / u.sr / u.micron,
equivalencies=u.spectral_density(wavelengths * u.micron),
)
return spx_Wcm2srum.value
[docs]
def convert_Wcm2srum_to_MJysr(
wavelengths: np.ndarray,
spectrum: np.ndarray,
) -> np.ndarray:
"""
Convert a spectrum or cube from W/cm2/sr/micron to MJy/sr.
"""
while len(wavelengths.shape) < len(spectrum.shape):
# work for cubes passed to sp
wavelengths = np.expand_dims(wavelengths, -1)
spx_Wcm2srum = spectrum * u.W / (u.cm * u.cm) / u.sr / u.micron
spx_MJysr = spx_Wcm2srum.to(
u.MJy / u.sr,
equivalencies=u.spectral_density(wavelengths * u.micron),
)
return spx_MJysr.value
"""Write data (in MJy/sr) to a .spx file (in W/cm2/sr/um)."""
converted_spectrum = convert_MJysr_to_Wcm2srum(wavelengths, spectrum)
converted_error = convert_MJysr_to_Wcm2srum(wavelengths, error)
spx_data = construct_spx(wavelengths, converted_spectrum, converted_error, **kwargs)
write(filepath, spx_data)