"""This module contains the classes for creating Profile objects. There are currently 3 profiles available through Eleos:
temperature (TemperatureProfile)
gases (GasProfile)
aerosols (AerosolProfile)"""
from collections import defaultdict
from pathlib import Path
import re
import numpy as np
from . import constants
from . import shapes
from . import utils
from . import parsers
# Base class shared by every profile type.
class Profile:
    """Base class for all Profile objects.

    A Profile wraps a ``shape`` object and mirrors the shape's parameters as
    attributes of the profile itself: writing a shape parameter on the profile
    also writes it on the shape, and reading an attribute that is missing on
    the profile falls back to the shape.

    Subclasses are expected to provide ``shape``, ``VARIABLES``, ``CONSTANTS``
    and a ``_create_profile_from_previous_retrieval`` hook.
    """

    def __init__(self, label=None):
        """This is the base class for all Profile objects. It should never be instantiated directly, use a subclass such as ``GasProfile`` or ``AerosolProfile``

        Args:
            label (str): A label to associate with this Profile
        """
        self.retrieved = False
        self.core = None
        self.label = label

    def __setattr__(self, name, value):
        """Set an attribute, mirroring shape parameters onto the child shape.

        The mirroring is best-effort: while ``self.shape`` does not exist yet
        (for example during ``__init__``) the lookup fails and the value is
        only stored on the profile.
        """
        try:
            shape = self.shape
            # list() so that a tuple of constants does not silently disable mirroring
            shape_names = get_parameter_names(shape.VARIABLES, retrieved=True) + list(shape.CONSTANTS)
            if name in shape_names:
                setattr(shape, name, value)
        except Exception:
            # Deliberately ignored: no usable shape is attached (yet)
            pass
        super().__setattr__(name, value)

    def __getattr__(self, name):
        """Fall back to the child shape for shape parameters.

        Only called when normal attribute lookup on the profile fails.

        Raises:
            AttributeError: If neither the profile nor the shape provides ``name``
        """
        try:
            # Bypass __getattr__ for ``shape`` itself to avoid recursing into this method
            shape = super().__getattribute__("shape")
            shape_names = get_parameter_names(shape.VARIABLES, retrieved=True) + list(shape.CONSTANTS)
            if name in shape_names:
                return shape.__getattribute__(name)
        except Exception:
            # Deliberately ignored: fall through to the standard AttributeError below
            pass
        return super().__getattribute__(name)

    def _clear_result(self):
        """Clear the retrieved values from the profile

        Removes ``retrieved_<name>`` and ``retrieved_<name>_error`` from the shape
        for every shape variable. Attributes that are already absent are skipped.

        Args:
            None

        Returns:
            None"""
        for name in self.shape.VARIABLES:
            for attr in (f"retrieved_{name}", f"retrieved_{name}_error"):
                try:
                    delattr(self.shape, attr)
                except AttributeError:
                    # Nothing to clear for this parameter
                    pass
        self.retrieved = False

    @classmethod
    def from_previous_retrieval(cls, result, label, use_retrieval_errors=False):
        """Create a Profile object using the retrieved parameters from a previous retrieval as priors.

        The profile is looked up in ``result.profiles`` by its label and must be
        of exactly the class this method is called on.

        Args:
            result (NemesisResult): The result object from the previous retrieval
            label (str): The label of the profile to use in the previous retrieval
            use_retrieval_errors (bool): Passed on to the subclass hook and to the
                shape's ``_set_prior_to_retrieved``

        Returns:
            Profile: A new, not-yet-retrieved profile attached to ``result.core``

        Raises:
            ValueError: If no profile with the given label and class is found
        """
        prev = None
        for profile_label, profile in result.profiles.items():
            if profile_label == label and type(profile) is cls:
                prev = profile
                break
        if prev is None:
            raise ValueError(f"Profile with label {label} not found in previous retrieval")

        new_profile = cls._create_profile_from_previous_retrieval(prev, use_retrieval_errors)
        new_profile.shape._set_prior_to_retrieved(use_retrieval_errors)
        new_profile.retrieved = False
        new_profile.core = result.core
        return new_profile

    def _get_displayable_attributes(self, extra_consts=tuple(), extra_vars=tuple()):
        """Get the parameters that define the profile

        Args:
            extra_consts: Additional constant names to include
            extra_vars: Additional variable names to include

        Returns:
            defaultdict: Maps each base parameter name to the attribute names to
            display for it: just the name for constants; the name and its error
            (plus the retrieved pair once ``self.retrieved`` is set) for variables
        """
        groups = defaultdict(list)
        for name in tuple(self.CONSTANTS) + tuple(extra_consts):
            groups[name].append(name)
        for name in tuple(self.VARIABLES) + tuple(extra_vars):
            groups[name].append(name)
            groups[name].append(name + "_error")
            if self.retrieved:
                groups[name].append("retrieved_" + name)
                groups[name].append("retrieved_" + name + "_error")
        return groups

    def print_table(self, colors=True, forward=False, **kwargs):
        """Print a table of the profile parameters and return it.

        Before a retrieval (or when ``forward`` is set) only the prior and its
        error are shown. Otherwise the retrieved value, its error and the
        percentage change relative to the prior are added.

        Args:
            colors (bool): Wrap the change column in ANSI background colours
                (green up to 20%, yellow up to 100%, red above)
            forward (bool): Force the two-column (prior only) layout
            **kwargs: Forwarded to ``print``

        Returns:
            The table produced by ``utils.generate_ascii_table``
        """
        twocol = (not self.retrieved) or forward
        if twocol:
            headers = ["Prior", "Error"]
        else:
            headers = ["Prior", "Error", "Retrieved", "Error", "Change"]
        n_value_columns = 2 if twocol else 4

        names = self._get_displayable_attributes()
        data = []
        for params in names.values():
            # Prior (and retrieved) columns
            row = [getattr(self, p) for p in params if not (twocol and "retrieved" in p)]
            # Constants have fewer entries than variables: pad with blanks
            row.extend([" "] * (n_value_columns - len(row)))

            # Percentage difference column
            if not twocol:
                try:
                    prior = getattr(self, params[0])
                    pct_diff = 100 * (getattr(self, params[2]) - prior) / prior
                    end = "\x1b[0m"
                    if colors and (abs(pct_diff) > 100):
                        color = "\x1b[41m"
                    elif colors and (20 < abs(pct_diff) <= 100):
                        color = "\x1b[43m"
                    elif colors and (abs(pct_diff) <= 20):
                        color = "\x1b[42m"
                    else:
                        color = ""
                        end = ""
                    row.append(f"{color}{pct_diff:+.1f}%{end}")
                except Exception:
                    # No meaningful change can be computed (constant without a
                    # retrieved value, non-numeric value, zero prior, ...)
                    row.append(" ")
            data.append(row)

        table = utils.generate_ascii_table(f"{self.label} {self.shape}", headers, list(names), data)
        print(table, **kwargs)
        return table

    def check_validity(self):
        """Check that the parameters for each Profile and Shape are valid. This should be overridden by subclasses"""
        return self.shape.check_validity()
# Temperature profile (not fully implemented).
class TemperatureProfile(Profile):
    """Temperature profile read from a prior file. Construction currently always fails."""

    def __init__(self, filepath, label=None):
        """Create a temperature profile from a prior file. NOT IMPLEMENTED YET

        Args:
            filepath: The filepath of the prior temperature profile
            label (str): A label to associate with this Profile. By default it is "Temperature"

        Raises:
            NotImplementedError: Always
        """
        raise NotImplementedError("Temperature profiles are not fully implemented yet. Catch this error at your peril")
        # Intentionally unreachable: kept as the intended implementation for
        # when temperature profiles are supported.
        super().__init__(label)
        if self.label is None:
            self.label = "Temperature"
        self.shape = shapes.Shape0(filepath=filepath)

    def __repr__(self):
        return f"<TemperatureProfile [{self.create_nemesis_string()}]>"

    def _get_displayable_attributes(self, extra_consts=tuple(), extra_vars=tuple()):
        """Return the displayable parameters, of which a temperature profile has none.

        The signature mirrors the base class. An empty mapping (not a list) is
        returned so that the inherited ``print_table`` can call ``.items()`` on it.
        """
        return {}

    def _add_result(self, df):
        """Store the retrieved data on the shape and mark the profile as retrieved

        Args:
            df: The retrieved data, assigned unchanged to ``self.shape.data``

        Returns:
            None"""
        self.shape.data = df
        self.retrieved = True

    def create_nemesis_string(self):
        """Create the NEMESIS code that represents the temperature profile. Temperature profiles currently only support mode 0 0 0 so this function's return value is always constant

        Args:
            None

        Returns:
            str: '0 0 0'"""
        return "0 0 0"

    def generate_apr_data(self):
        """Generate the section of the .apr file for this temperature profile

        Args:
            None

        Returns:
            str: The string to write to the .apr file"""
        return "0 0 0 - Temp\n" + self.shape.generate_apr_data()

    def get_name(self):
        """Return the label of this profile, or "Temperature" if no label is set"""
        if self.label is None:
            return "Temperature"
        return self.label
# Gas (and isotopologue) abundance profile.
class GasProfile(Profile):
    """Profile of a gas (optionally a single isotopologue) with a given shape."""

    def __init__(self,
                 gas_name=None,
                 gas_id=None,
                 isotope_id=0,
                 shape=None,
                 label=None):
        """Create a profile for a given gas (optionally an isotopologue) with a given shape. Call signatures:

            Create profile with the name of the gas (ie 'NH3')
                GasProfile(gas_name=..., isotope_id=..., shape=...)
            Create profile with the ID of the gas (eg. 11)
                GasProfile(gas_id=..., isotope_id=..., shape=...)

        Args:
            gas_name (str): The name of the gas (eg 'CH4'). Specify either this OR gas_id
            gas_id (int): The radtrans ID of the gas (eg. 6). Specify either this OR gas_name
            isotope_id (int): The ID of the isotopologue to use. Use 0 for a mix of all at terrestrial abundance
            shape (Shape): A Shape object to use for the profile shape
            label (str): A label to associate with this Profile. By default it is "<gas_name> <isotope_id>" (eg. "PH3 0")

        Raises:
            ValueError: If shape is missing, or if not exactly one of gas_name / gas_id is given
        """
        super().__init__(label=label)
        self.isotope_id = isotope_id

        if shape is None:
            raise ValueError("shape attribute must be specified")
        self.shape = shape
        # Copies, so that the profile's lists are independent of the shape's
        self.VARIABLES = list(self.shape.VARIABLES)
        self.CONSTANTS = list(self.shape.CONSTANTS)

        # Exactly one of the two identifiers must be supplied
        if not ((gas_id is None) ^ (gas_name is None)):
            raise ValueError("Specifiy exactly one of gas_name or gas_id (not both!)")

        # Fill in the missing identifier from the gas table
        # NOTE(review): an identifier that is not in constants.GASES presumably fails
        # on .iloc[0] with an unhelpful error - confirm and consider a clearer one
        if gas_name is None:
            self.gas_id = gas_id
            self.gas_name = constants.GASES.loc[constants.GASES.radtrans_id == gas_id].name.iloc[0]
        elif gas_id is None:
            self.gas_name = gas_name
            self.gas_id = constants.GASES.loc[constants.GASES.name == gas_name].radtrans_id.iloc[0]

        if label is None:
            self.label = f"{self.gas_name} {self.isotope_id}"

    def __repr__(self):
        return f"<GasProfile {self.gas_name} [{self.create_nemesis_string()}]>"

    @staticmethod
    def _create_profile_from_previous_retrieval(prev, use_retrieval_errors=False):
        """Build a new GasProfile with the same gas, isotope, shape and label as ``prev``

        Args:
            prev (GasProfile): The profile from the previous retrieval
            use_retrieval_errors (bool): Unused here; accepted so all profile
                classes share the same hook signature

        Returns:
            GasProfile: The new profile. Note that it reuses ``prev.shape`` (the same object)
        """
        return GasProfile(gas_name=prev.gas_name,
                          isotope_id=prev.isotope_id,
                          shape=prev.shape,
                          label=prev.label)

    def _add_result(self, df):
        """Take in a DataFrame created by reading in the .mre file (results.NemesisResult.read_mre) and assign the correct attributes

        Args:
            df: pandas.DataFrame with columns "prior", "prior_error", "retrieved", "retrieved_error" and a row for each parameter

        Returns:
            None

        Raises:
            ValueError: If the number of rows does not match the number of shape variables"""
        self.retrieved = True

        # Rows are matched to the shape variables by position
        names = self.shape.VARIABLES
        if len(names) != len(df):
            raise ValueError(f"Expected {len(names)} rows (one per shape variable) but got {len(df)}")

        # Set attributes of the child Shape object
        for name, (_, row) in zip(names, df.iterrows()):
            for title, value in zip(row.index, row.values):
                if "prior" in title:
                    # "prior" -> "<name>", "prior_error" -> "<name>_error"
                    attrname = title.replace("prior", name)
                elif "retrieved" in title:
                    # "retrieved" -> "retrieved_<name>", "retrieved_error" -> "retrieved_<name>_error"
                    attrname = title.replace("retrieved", f"retrieved_{name}")
                else:
                    # Unrelated column: skip it rather than reuse a stale attribute name
                    continue
                setattr(self.shape, attrname, value)

    def create_nemesis_string(self):
        """Create the NEMESIS code that represents the gas profile (eg. 23 0 1)

        Args:
            None

        Returns:
            str: The NEMESIS code"""
        return f"{self.gas_id} {self.isotope_id} {self.shape.ID}"

    def generate_apr_data(self):
        """Generate the section of the .apr file for this gas profile

        Args:
            None

        Returns:
            str: The string to write to the .apr file"""
        return self.create_nemesis_string() + " - " + self.gas_name + "\n" + self.shape.generate_apr_data()
# Aerosol layer profile with particle and optical properties.
class AerosolProfile(Profile):
    """Profile of an aerosol layer with a given shape and particle/optical properties."""

    def __init__(self,
                 shape,
                 radius,
                 variance,
                 n_lookup=None,
                 real_n=None,
                 imag_n=None,
                 retrieve_optical=False,
                 radius_error=None,
                 variance_error=None,
                 imag_n_error=None,
                 label=None):
        """Create a profile for an aerosol layer with a given shape and particle/optical properties.

        Call signatures:

            Constant refractive index over range (not retrieved):
                AerosolProfile(shape, radius, variance, real_n=..., imag_n=...)
            Constant refractive index over range (retrieved; ie with an attached 444):
                AerosolProfile(shape, radius, variance, real_n=..., imag_n=..., retrieve_optical=True,
                               radius_error=..., variance_error=..., imag_n_error=...)
            Use refractive index from lookup table (not retrieved):
                AerosolProfile(shape, radius, variance, n_lookup=...)
            Use refractive index from lookup table (retrieved; ie with an attached 444):
                AerosolProfile(shape, radius, variance, n_lookup=..., retrieve_optical=True,
                               radius_error=..., variance_error=..., imag_n_error=...)

        Currently you cannot specify an arbitrary list of refractive index points to use, but this will be added!

        Args:
            shape (Shape): A Shape object that describes the profile shape
            radius (float): Particle radius in microns
            variance (float): Variance of the particle size distribution in microns (gamma distribution)
            n_lookup (str): If given, use the refractive indices of this gas (eg. 'CH4'); real_n and imag_n are then ignored
            real_n (float): If n_lookup is not given, then use this as the real part of the refractive index
            imag_n (float): If n_lookup is not given, then use this as the imaginary part of the refractive index
            retrieve_optical (bool): Whether to retrieve the optical properties of the aerosol (this adds a 444 profile in NEMESIS)
            radius_error (float): If retrieve_optical is set, the error used for the particle radius prior
            variance_error (float): If retrieve_optical is set, the error used for the particle size variance prior
            imag_n_error (float): If retrieve_optical is set, the error used for the imaginary part of the refractive index
            label (str): A label to associate with this profile. By default it is "Aerosol <aerosol_id>",
                where the ID is "UNASSIGNED" at construction time

        Raises:
            ValueError: If retrieve_optical is set but an error is missing, or if an error is
                given without retrieve_optical
        """
        super().__init__(label=label)

        # The ID must exist before the default label can be built from it
        self.aerosol_id = "UNASSIGNED"
        if label is None:
            self.label = f"Aerosol {self.aerosol_id}"

        # Assign basic parameters
        self.shape = shape
        self.retrieve_optical = retrieve_optical

        # Set particle properties
        self.radius = radius
        self.variance = variance

        # Set either n_lookup or real_n and imag_n. The unused ones are stored as
        # None so that every name listed in CONSTANTS/VARIABLES is always readable.
        if n_lookup is None:
            self.n_lookup = None
            self.real_n = real_n
            self.imag_n = imag_n
            self.lookup = False
        else:
            self.n_lookup = n_lookup
            self.real_n = None
            self.imag_n = None
            self.lookup = True

        # Set the prior errors if retrieving
        errors = (radius_error, variance_error, imag_n_error)
        if retrieve_optical:
            self.CONSTANTS = list(self.shape.CONSTANTS) + ["real_n"]
            self.VARIABLES = list(self.shape.VARIABLES) + ["radius", "variance", "imag_n"]
            self.radius_error = radius_error
            self.variance_error = variance_error
            self.imag_n_error = imag_n_error
            if any(error is None for error in errors):
                raise ValueError("Cannot retrieve optical properties without specified errors. Did you remember to set radius_error, variance_error, or imag_n_error?")
        else:
            self.CONSTANTS = list(self.shape.CONSTANTS) + ["radius", "variance", "real_n", "imag_n"]
            self.VARIABLES = list(self.shape.VARIABLES)
            if any(error is not None for error in errors):
                raise ValueError("Cannot specify errors for radius/variance.imag_n without retrieving optical peroperties. Did you forget to set retrieve_optical=True?")

    def __repr__(self):
        return f"<AerosolProfile {self.label} [{self.create_nemesis_string()}]>"

    @staticmethod
    def _assign_result_rows(target, names, df):
        """Copy the prior/retrieved columns of each row of ``df`` onto ``target`` as attributes named after ``names``."""
        for name, (_, row) in zip(names, df.iterrows()):
            for title, value in zip(row.index, row.values):
                if "prior" in title:
                    # "prior" -> "<name>", "prior_error" -> "<name>_error"
                    attrname = title.replace("prior", name)
                elif "retrieved" in title:
                    # "retrieved" -> "retrieved_<name>", "retrieved_error" -> "retrieved_<name>_error"
                    attrname = title.replace("retrieved", f"retrieved_{name}")
                else:
                    # Unrelated column: skip it rather than reuse a stale attribute name
                    continue
                setattr(target, attrname, value)

    def _add_result(self, df, df_444=None):
        """Take in a DataFrame created by reading in the .mre file (results.NemesisResult.read_mre) and assign the correct attributes

        Args:
            df: pandas.DataFrame with columns "prior", "prior_error", "retrieved", "retrieved_error" and a row for each parameter
            df_444: Same as df, but this is for the 444 profile if retrieving optical properties.
                Its first three rows are taken as radius, variance and imag_n

        Returns:
            None

        Raises:
            ValueError: If the number of rows in df does not match the number of shape variables"""
        # Rows are matched to the shape variables by position
        names = self.shape.VARIABLES
        if len(names) != len(df):
            raise ValueError(f"Expected {len(names)} rows (one per shape variable) but got {len(df)}")

        # Shape parameters live on the child Shape object...
        self._assign_result_rows(self.shape, names, df)
        # ...while the particle/optical parameters live on the profile itself
        if df_444 is not None:
            self._assign_result_rows(self, ["radius", "variance", "imag_n"], df_444)
        self.retrieved = True

    @staticmethod
    def _create_profile_from_previous_retrieval(prev, use_retrieval_errors=False):
        """Build a new AerosolProfile whose priors are the results of ``prev``

        If optical properties were retrieved, the retrieved radius, variance (and
        imag_n when not using a lookup table) become the new priors. Otherwise the
        previous values are copied unchanged.

        Args:
            prev (AerosolProfile): The profile from the previous retrieval
            use_retrieval_errors (bool): Use the retrieved errors as the new prior
                errors instead of the previous prior errors

        Returns:
            AerosolProfile: The new profile, with the same aerosol_id as ``prev``.
            Note that it reuses ``prev.shape`` (the same object)
        """
        kwargs = {"shape": prev.shape, "label": prev.label}

        if prev.retrieve_optical:
            def prior_error(name):
                # Either the retrieved error or the error used for the previous prior
                if use_retrieval_errors:
                    return getattr(prev, f"retrieved_{name}_error")
                return getattr(prev, f"{name}_error")

            kwargs.update(radius=prev.retrieved_radius,
                          radius_error=prior_error("radius"),
                          variance=prev.retrieved_variance,
                          variance_error=prior_error("variance"),
                          # Required by __init__ whenever retrieve_optical is set,
                          # including when a lookup table is used
                          imag_n_error=prior_error("imag_n"),
                          retrieve_optical=True)
            if not prev.lookup:
                kwargs["imag_n"] = prev.retrieved_imag_n
        else:
            kwargs.update(radius=prev.radius,
                          variance=prev.variance,
                          retrieve_optical=False)
            if not prev.lookup:
                kwargs["imag_n"] = prev.imag_n

        if prev.lookup:
            kwargs["n_lookup"] = prev.n_lookup
        else:
            kwargs["real_n"] = prev.real_n

        prof = AerosolProfile(**kwargs)
        prof.aerosol_id = prev.aerosol_id
        return prof

    def create_nemesis_string(self):
        """Create the NEMESIS code that represents the aerosol profile (eg. -1 0 32)

        Args:
            None

        Returns:
            str: The NEMESIS code"""
        return f"-{self.aerosol_id} 0 {self.shape.ID}"

    def generate_apr_data(self):
        """Generate the section of the .apr file for this aerosol profile

        When retrieving optical properties, the 444 entry that references the
        matching cloudfN.dat file is appended.

        Args:
            None

        Returns:
            str: The string to write to the .apr file"""
        aerosol_part = self.create_nemesis_string() + f" - {self.label}\n" + self.shape.generate_apr_data()
        if self.retrieve_optical:
            imagn_part = f"\n444 {self.aerosol_id} 444 - {self.label}\ncloudf{self.aerosol_id}.dat"
            return aerosol_part + imagn_part
        return aerosol_part

    def _generate_cloudfn_dat(self, directory):
        """Write the cloudfN.dat file (N being the aerosol ID) into ``directory``

        Reads the wavelength grid from nemesis.xsc and, in lookup mode, the
        refractive indices from makephase.out, both in the same directory.

        Args:
            directory: Directory containing the input files; the output is written there too

        Returns:
            None"""
        directory = Path(directory)

        # Get the wavelengths in the xsc file
        xsc_parser = parsers.NemesisXsc(directory / "nemesis.xsc")
        nwave = len(xsc_parser.xsc.wavelength)
        refwave = self.core.reference_wavelength

        # Get the refractive indices in the Makephase output or from the given attributes
        if self.lookup:
            refindexes = parsers.MakephaseOut(directory / "makephase.out").data
            imag_ns = refindexes[self.label + " imag"]
            wi, _ = utils.find_nearest(refindexes.wavelength, refwave)
            # NOTE(review): the reference *real* index is read from the " imag" column.
            # This looks like a copy-paste slip (perhaps " real" was intended) - confirm
            # against the columns produced by parsers.MakephaseOut before changing.
            ref_real_n = refindexes[self.label + " imag"].iloc[wi]
        else:
            # Constant imaginary index at every wavelength
            imag_ns = [self.imag_n for _ in range(nwave)]
            ref_real_n = self.real_n

        # Generate cloudfN.dat
        with open(directory / f"cloudf{self.aerosol_id}.dat", mode="w+") as file:
            utils.write_nums(file, self.radius, self.radius_error)
            utils.write_nums(file, self.variance, self.variance_error)
            file.write(f"{nwave} {0 if self.lookup else -1}\n")
            utils.write_nums(file, refwave, ref_real_n)
            utils.write_nums(file, refwave)
            for imag_n, wavelength in zip(imag_ns, xsc_parser.xsc.wavelength):
                utils.write_nums(file, wavelength, imag_n, self.imag_n_error)

    def check_validity(self):
        """Check that the particle, optical and shape parameters are valid

        The refractive index is only checked when it is given explicitly: in
        lookup mode real_n and imag_n are not used.

        Returns:
            bool: True if every check passes"""
        conditions = [
            self.radius > 0,
            self.variance > 0,
        ]
        if not self.lookup:
            conditions.append(self.real_n is not None and self.real_n >= 1.0)
            conditions.append(self.imag_n is not None and self.imag_n >= 0.0)
        conditions.append(self.shape.check_validity())
        return all(conditions)
# Helper used by Profile to expand parameter names.
def get_parameter_names(base, retrieved=True):
    """Expand base parameter names into the attribute names derived from them.

    Args:
        base: Iterable of base parameter names
        retrieved (bool): Also include the ``retrieved_`` variants of each name

    Returns:
        list: For each base name in order: the name, ``<name>_error`` and, if
        ``retrieved`` is set, ``retrieved_<name>`` and ``retrieved_<name>_error``
    """
    names = []
    for stem in base:
        names.extend((stem, stem + "_error"))
        if retrieved:
            names.extend(("retrieved_" + stem, "retrieved_" + stem + "_error"))
    return names