"""This module provides parsing objects for reading some NEMESIS files, such as nemesis.ref"""
import pandas as pd
import itertools as it
import io
import numpy as np
from pathlib import Path
import astropy.units as u
import struct
import matplotlib.pyplot as plt
import warnings
warnings.formatwarning = lambda msg, *_: f"Warning: {msg}\n"
from . import utils
from . import constants
from . import profiles as profiles_
from . import shapes
## TODO: Move all parsing routines here, .itr, .prc etc...
## I also want to be able to completely reconstruct the core object from these files so no pickling of NemesisCore needed, but thats a little way off
[docs]
class Parser:
[docs]
def __init__(self, filepath):
self.filepath = Path(filepath)
self.read()
[docs]
class NemesisRef(Parser):
"""Parser for nemesis.ref and nemesis.prf
Attributes:
amform: How the VMRs add up (always assumed to be 1: sum(VMR per layer)==1)
planet_id: ID of the planet being analysed.
latitude: Latitude at which the .ref file applies
num_layers: Number of layers used
num_gases: Number of gases in the file
gas_names: List of the names of all the gases
data: Contains the pressure, temperature, and VMR profiles of each gas at each height
"""
[docs]
def __init__(self, filepath):
super().__init__(filepath)
[docs]
def read(self):
with open(self.filepath) as file:
lines = file.read().split("\n")
# remove the stupid extra magic number at the top of ref files
if self.filepath.suffix == ".ref":
del lines[1]
extra_header = 1
else:
extra_header = 0
self.amform, = utils.get_ints_from_string(lines[0])
planet_id, latitude, num_layers, num_gases = utils.get_floats_from_string(lines[1])
self.planet_id = int(planet_id)
self.latitude = latitude
self.num_layers = int(num_layers)
self.num_gases = int(num_gases)
self.gas_names = []
for l in lines[2:2+int(self.num_gases)]:
gas_id, isotope_id = utils.get_ints_from_string(l)
gas_name = constants.GASES[constants.GASES.radtrans_id == gas_id].name.iloc[0]
self.gas_names.append(f"{gas_name} {isotope_id}")
self.data = pd.read_table(self.filepath,
skiprows=3+extra_header+self.num_gases,
sep=r"\s+",
header=None)
self.data.columns = ["height", "pressure", "temperature"] + self.gas_names
[docs]
def write(self, filepath):
"""Write the object to a new .ref file
Args:
filepath (str): Path to the new .ref file
Returns:
None
"""
out = f"1\n{self.amform}\n{self.planet_id} {self.latitude} {self.num_layers} {self.num_gases}\n"
for gas_name in self.gas_names:
name, isotopologue = gas_name.split(" ")
radtranid = constants.GASES.loc[constants.GASES["name"] == name, "radtrans_id"].iloc[0]
out += f"{radtranid:>2} {isotopologue}\n"
out += self.data.to_string(index=False)
with open(filepath, mode="w+") as file:
file.write(out)
[docs]
class NemesisMre(Parser):
"""Parser for the nemesis.mre file
Attributes:
ispec (int): Don't know
wavenumber (bool): Whether the spectrum is in wavenumber (cm-1) or wavelength (um) space
ngeom (int): Number of geometries (should be 1)
latitude (float): Latitude of the observation
longitude (float): Longitude of the observation
retrieved_spectrum pd.DataFrame: DataFrame containing the measured spectrum + all error sources and the fitted model spectra and its errors
retrieved_parameters List[pd.DataFrame]: List of DataFrames containing the retrieved parameters from each Profile"""
def _parse_header_line(self, line, num_fields, cast_to):
fields = [cast_to(x) for x in line.split()[:num_fields]]
if num_fields == 1:
return fields[0]
else:
return fields
[docs]
def read(self):
with open(self.filepath) as file:
mre_data = file.read().split("\n")
header = []
blocks = []
for i, line in enumerate(mre_data):
# Read the first 3 lines to the header array
if i < 3:
header.append(line)
# Find the boundaries between result blocks
elif "Variable" in line:
blocks.append(i)
blocks.append(i)
# Set some attributes from the header info
self.ispec, self.ngeom, _,_,_ = self._parse_header_line(header[1], num_fields=5, cast_to=int)
self.latitude, self.longitude = self._parse_header_line(header[2], num_fields=2, cast_to=float)
self.wavenumber = mre_data[3].strip().endswith("cm")
name = "wavenumber" if self.wavenumber else "wavelength"
# Read in the fitted spectrum as a DataFrame
self.retrieved_spectrum = pd.read_table(self.filepath,
names=[name, "measured", "error", "pct_error", "model", "pct_diff"],
index_col=0, sep=r"\s+", skiprows=5, nrows=blocks[0]-7)
# Read in each retrieved parameter
self.retrieved_parameters = []
self.initial_state_vector = []
with open(self.filepath) as file:
for start, end in it.pairwise(blocks):
data = utils.read_between_lines(file, start, end)
df = pd.read_table(io.StringIO(data), skiprows=4, sep=r"\s+", names=["i", "ix", "prior", "prior_error", "retrieved", "retrieved_error"])
df.drop(["i", "ix"], axis=1, inplace=True)
self.initial_state_vector += list(df.prior)
self.retrieved_parameters.append(df)
[docs]
class NemesisXsc(Parser):
"""Parser for the nemesis.xsc file
Attributes:
xsc (pd.DataFrame): The aerosol cross-sections as a function of wavelength for each aerosol mode
ssa (pd.DataFrame): The single scattering albedos as a function of wavelength for each aerosol modes"""
[docs]
def read(self):
waves = []
ssas = []
xscs = []
with open(self.filepath) as file:
for i, line in enumerate(file):
if i == 0:
continue
if i % 2 == 1:
wavelength, *x = utils.get_floats_from_string(line)
waves.append(wavelength)
xscs.append(x)
else:
s = utils.get_floats_from_string(line)
ssas.append(s)
self.ssa = pd.DataFrame(ssas)
self.ssa.insert(0, column="wavelength", value=waves)
self.xsc = pd.DataFrame(xscs)
self.xsc.insert(0, column="wavelength", value=waves)
[docs]
def add_aerosol_names(self, names=None):
"""Add the aerosol profile names/labels to the ssa and xsc DataFrames
Args:
names (None / str / AerosolProfile / list[AerosolProfile]): The names to give the aerosol profiles.
By default it looks for a aerosol_names.txt file that Eleos generated when a core is generated.
This can be overridden with a string, AerosolProfile, or a list of either type.
Returns:
None
"""
if names is None:
names = []
with open(Path(self.filepath).parent / "aerosol_names.txt") as file:
n = file.read().split("\n")
for m in n:
names.append(m)
elif isinstance(names, profiles_.AerosolProfile):
names = [names.label]
elif isinstance(names, str):
names = [names]
elif all([isinstance(p, profiles_.AerosolProfile) for p in names]):
names = [p.label for p in names]
else:
raise ValueError("names must be either None, a single string, a list of strings, a single AerosolProfile, or a list of AerosolProfiles")
self.ssa.columns = ["wavelength"] + names
self.xsc.columns = ["wavelength"] + names
[docs]
def plot(self, ssa_ax=None, xsc_ax=None):
if ssa_ax is None or xsc_ax is None:
fig, (ssa_ax, xsc_ax) = plt.subplots(2,1, sharex=True)
for column in self.ssa.columns[1:]:
ssa_ax.plot(self.ssa.wavelength, self.ssa[column], label=str(column))
xsc_ax.plot(self.xsc.wavelength, self.xsc[column], label=str(column))
ssa_ax.set_xlabel("Wavelength (microns)")
ssa_ax.set_ylabel("Single Scattering Albedo")
xsc_ax.set_xlabel("Wavelength (microns)")
xsc_ax.set_ylabel("Extinction Coefficient")
ssa_ax.legend()
xsc_ax.legend()
return ssa_ax, xsc_ax
[docs]
class HGPhase(Parser):
"""Parser for the HG phase function file
Attributes:
hg (DataFrame): DataFrame containing f, g1, and g2 as a function of wavelength"""
[docs]
def read(self):
data = pd.read_table(self.filepath, sep=r"\s+", header=None)
data.columns = ["wavelength", "f", "g1", "g2"]
self.hg = data
[docs]
def plot(self, f_ax=None, g1_ax=None, g2_ax=None):
if f_ax is None or g1_ax is None or g2_ax is None:
fig, (f_ax, g1_ax, g2_ax) = plt.subplots(3,1, sharex=True)
f_ax.plot(self.hg.wavelength, self.hg.f, label=self.filepath.name)
g1_ax.plot(self.hg.wavelength, self.hg.g1, label=self.filepath.name)
g2_ax.plot(self.hg.wavelength, self.hg.g2, label=self.filepath.name)
f_ax.set_ylabel("f")
g1_ax.set_ylabel("g1")
g2_ax.set_ylabel("g2")
g2_ax.set_xlabel("Wavelength (microns)")
f_ax.legend()
g1_ax.legend()
g2_ax.legend()
return f_ax, g1_ax, g2_ax
[docs]
def plot_phase(self, wavelength, ax=None):
def hg(theta, g):
return 1/(4*np.pi) * (1 - g**2) / (1 + g**2 - 2*g*np.cos(theta))**(3/2)
def double_hg(theta, f, g1, g2):
return f*hg(theta, g1) + (1-f)*hg(theta, g2)
if ax is None:
fig = plt.figure()
ax = plt.subplot(111, projection="polar")
theta = np.linspace(0, 2*np.pi, 500)
i, wl = utils.find_nearest(self.hg.wavelength, wavelength)
f = self.hg.f.iloc[i]
g1 = self.hg.g1.iloc[i]
g2 = self.hg.g2.iloc[i]
phase = double_hg(theta, f, g1, g2)
ax.plot(theta, phase, label=f"{self.filepath.name} at {wl:.2f}um")
ax.legend()
return ax
[docs]
class NemesisItr(Parser):
"""Parser for nemesis.itr. Also requires a NemesisMre parser
Attributes:
state_vectors (pd.DataFrame): Linear state vectors for each iteration"""
[docs]
def __init__(self, filepath, mre=None):
if mre is None:
try:
fp = Path(filepath).parent / "nemesis.mre"
self.mre = NemesisMre(fp)
except:
print("Warning: Failed to load {fp}, skipping state vector linearisation")
self.mre = None
else:
self.mre = mre
super().__init__(filepath)
[docs]
def read(self):
data = []
count = -1
with open(self.filepath) as file:
for i, line in enumerate(file.read().split("\n")):
if line == " ":
count = 3
else:
count -= 1
if count == 0:
d = []
for i, v in enumerate(line.split()):
d.append(float(v))
data.append(d)
if self.mre is not None:
exps = []
for i, value in enumerate(data[0]):
flag = np.isclose(self.mre.initial_state_vector[i], np.exp(value))
exps.append(flag)
else:
exps = [False] * len(data[0])
self.state_vectors = pd.DataFrame(data)
for i, column in enumerate(self.state_vectors.columns):
if exps[i]:
self.state_vectors[column] = np.exp(self.state_vectors[column])
[docs]
def add_column_names(self, profiles):
names = []
for label, profile in profiles.items():
if isinstance(profile.shape, shapes.Shape0):
names += [f"{label} {n}" for n in range(120)] # temp value FIX
else:
names += [f"{label} {n}" for n in profile.VARIABLES]
self.state_vectors.columns = names
[docs]
class NemesisPrc(Parser):
"""Parser for nemesis.prc
Attributes:
chisq (List[float]): List of chi-squared values"""
[docs]
def read(self):
self.chisq = []
with open(self.filepath) as file:
for line in file:
if "chi" in line and "should" not in line:
fs = utils.get_floats_from_string(line)
if len(fs) == 0:
self.chisq.append(np.nan)
else:
self.chisq.append(fs[0])
[docs]
def write_chisqs(self, filepath):
with open(filepath, mode="w+") as file:
for chisq in self.chisq:
file.write(f"{chisq}\n")
[docs]
class NemesisSpx(Parser):
"""
Parser for the nemesis.spx file
Attributes:
lat (float): Latitude of the observation
lon (float): longitude of the observation
phase (float): Phase angle of the observation
emission (float): Emission angle of the observation
azimuth (float): Azimuth angle of the observation
wgeom (float): wgeom
nconv (int): nconv
nav (int): nav
fwhm (float): FWHM of the instrument
wavelengths (np.ndarray): Wavelengths of the observed spectra in um
spectrum (np.ndarray): The observed spectra in W/cm2/sr/um
error (np.ndarray): The error on the observed spectra in W/cm2/sr/um
"""
[docs]
def read(self):
"""Read the spx file in
Args:
None
Returns:
None
"""
with open(self.filepath) as f:
self.fwhm, *_ = (float(x) for x in f.readline().split())
self.nconv = int(float(f.readline()))
self.nav = int(float(f.readline()))
self.lat, self.lon, self.phase, self.emission, self.azimuth, self.wgeom = (
float(x) for x in f.readline().split()
)
self.wavelengths = np.full(self.nconv, np.nan)
self.spectrum = np.full(self.nconv, np.nan)
self.error = np.full(self.nconv, np.nan)
for i in range(self.nconv):
w, s, e = (float(x) for x in f.readline().split())
self.wavelengths[i] = w
self.spectrum[i] = s
self.error[i] = e
[docs]
def write(self, filepath, exclude_nans=True):
"""
Write an SPX file to disk
Args:
filepath (str): The filepath of the new .spx file
Returns:
None
"""
lines = []
lines.append(f'{self.fwhm}\t{self.lat}\t{self.lon}\t1')
if exclude_nans:
mask = ~(np.isnan(self.spectrum) | np.isnan(self.error))
else:
mask = np.ones_like(self.spectrum).astype(bool)
nconv = len(self.wavelengths[mask])
lines.append(f'{nconv}')
lines.append(f'{self.nav}')
lines.append(f'{self.lat}\t{self.lon}\t{self.phase}\t{self.emission}\t{self.azimuth}\t{self.wgeom}')
if np.max(self.spectrum) > 1e-4:
warnings.warn(f"This spectrum looks extremely bright (max value {np.max(self.spectrum)}) - have you converted from MJy/sr to W/cm2/sr/um?")
for w, s, e in zip(self.wavelengths[mask], self.spectrum[mask], self.error[mask]):
lines.append(f'{w: 12f} {s: 12e} {e: 12e}')
lines.append('') # end file with a newline
with open(filepath, 'w') as f:
f.write('\n'.join(lines))
@property
def wavelength(self):
return self.wavelengths
@wavelength.setter
def wavelength(self, value):
self.wavelengths = np.array(value)
@property
def errors(self):
return self.error
@errors.setter
def errors(self, value):
self.error = np.array(value)
[docs]
@classmethod
def from_lists(cls,
wavelengths,
spectrum,
error,
lat, lon, phase, emission, azimuth, fwhm=0,
convert=True):
"""Create a new NemesisSpx object from three lists (wavelength, spectrum, errors) and a set of parameters (lat, lon etc...)
See also: `NemesisSpx.get_angles()`
Args:
wavelengths (list or array-like): Wavelength values of the observation.
spectrum (list or array-like): Measured spectral radiance/reflectance values in units of MJy/sr or W/cm2/sr/um (see `convert`).
error (list or array-like): Measurement uncertainties for each spectral point.
lat (float): Latitude of the observation point (degrees).
lon (float): Longitude of the observation point (degrees).
phase (float): Phase angle between the observer, target, and light source (degrees).
emission (float): Emission angle of the observation (degrees).
azimuth (float): Azimuth angle (degrees).
fwhm (float, optional): Full width at half maximum of the spectral resolution. Defaults to 0.
convert (bool, optional): If True, converts the input values from MJy/sr to W/cm2/sr/um.
Defaults to False.
Returns:
NemesisSpx: A new instance populated with the given spectral and geometric data.
"""
spx = cls.__new__(cls)
if convert:
spx.spectrum = NemesisSpx.convert_MJysr_to_Wcm2srum(wavelengths, spectrum)
spx.error = NemesisSpx.convert_MJysr_to_Wcm2srum(wavelengths, error)
else:
spx.spectrum = np.array(spectrum)
spx.error = np.array(error)
spx.wavelengths = np.array(wavelengths)
spx.lat = lat
spx.lon = lon
spx.phase = phase
spx.emission = emission
spx.azimuth = azimuth
spx.fwhm = fwhm
spx.nconv = len(spx.wavelengths)
spx.nav = 1
spx.wgeom = 1
return spx
[docs]
def get_angles(self):
"""
Retrieve the geometric parameters of the observation. useful for creating modified versions of the same spectrum (see `NemesisSpx.from_lists()`)
Args:
None
Returns:
dict: Dictionary containing:
- 'lat' (float): Latitude in degrees.
- 'lon' (float): Longitude in degrees.
- 'phase' (float): Phase angle in degrees.
- 'emission' (float): Emission angle in degrees.
- 'azimuth' (float): Azimuth angle in degrees.
"""
return {
"lat": self.lat,
"lon": self.lon,
"phase": self.phase,
"emission": self.emission,
"azimuth": self.azimuth
}
[docs]
@staticmethod
def convert_MJysr_to_Wcm2srum(
wavelengths: np.ndarray,
spectrum: np.ndarray,
) -> np.ndarray:
"""
Convert a spectrum or cube from MJy/sr to W/cm2/sr/micron.
"""
while len(wavelengths.shape) < len(spectrum.shape):
# work for cubes passed to sp
wavelengths = np.expand_dims(wavelengths, -1)
spx_MJysr = spectrum * u.MJy / u.sr
spx_Wcm2srum = spx_MJysr.to(
u.W / (u.cm * u.cm) / u.sr / u.micron,
equivalencies=u.spectral_density(wavelengths * u.micron),
)
return spx_Wcm2srum.value
[docs]
@staticmethod
def convert_Wcm2srum_to_MJysr(
wavelengths: np.ndarray,
spectrum: np.ndarray,
) -> np.ndarray:
"""
Convert a spectrum or cube from W/cm2/sr/micron to MJy/sr.
"""
while len(wavelengths.shape) < len(spectrum.shape):
# work for cubes passed to sp
wavelengths = np.expand_dims(wavelengths, -1)
spx_Wcm2srum = spectrum * u.W / (u.cm * u.cm) / u.sr / u.micron
spx_MJysr = spx_Wcm2srum.to(
u.MJy / u.sr,
equivalencies=u.spectral_density(wavelengths * u.micron),
)
return spx_MJysr.value
[docs]
class NemesisInp(Parser):
"""Parser for nemesis.inp
Attributes:
wavelength (bool): Whether to use wavelength or wavenumber
scattering (bool): Whether to use a scattering run
linebyline (bool): Whether to use line-by-line or corrrelated-k method
woff (float): Wavelength offset
fmerror_path (str): Path to the file contianing forward modelling errors for each wavelength
num_iterations (int): Maximum number of iterations
min_phi_change (float): Minimim percentage change in phi before terminating
num_spectra (int): Number of spectra to retrieve
start_spectra (int): ID of spectra to start with
sub_previous (bool): Whether to use a previous retrieval
output_format (int): Format of the output files
"""
[docs]
def read(self):
with open(self.filepath) as file:
lines = file.read().split("\n")
self.wavelength, self.scattering, self.linebyline = map(bool, utils.get_ints_from_string(lines[0]))
self.woff, = utils.get_floats_from_string(lines[1])
self.fmerror_path = lines[2]
self.num_iterations, = utils.get_ints_from_string(lines[3])
self.min_phi_change, = utils.get_floats_from_string(lines[4])
self.num_spectra, self.start_spectra = utils.get_ints_from_string(lines[5])
self.sub_previous, = map(bool, utils.get_ints_from_string(lines[6]))
self.output_format, = utils.get_ints_from_string(lines[7])
[docs]
class MakephaseOut(Parser):
[docs]
def __init__(self, filepath):
super().__init__(filepath)
self.add_aerosol_names()
[docs]
def read(self):
xsc = NemesisXsc(Path(self.filepath).parent / "nemesis.xsc")
wavelengths = xsc.xsc.wavelength
with open(self.filepath) as file:
isprev = False
start = False
out = []
for line in file:
if not isprev and not start:
out.append([])
start = True
if line.startswith(" Refractive index:"):
isprev = True
start = False
out[-1].append(utils.get_floats_from_string(line))
else:
isprev = False
out = pd.DataFrame(np.hstack(out))
out.insert(0, "wavelength", wavelengths)
self.data = out
[docs]
def add_aerosol_names(self):
# Get the names of the aerosol layers
names = []
with open(Path(self.filepath).parent / "aerosol_names.txt") as file:
n = file.read().split("\n")
for m in n:
names.append(m + " real")
names.append(m + " imag")
[docs]
class AerosolRef(Parser):
"""Parser for the aerosol.prf file
Attributes:
num_modes (int): The number of aerosol modes defined in the file
data (pandas.DataFrame): DataFrame containing the aerosol density as a function of height. The units of aerosol density are particles per gram of atmosphere"""
[docs]
def __init__(self, filepath):
super().__init__(filepath)
self.add_aerosol_names()
[docs]
@classmethod
def empty(cls, heights, num_modes=1):
ref = cls.__new__(cls)
ref.num_modes = num_modes
ref.data = pd.DataFrame({"height":heights})
for i in range(num_modes):
ref.data[f"aerosol_{i}"] = ["0.0" for _ in range(len(heights))]
return ref
[docs]
def read(self):
self.data = pd.read_table(self.filepath, sep=r"\s+", skiprows=2, header=None)
self.num_modes = len(self.data.columns) - 1
header = ["height"] + [f"aerosol_{x}" for x in range(1, self.num_modes+1)]
self.data.columns = header
[docs]
def add_aerosol_names(self):
with open(Path(self.filepath).parent / "aerosol_names.txt") as file:
names = file.read().split("\n")
if names == [""]:
return
self.data.columns = ["height"] + names
[docs]
def write(self, filepath):
"""Write the data in the object to a new aerosol.ref file
Args:
filepath (str): The filepath to the new aerosol.ref file
Returns:
None
"""
out = f"# Generated by Eleos\n{len(self.data)} {len(self.data.columns)-1}\n"
out += self.data.to_string(header=None, index=None)
with open(filepath, mode="w+") as file:
file.write(out)
[docs]
class ParaH2Ref(Parser):
"""Parser for the parah2.ref file
Attributes:
data: pd.DataFrame containing the aerosol density as a function of height. The units of aerosol density are particles per gram of atmosphere"""
[docs]
def __init__(self, filepath):
super().__init__(filepath)
[docs]
def read(self):
self.data = pd.read_table(self.filepath, sep=r"\s+", skiprows=1, header=None)
header = ["height", "parah2_fraction"]
self.data.columns = header
[docs]
class kTable(Parser):
"""Parser for ktables
Atributes:
ktable (np.ndarray): 4D array of absorption coefficients with axes (wavelength, pressure, temperature, g-ordinate).
wavelength (np.ndarray): Array of spectral points
pressure (np.ndarray): Array of pressure levels
temperature (np.ndarray): Array of temperature levels
g_ordinates (np.ndarray): Array of g-ordinates used in correlated-k integration
g_weights (np.ndarray): Corresponding quadrature weights for g-ordinates
irec0 (int): Number of header records
npoint (int): Number of spectral points
vmin (float): Starting value of the spectral axis
delv (float): Spectral spacing. If negative, grid is non-uniform and read from file.
fwhm (float): Full Width at Half Maximum for spectral bins
np_ (int): Number of pressure levels
nt (int): Number of temperature levels
ng (int): Number of g-ordinates
idgas (int): Identifier for the gas species
isogas (int): Identifier for the isotope of the gas
"""
[docs]
@staticmethod
def read_float(file):
return struct.unpack('f', file.read(4))[0]
[docs]
@staticmethod
def read_long(file):
return struct.unpack('i', file.read(4))[0]
[docs]
def read(self):
with open(self.filepath, 'rb') as f:
# Read header
self.irec0 = self.read_long(f)
self.npoint = self.read_long(f)
self.vmin = self.read_float(f)
self.delv = self.read_float(f)
self.fwhm = self.read_float(f)
self.np_ = self.read_long(f)
self.nt = self.read_long(f)
self.ng = self.read_long(f)
self.idgas = self.read_long(f)
self.isogas = self.read_long(f)
# Read g ordinates and weights
self.g_ordinates = np.array([self.read_float(f) for _ in range(self.ng)])
self.g_weights = np.array([self.read_float(f) for _ in range(self.ng)])
# Skip two records
_ = self.read_float(f)
_ = self.read_float(f)
# Construct pressure/temperature grid
self.pressure = np.array([self.read_float(f) for _ in range(self.np_)])
self.temperature = np.array([self.read_float(f) for _ in range(self.nt)])
# Read in wavelengths
nrec = 10 + 2 * self.ng + 2 + self.np_ + self.nt
if self.delv < 0.0:
self.wavelength = np.array([self.read_float(f) for _ in range(self.npoint)])
nrec += self.npoint
else:
self.wavelength = self.vmin + self.delv * np.arange(self.npoint)
# Skip records to reach k-table data
jrec = self.irec0 - nrec - 1
for _ in range(jrec):
_ = self.read_float(f)
# Read k-table
total_values = self.npoint * self.np_ * self.nt * self.ng
ktable_flat = np.frombuffer(f.read(total_values * 4), dtype=np.float32)
self.ktable = ktable_flat.reshape((self.npoint, self.np_, self.nt, self.ng))
[docs]
class PsgSpectrum(Parser):
[docs]
def read(self):
self.data = pd.read_csv(self.filepath, comment="#", header=None, sep=r"\s+")
self.data.columns = ["wavelength", "spectrum", "drop"]
self.data.drop(["drop"], axis=1, inplace=True)
[docs]
class NemesisCov(Parser):
"""Parser for nemesis.cov
Attributes:
cov (np.ndarray): Covariance matrix of the retrieved parameters"""
[docs]
def read(self):
raise NotImplementedError("This parser is not finished. Suppress this error at your peril")
with open(self.filepath, 'r') as f:
lines = f.readlines()
_, nvar = utils.get_ints_from_string(lines[0])
nx, ny = utils.get_ints_from_string(lines[2*nvar + 1])
start = 2 * nvar + 2
sa = [utils.get_floats_from_string(lines[start + i]) for i in range(nx)]
self.sa = np.array(sa)