import math
import pickle
import time
import sys
import signal
import os
import shutil
import struct
import re
import copy
import subprocess
import functools
from pathlib import Path, PurePath
import pandas as pd
import numpy as np
import warnings
warnings.formatwarning = lambda msg, *_: f"Warning: {msg}\n"
from . import constants
from . import parsers
from . import utils
from . import profiles as profiles_ # to avoid namespace shadowing by NemesisCore.profiles
from . import results
# if modifying this yourself: increment, then assign!
CORE_ID_COUNTER = 0
[docs]
class NemesisCore:
"""The class that constructs a core directory and all the required files for NEMESIS to run.
Attributes:
parent_directory (Path): The directory in which the core directory is created
directory (Path): The directory in which the core files are stored
id_ (int): The unique ID of the core
profiles (dict): A dictionary of profiles.Profile objects to retrieve, where the keys are the profile labels and the values are the Profile objects themselves
spectrum (parsers.NemesisSpx): The spectrum object to fit
spectrum_filepath (Path): The filepath to the spectrum .spx file
cia_file (Path): The path to the collision-induced absorbtion file to use (default: mgRT_mgRV_40-400K_dnu4.0.tab, warning: always assumes d nu = 4.0)
sol_file (Path): The path to the solar spectrum file to use (default: solar_spec.dat)
ref (parsers.NemesisRef): The parsed ref file
planet (str): The name of the planet being observed
scattering (bool): Whether to run a scattering retrieval or not
num_aerosol_modes (int): The number of aerosol profiles in the retrieval
forward (bool): Whether of not to run a forward model (ie. set number of iterations = 0)
num_iterations (int): Number of iterations to run in the retrieval (if forward is set this has no effect)
num_layers (int): The number of atmospheric layers to simulate
layer_type (int): The type of layering to use
min_pressure (float): The minimum pressure in the atmosphere
max_pressure (float): The maximum pressure in the atmosphere
bottom_layer_height (int): The height in km of the bottom of the atmosphere
fixed_peaks (list): A list of FixedPeak objects that specify regions of the spectrum to fix
instrument_ktables (str): Either 'NIRSPEC' or 'MIRI'; determines which set of ktables to use.
fmerror_factor (float): The factor by which to multiply the error on the spectrum (see also, fmerror_pct and fmerror_value)
fmerror_pct (float): If given, instead of using fmerror_factor or fmerror_value, use a flat percentage of the brightness (eg. 0.1 = 10%) (see also, fmerror_factor and fmerror_value)
fmerror_value (float): If given, instead of using fmerror_factor or fmerror_pct, use a flat value in W/cm2/sr/um (see also, fmerror_factor and fmerror_pct)
cloud_cover (bool): If scattering mode is on, then this is the fractional cloud cover between 0 and 1 (usually doesn't need to be changed)
reference_wavelength (float): If scattering mode is on, then normalise the cross-sections at the closest wavelength to this value in the .xsc file
"""
[docs]
def __init__(self,
parent_directory,
spectrum,
reference=None,
aerosols=None,
cia_file=None,
sol_file=None,
profiles=list(),
planet="jupiter",
scattering=True,
forward=False,
num_iterations=20,
num_layers=39,
min_pressure=None,
max_pressure=None,
instrument_ktables="NIRSPEC",
use_gases=list(),
sparsify_ktables=False,
fmerror_factor=None,
fmerror_pct=None,
fmerror_value=None,
cloud_cover=1.0,
reference_wavelength=None):
"""Create a NEMESIS core directory with a given set of profiles to retrieve
Args:
parent_directory (str): The directory in which to create the core folder
spectrum (str or NemesisSpx): Either the path to the .spx file to fit or a parsers.NemesisSpx object
reference (str or NemesisRef): Either the path to the nemesis.ref file to use or a parsers.NemesisRef object
aerosols (str or AerosolRef): Either the path to the aerosol.ref file to use or a parsers.AerosolRef object
cia_file (str): Path to the collision-indced absorbtion file to use
sol_file (str): Path to the solar spectrum file to use
profiles List[str]: List of profiles.Profile objects to retrieve
planet (str): Name of the planet being observed. Must be one of 'jupiter', 'saturn', 'uranus', 'neptune' or 'titan'.
scattering (bool): Whether to run a scattering retrieval or not
forward (bool): Whether of not to run a forward model (ie. set number of iterations = 0)
num_iterations (int): Number of iterations to run in the retrieval (if forward is set this has no effect)
num_layers (int): The number of atmospheric layers to simulate - for scattering runs the default maximum is 39.
min_pressure (float): The pressure at the top of the model atmosphere
max_pressure (float): The pressure at the bottom of the model atmosphere
instrument_ktables (str): Either 'NIRSPEC' or 'MIRI'; determines which set of ktables to use.
use_gases (list): A list of the gases to use in the model, with names corresponding the to ktable names (eg. "ch4.combi.kta" would be ["ch4"])
sparsify_ktables (bool): Whether to reduce the number of wavelengths in the ktables to only those that are required (may significantly speed up computation)
fmerror_factor (float): The factor by which to multiply the error on the spectrum (see also, fmerror_pct and fmerror_value)
fmerror_pct (float): If given, instead of using fmerror_factor or fmerror_value, use a flat percentage of the brightness (eg. 0.1 = 10%) (see also, fmerror_factor and fmerror_value)
fmerror_value (float): If given, instead of using fmerror_factor or fmerror_pct, use a flat value in W/cm2/sr/um (see also, fmerror_factor and fmerror_pct)
cloud_cover (bool): If scattering mode is on, then this is the fractional cloud cover between 0 and 1 (usually doesn't need to be changed)
reference_wavelength (float): If scattering mode is on, then normalise the cross-sections at the closest wavelength to this value in the .xsc file. This parameter will be updated with the exact wavelength
"""
# Increment the global core counter and store local version
global CORE_ID_COUNTER
CORE_ID_COUNTER += 1
self.id_ = CORE_ID_COUNTER
# Assign attributes passed in
self.planet = planet.lower()
self.scattering = scattering
self._forward = forward
self.num_iterations = num_iterations
self.num_layers = num_layers
self.fmerror_factor = fmerror_factor
self.fmerror_pct = fmerror_pct
self.fmerror_value = fmerror_value
self.cloud_cover = cloud_cover
self.reference_wavelength = reference_wavelength
self.sparsify_ktables = sparsify_ktables
self.use_gases = use_gases
# Parse spx argument
self.change_spectrum(spectrum)
# Parse ref file
if reference is None:
reference = constants.PATH / f"data/{planet}/{planet}.ref"
warnings.warn(f"No ref file specified. Using the default at {reference}")
self.change_reference(reference)
# Set up instrument ktables
if instrument_ktables.lower() not in ["nirspec", "miri"]:
raise ValueError(f"instrument_ktables must be either 'NIRSPEC' or 'MIRI', not {instrument_ktables}")
self.instrument_ktables = instrument_ktables.lower()
KTABLE_PATH = Path("/data/nemesis/specdata/ktables")
if self.instrument_ktables == "nirspec":
self.ktable_path = KTABLE_PATH / "jwst_nirspec_2024"
elif self.instrument_ktables == "miri":
self.ktable_path = KTABLE_PATH / "miri/mrs2024_combi"
# Set the directories of the parent folder and own core
self.parent_directory = Path(parent_directory).resolve()
self.directory = self.parent_directory / f"core_{self.id_}"
# Check if we are perfoming a scattering run with more than 39 layers (NEMESIS hates this...)
if num_layers > 39 and self.scattering:
warnings.warn(f"Too many atmospheric layers specified for a scattering run ({num_layers} vs. 39). Automatically reducing to 39")
self.num_layers = 39
# Set cia file if not set
if cia_file is not None:
# if the file can be found from here then it is not in raddata, so add it
if Path(cia_file).exists():
add_to_raddata(cia_file)
# If it isnt in raddata either, raise an error
elif not (constants.NEMESIS_PATH / "radtrancode/raddata" / Path(cia_file)).exists():
raise FileNotFoundError(f"Could not find {cia_file} here or in raddata/")
self.cia_file = Path(cia_file).name
else:
self.cia_file = Path("mgRT_mgRV_40-400K_dnu4.0.tab")
# Set sol file if not set
if sol_file is not None:
sol_file = Path(sol_file)
# if the file can be found from here then it is not in raddata, so add it
if sol_file.exists():
add_to_raddata(Path(sol_file))
# If it isnt in raddata either, raise an error
elif not (constants.NEMESIS_PATH / "radtrancode/raddata" / sol_file).exists():
raise FileNotFoundError(f"Could not find {sol_file} here or in raddata/")
self.sol_file = sol_file
else:
self.sol_file = Path("solar_spec.dat")
# Set min/max pressures
self.set_pressure_limits(max_pressure=max_pressure,
min_pressure=min_pressure)
# Set number of aerosol modes (incremented by add_profile)
self.num_aerosol_modes = 0
# Add a list to hold any FixedPeak classes the user wants
self.fixed_peaks = []
# Add a reference to self in each Profile and set up AerosolProfiles correctly
self.profiles = dict()
for profile in profiles:
self.add_profile(profile)
# If in forward mode, set the number of iterations to 0
if self.forward:
self.num_iterations = 0
# Parse aerosol.ref file
self.change_aerosols(aerosols)
# Raise an error if trying to use features not implemented yet
if planet != "jupiter":
raise Exception("Eleos does not support planets other than Jupiter yet! ")
def __str__(self):
return f"<NemesisCore: {self.directory}>"
@property
def forward(self):
return self._forward
@forward.setter
def forward(self, value):
self._forward = value
if value:
self.num_iterations = 0
else:
self.num_iterations = 20
def _create_directory_tree(self, confirm):
"""
Create the required directory structure for the core to run in. It generates:
+parent_direcotry
+----ktables
+---+core_{id_}
+----plots
If the core directory already exists it will check the contents and inform the user, asking them to confirm is confirm=True.
Args:
confirm (bool): Whether to confirm with the user before overwriting an existing core directory
Returns:
None
"""
os.makedirs(self.parent_directory, exist_ok=True)
os.makedirs(self.parent_directory / "ktables", exist_ok=True)
if os.path.exists(self.directory) and confirm:
if os.path.exists(self.directory / "nemesis.mre"):
msg = "There is already a core that has been run in"
elif os.path.exists(self.directory / "nemesis.ref"):
msg = "There is already a core that has not been run yet in"
else:
msg = "There is already data in"
x = input(f"{msg} {self.directory.resolve()} - erase and continue? Y/N ")
if x.lower() == "n":
return
shutil.rmtree(self.directory)
os.makedirs(self.directory / "plots", exist_ok=True)
def _save_core(self):
"""Dump the core object to a pickle file in the core directory
Args:
None
Returns:
None
Creates:
core.pkl"""
with open(self.directory / "core.pkl", mode="wb") as file:
pickle.dump(self, file)
def _copy_generation_script(self):
"""Copy the script used to generate the core into the core
Args:
None
Returns:
None
Creates:
generation.py"""
shutil.copy(sys.argv[0], self.directory / "generation.py")
def _copy_template_files(self):
"""Copy some boilerplate files from the library data directory into the core directory
Args:
None
Returns:
None
Creates:
nemesis.abo
nemesis.nam"""
shutil.copy(constants.PATH / "data/statics/nemesis.abo", self.directory)
shutil.copy(constants.PATH / "data/statics/nemesis.nam", self.directory)
def _generate_ref(self):
self.reference.write(self.directory / "nemesis.ref")
def _generate_spx(self):
self.spectrum.write(self.directory / "nemesis.spx")
def _generate_cia(self):
"""Generate the .cia (collision induced absorbtion) file
Args:
None
Returns:
None
Creates:
nemesis.cia
"""
with open(self.directory / "nemesis.cia", mode="w+") as file:
file.write(self.cia_file.name + "\n")
file.write("4.00000\n")
file.write("24\n")
def _generate_sol(self):
"""Generate the .sol (solar spectrum) file
Args:
None
Returns:
None
Creates:
nemesis.cia
"""
with open(self.directory / "nemesis.sol", mode="w+") as file:
file.write(self.sol_file.name)
def _generate_inp(self):
"""Generate the nemesis input file
Args:
num_iterations: Number of iterations to run (has no effect if NemesisCore.forward is True)
Returns:
None
Creates:
nemesis.inp"""
with open(constants.PATH / "data/statics/nemesis.inp") as file:
out = file.read()
out = out.replace("<SCATTERING>", str(int(self.scattering)))
out = out.replace("<N_ITERATIONS>", str(self.num_iterations))
with open(self.directory / "nemesis.inp", mode="w+") as file:
file.write(out)
def _generate_apr(self):
"""Generate the nemesis.apr file from the profile list
Args:
None
Returns:
None
Creates:
nemesis.apr"""
out = f"*******Apriori File*******\n <NUM_PROFILES>\n"
num_profiles = 0
for profile in self.profiles.values():
profile.shape.create_required_files(self.directory)
if isinstance(profile, profiles_.AerosolProfile) and profile.retrieve_optical:
profile._generate_cloudfn_dat(self.directory)
num_profiles += 1
out += profile.generate_apr_data() + "\n"
num_profiles += 1
with open(self.directory / "nemesis.apr", mode="w") as file:
out = out.replace("<NUM_PROFILES>", str(num_profiles))
file.write(out)
def _generate_set(self):
"""Generate the settings file for NEMESIS
Args:
None
Returns:
None
Creates:
nemesis.set"""
with open(constants.PATH / "data/statics/nemesis.set", mode="r") as file:
out = file.read()
out = out.replace("<DISTANCE>", f"{constants.DISTANCE[self.planet]:.3f}")
out = out.replace("<SUNLIGHT>", f"{int(self.scattering)}")
out = out.replace("<BOUNDARY>", f"{int(self.scattering)}")
out = out.replace("<N_LAYERS>", f"{int(self.num_layers)}")
out = out.replace("<BASE>", f"{self.bottom_layer_height:.2f}")
with open(self.directory / "nemesis.set", mode="w+") as file:
file.write(out)
def _generate_flags(self, inormal=0, iray=1, ih2o=0, ich4=0, io3=0, inh3=0, iptf=0, imie=0, iuv=0):
"""Generate the flags file. As a general rule, leave all this at the defaults. The descriptions are copied directly fron the NEMESIS manual.
Args:
inormal: 0 or 1 depending on whether the ortho/para-H2 ratio is in equilibrium (0) or normal 3:1 (1).
iray: Sets the Rayleigh optical depth calculation:
0 = Rayleigh scattering optical depth is not included.
1 = Rayleigh optical depths suitable for gas giant atmosphere
2 = Rayleigh optical depths suitable for CO2 dominated atmosphere
3 = Rayleigh optical depths suitable for a N2-O2 atmosphere.
4 = New Rayleigh optical depths suitable for a gas giant atmosphere.
5 = New Rayleigh optical depths suitable for a gas giant atmosphere PLUS Raman scattering.
6 = New Rayleigh optical depths suitable for a gas giant atmosphere PLUS Sromovsky Raman scattering, PLUS Sromovsky Polarisation Correction.
ih2o: Turns additional H2O continuum off (0) or on (1)
ich4: Turns additional CH4 continuum off (0) or on (1)
io3: Turns additional O3 UV continuum off (0) or on (1)
inh3: Turns additional NH3 continuum off (0) or on (1)
iptf: Used in only a few routines to switch between normal partition function calculation (0) or the high-temperature partition function for CH4 for Hot Jupiters.
imie: Only relevant for scattering calculations. If set to 0, the phase function is computed from the associated Henyey-Greenstein hgphase*.dat files. However, if set to 1, the phase function is computed from the MieTheory calculated PHASEN.DAT file.
Returns:
None
Creates:
nemesis.fla"""
with open(self.directory / "nemesis.fla", mode="w+") as file:
file.write(' {aa} ! Inormal (0=eqm, 1=normal)\n {bb} ! Iray (0=off, 1=on)\n {cc} ! IH2O (1 = turn extra continuum on)\n {dd} ! ICH4 (1 = turn extra continuum on)\n {ee} ! IO3 (1 = turn extra continuum on)\n {ff} ! INH3 (1 = turn extra continuum on)\n {gg} ! Iptf (0=default, 1=CH4 High-T)\n {hh} ! IMie\n {ii} ! UV Cross-sections\n 0 ! INLTE (0=LTE)'.format(aa=inormal, bb=iray, cc=ih2o, dd=ich4, ee=io3, ff=inh3, gg=iptf, hh=imie, ii=iuv))
def _generate_aerosol_ref(self):
"""Generate the aerosol reference profile.
Args:
None
Returns:
None
Creates:
aerosol.ref"""
# check that the number of aerosol modes matches and fix if not
# (if the user adds an aerosol profile after instanstiation then it may not)
if self.aerosols.num_modes != self.num_aerosol_modes:
warnings.warn("Number of aerosol modes defined in aerosol.ref and the profiles are not the same! Eleos will set the aerosol.ref file to all zeros - if this is not what you inteded then modify it manually!")
self.change_aerosols(None)
self.aerosols.write(self.directory / "aerosol.ref")
def _generate_kls(self):
"""
Copy the ktables from the input directory for the given instrument to the parent directory, optionally sparsifying them.
Generate the .kls file specifying them.
Args:
None
Returns:
None
Creates:
nemesis.kls
"""
to_include = []
ktables = list(self.ktable_path.glob("*.kta"))
# Loop over each specified gas
for gas in self.use_gases:
# Find the correct ktable
for ktable in ktables:
name = re.search(r'([^/]+)(?=\.combi\.kta)', ktable.name).group(1)
if gas == name:
break
else:
available_names = '\n'.join(self.get_ktable_gas_names())
raise ValueError(f"'{gas}' not found in available ktables. Available ktables are:\n{available_names}\n")
# When the loop finishes, ktable will be set to the correct file
to_include.append(ktable.name)
# Whether to sparsify the ktables
if self.sparsify_ktables:
_sparsify_single_ktable(tuple(self.get_spx_wavelength_grid()), ktable, self.parent_directory / "ktables" / ktable.name)
else:
shutil.copy(ktable, self.parent_directory / "ktables")
# Create the kls file
with open(self.directory / "nemesis.kls", mode="w+") as file:
for ktable in to_include:
file.write(f"../ktables/{ktable}\n")
def _generate_fmerror(self):
"""For each wavelength in the spx file, adjust the error by a factor (either fmerror_factor, _pct or _value) and write to the fmerror file.
Currently only supports 1 spx geometry
Args:
None
Returns:
None
Creates:
fmerror.dat """
num_entries = len(self.get_spx_wavelength_grid())
x = (self.fmerror_factor is not None, self.fmerror_pct is not None, self.fmerror_value is not None)
if sum(x) > 1:
raise ValueError("Must specify exactly one of fmerror_factor, fmerror_pct or fmerror_value")
if num_entries > 2048:
warnings.warn(f"spx file has too many wavelengths for default NEMESIS installation ({num_entries}/2048) - have you increased IDIM in arrdef.f?")
with open(self.directory / "fmerror.dat", mode="w+") as file:
# Header with number of lines
file.write(f"{num_entries+2}\n")
# Catch any cases outside the wavelength range (lower)
file.write(f"{0.1:.6e} {0:.6e}\n")
# Scale the spx error by either fmerror_factor, fmerror_pct or fmerror_value
for wl, err, spc in zip(self.spectrum.wavelengths, self.spectrum.error, self.spectrum.spectrum):
# Check to see if this peak is fixed
flag = False
for fxp in self.fixed_peaks:
if fxp.isin(wl):
file.write(f"{wl:.6e} {fxp.error:.6e}\n")
flag = True
# Otherwise, compute the new error
if self.fmerror_factor is not None and not flag:
file.write(f"{wl:.6e} {err*self.fmerror_factor:.6e}\n")
if self.fmerror_pct is not None and not flag:
file.write(f"{wl:.6e} {spc*self.fmerror_pct:.6e}\n")
if self.fmerror_value is not None and not flag:
file.write(f"{wl:.6e} {self.fmerror_value:.6e}\n")
# Catch any cases outside the wavelength range (upper)
file.write(f"{100:.6e} {0:.6e}\n")
def _generate_xsc(self):
"""Run Makephase and Normxsc with the given inputs to generate the .xsc and phase files.
Args:
None
Returns:
None
Creates:
nemesis.xsc
PHASE{N}.DAT
hgphase{n}.dat"""
# # Check to see if any aerosols have been added - if not then return immediately
# if self.num_aerosol_modes == 0:
# return
# Run Makephase and Normxsc
cwd = os.getcwd()
os.chdir(self.directory)
os.system("Makephase < makephase.inp > makephase.out")
os.system("Normxsc < normxsc.inp > normxsc.out")
os.chdir(cwd)
def _generate_fcloud_ref(self):
"""Generate the fcloud.ref file
Args:
None
Returns:
None
Creates:
fcloud.ref"""
# Return immediately if there are no cloud layers defined
if self.num_aerosol_modes == 0:
return
with open(self.directory / "fcloud.ref", mode="w+") as file:
file.write(f"{len(self.reference.data)} {self.num_aerosol_modes}\n")
for height in self.reference.data.height:
file.write(f" {height:> 9.4f} {int(self.cloud_cover)}")
for n in range(self.num_aerosol_modes):
file.write(" 1")
file.write("\n")
def _generate_parah2_ref(self):
"""Generate the parah2.ref file from a profile in the eleos/data/[planet] directory, with any
pressure limits imposed by NemesisCore.set_pressure_limits()
Args:
None
Returns:
None
Creates:
parah2.ref"""
df = pd.read_table(constants.PATH / f"data/{self.planet}/parah2.ref", skiprows=1, header=None, sep=r"\s+")
df.columns = ["height", "parah2"]
minh, maxh = self.get_height_limits()
df = df[(df.height >= minh) & (df.height <= maxh)]
with open(self.directory / "parah2.ref", mode="w+") as file:
file.write(str(len(df)) + "\n")
file.write(df.to_string(header=False, index=False))
def _generate_makephase_inp(self):
"""Add the optical properties of any attached AerosolProfiles to the Makephase input file
Args:
None
Returns:
None
Creates:
makephase.inp
normxsc.inp"""
# Get wavelength bounds including the specified reference wavelength
start, end, step, idx = self._get_xsc_wavelengths(step=0.1)
# Generate the makephase.inp file
with open(self.directory / "makephase.inp", mode="w+") as file:
file.write(str(self.num_aerosol_modes) + "\n")
file.write("1\n")
utils.write_nums(file, start, end, step)
file.write("nemesis\n")
file.write("y\n")
for name, profile in self.profiles.items():
if isinstance(profile, profiles_.AerosolProfile):
file.write(f"1\n{profile.radius} {profile.variance}\n2\n")
if not profile.lookup:
file.write(f"1\n{profile.real_n} {profile.imag_n}\n")
else:
file.write(f"{constants.MAKEPHASE_GASES[profile.n_lookup]}\n")
# Generate the normxsc.inp file
with open(self.directory / "normxsc.inp", mode="w+") as file:
file.write(f"nemesis.xsc\n{idx + 1} 1")
def _generate_pressure_lay(self):
"""Generate the pressure.lay file that splits the .ref file up into layers for the model
Args:
min_pressure (float): The pressure at the top of the atmosphere
max_pressure (float): The pressure at the bottom of the atmosphere
Returns:
None
Creates:
pressure.lay"""
pressures = np.logspace(np.log10(self.max_pressure),
np.log10(self.min_pressure),
self.num_layers)
with open(self.directory / "pressure.lay", mode="w+") as file:
file.write("Created by Eleos\n")
file.write(str(self.num_layers) + "\n")
for p in pressures:
file.write(str(p) + "\n")
def _generate_summary(self):
"""Dump the information used to create the NEMESIS core to a human-readable text file
Args:
None
Returns:
None
Creates:
summary.txt
aerosol_names.txt"""
with open(self.directory / "summary.txt", mode="w+") as file:
file.write(f"Generated: {time.asctime()}\n")
out = ""
for k, v in self.__dict__.items():
if k == "profiles":
out += "profiles:\n"
for label, profile in v.items():
out += utils.indent(label, level=1) + "\n"
for p in profile.__dict__:
if p in ("core", "retrieved"):
continue
out += utils.indent(f"{p}: {getattr(profile, p)}", level=2) + "\n"
if p == "shape":
for pp in profile.shape.__dict__:
out += utils.indent(f"{pp}: {getattr(profile.shape, pp)}", level=3) + "\n"
else:
out += f"{k}: {v}"
out += "\n"
file.write(out)
with open(self.directory / "aerosol_names.txt", mode="w+") as file:
if self.num_aerosol_modes == 0:
return
for name, profile in self.profiles.items():
if isinstance(profile, profiles_.AerosolProfile):
file.write(name + "\n")
file.truncate(file.tell() - 1)
def _get_xsc_wavelengths(self, step):
"""Get the wavelengths for use in Makephase, including the full data range and, if necessary,
expanding to include the reference wavelength specified.
Args:
step: Wavelength step size (microns)
Returns:
float: Start wavelength
float: End wavelength
float: Wavelength step
int: reference wavelength index
"""
# Get wavelengths from spx file
wls = self.get_spx_wavelength_grid()
# Initial guesses for start and end wavelengths
start_wl = math.floor(min(wls) / step) * step
end_wl = math.ceil(max(wls) / step) * step
# Expand the range to include the reference wavelength if necessary
if self.reference_wavelength < start_wl:
start_wl = math.floor(self.reference_wavelength / step) * step # Round down to nearest step
elif self.reference_wavelength > end_wl:
end_wl = math.ceil(self.reference_wavelength / step) * step # Round up to nearest step
# Find the reference wavelength index and set the reference value to the closest value
idx, ref = utils.find_nearest(np.arange(start_wl, end_wl+step, step), self.reference_wavelength)
self.reference_wavelength = ref
return start_wl, end_wl, step, idx
def _check_profile_validity(self):
"""Check that the parameters for each Profile and Shape are valid"""
for label, profile in self.profiles.items():
profile.check_validity()
def _reset_aerosol_numbering(self):
"""Reset the aerosol numbering. Useful if loading multiple AerosolProfiles from different cores
Args:
None
Returns:
None"""
self.num_aerosol_modes = 0
for label, profile in self.profiles.items():
if isinstance(profile, profiles_.AerosolProfile):
self.num_aerosol_modes += 1
profile.aerosol_id = self.num_aerosol_modes
[docs]
def generate_core(self, verbosity=1, confirm=True):
"""Create all the files necessary for a NEMESIS retrieval in the directory specified by NemesisCore.directory.
Args:
verbosity (int): One of 0, 1, 2. If 0 then do not print any progress. If 1 then print core number. If 2 then print status for each file
Returns:
None
Creates:
See _generate_* methods
"""
if verbosity == 1: print(f"Generating core {self.id_}")
if verbosity == 2: print("Creating directory structure")
self._create_directory_tree(confirm)
if verbosity == 2: print(f"Generating summary file")
self._generate_summary()
if verbosity == 2: print(f"Copying generation script")
self._copy_generation_script()
if verbosity == 2: print(f"Generating nemesis.ref")
self._generate_ref()
if verbosity == 2: print(f"Generating nemesis.spx")
self._generate_spx()
if verbosity == 2: print(f"Copying boilerplate files")
self._copy_template_files()
if verbosity == 2: print(f"Generating pressure.lay")
self._generate_pressure_lay()
if verbosity == 2: print(f"Generating nemesis.sol")
self._generate_sol()
if verbosity == 2: print(f"Generating nemesis.cia")
self._generate_cia()
if verbosity == 2: print(f"Generating nemesis.inp")
self._generate_inp()
if verbosity == 2: print(f"Generating nemesis.set")
self._generate_set()
if verbosity == 2: print(f"Generating nemesis.fla")
self._generate_flags()
if verbosity == 2: print(f"Generating aerosol.ref")
self._generate_aerosol_ref()
if verbosity == 2: print(f"Generating parah2.ref")
self._generate_parah2_ref()
if verbosity == 2: print(f"Generating nemesis.kls")
self._generate_kls()
if verbosity == 2: print(f"Generating fmerror.dat")
self._generate_fmerror()
if verbosity == 2: print(f"Generating fcloud.ref")
self._generate_fcloud_ref()
if verbosity == 2: print(f"Generating makephase.inp and normxsc.inp")
self._generate_makephase_inp()
if verbosity == 2: print(f"Generating nemesis.xsc and [hg]phase.dat files")
self._generate_xsc()
if verbosity == 2: print(f"Generating nemesis.apr")
self._generate_apr()
if verbosity == 2: print("Saving Eleos core object")
self._save_core()
[docs]
def change_spectrum(self, new_spectrum):
"""Replace the existing .spx file of the core with a new
Args:
new_spectrum (parsers.NemesisSpx or pathlike): The new spx file to use as either a path to a .spx file, or a parsers.NemesisSpx object
Returns:
None
"""
if isinstance(new_spectrum, parsers.NemesisSpx):
self.spectrum_filepath = None
self.spectrum = new_spectrum
# otherwise assume pathlike is specified
else:
self.spectrum_filepath = Path(new_spectrum).resolve()
self.spectrum = parsers.NemesisSpx(self.spectrum_filepath)
[docs]
def change_reference(self, new_reference):
"""Replace the existing .ref file of the core with a new
Args:
new_reference (parsers.NemesisRef or pathlike): The new ref file to use as either a path to a .ref file, or a parsers.NemesisRef object.
If a prf file is specified then it will be used as the new ref file.
Returns:
None
"""
if isinstance(new_reference, (parsers.NemesisRef)):
self.reference_filepath = None
self.reference = new_reference
# otherwise assume pathlike is specified
else:
self.reference_filepath = Path(new_reference).resolve()
self.reference = parsers.NemesisRef(self.reference_filepath)
[docs]
def change_aerosols(self, new_aerosols):
"""Replace the existing aerosol.ref file of the core with a new
Args:
new_aerosols (parsers.AerosolRef or pathlike): The new ref file to use as either a path to a .ref file, or a parsers.AerosolRef object.
If a prf file is specified then it will be used as the new ref file.
If new_aerosols is None then create a file with zero aerosols at all heights
Returns:
None
"""
if new_aerosols is None:
self.aerosols_filepath = None
self.aerosols = parsers.AerosolRef.empty(self.reference.data.height, self.num_aerosol_modes)
elif isinstance(new_aerosols, (parsers.AerosolRef)):
self.aerosols_filepath = None
self.aerosols = new_aerosols
# otherwise assume pathlike is specified
else:
self.aerosols_filepath = Path(new_aerosols).resolve()
self.aerosols = parsers.AerosolRef(self.aerosols_filepath)
[docs]
def get_new_id(self):
"""Refresh the core ID to a new sequential value and update the core directory accordingly.
Args:
None
Returns:
None
"""
global CORE_ID_COUNTER
CORE_ID_COUNTER += 1
self.id_ = CORE_ID_COUNTER
self.directory = self.parent_directory / f"core_{self.id_}"
[docs]
def delete_auxillary_files(self):
"""
Delete most of the input files and some rarely used ouptut files from the core to reduce disk space.
Note, in order to run this core again the missing input files will need to be regenrated with generate_core().
Args:
None
Returns:
None
"""
files = [
"nemesis.abo",
"nemesis.nam",
"nemesis.inp",
"makephase.inp",
"makephase.out",
"normxsc.inp",
"normxsc.out",
"aerosol.ref",
"hgphase*.dat",
"kk.out",
"nemesis.fla",
"cloudf*.dat",
"eleos_generation.py",
"fcloud.ref",
"nemesis.out",
"nemesis.cia",
"nemesis.cor",
"nemesis.pha",
"nemesis.inp",
"nemesis.pat",
"nemesis.raw",
"nemesis.sca",
"nemesis.set",
"nemesis.sol",
"nemesis.xsc",
"PHASE*.DAT",
"refindex*.dat",
"pressure.lay"
]
for file in files:
for fp in self.directory.glob(file):
if fp.is_file():
fp.unlink()
[docs]
def delete_core_directory(self, confirm=True):
"""
Delete the entire core directory and all its contents.
Args:
confirm (bool): Whether to confirm with the user before deleting
Returns:
None
"""
if confirm:
x = input(f"Are you sure you want to delete the entire core directory at {self.directory.resolve()}? Y/N ")
if x.lower() == "n":
return
shutil.rmtree(self.directory)
[docs]
def run(self, verbosity=0, confirm=False):
"""Run NEMESIS on the core. This method should only be used for short forward models as
it does not schedule the jobs on the ALICE compute nodes (see run_alice_job() for that),
instead running it in the current terminal.
Args:
None
Returns:
NemeisResult: The results object for the core"""
self.generate_core(verbosity=verbosity, confirm=confirm)
if verbosity > 0:
print("Running NEMESIS...")
subprocess.run("Nemesis < nemesis.nam | tee nemesis.prc",
shell=True,
cwd=self.directory)
res = results.NemesisResult(self.directory)
# res.make_summary_plot()
if verbosity > 0:
print("Finished!")
return res
[docs]
def generate_prior_distributions(self):
"""Run NEMESIS briefly in a temporary directory to generate the prior gas and aerosol profiles.
Args:
None
Returns:
pd.DataFrame: Data from the .prf files generated. Columns are: 'height', 'pressure', 'temperature', the gas profiles, and the aerosol profiles
"""
# Change core directory to eleos/tmp temporarily
old_dir = self.directory
self.directory = constants.PATH / "tmp"
# Clear tmp directory
for fp in self.directory.glob("*"):
if fp.is_file():
fp.unlink()
# Copy any extra files from the main directory
for filename in old_dir.glob("*"):
if filename.is_file():
shutil.copy(filename, self.directory)
# Generate the core in this directory
self.generate_core(verbosity=0, confirm=False)
# Run NEMESIS
with open(self.directory / "nemesis.nam") as inp:
proc = subprocess.Popen("Nemesis < nemesis.nam > nemesis.prc",
shell=True,
cwd=self.directory,
preexec_fn=os.setsid)
# Wait for the aerosol.prf file to be generated - potential race condition??
while True:
time.sleep(0.05)
if (self.directory / "aerosol.prf").exists() and (self.directory / "nemesis.prf").exists():
break
# Kill NEMESIS
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
# Read in the .prf files
aerosol_prf = parsers.AerosolRef(self.directory / "aerosol.prf")
nemesis_prf = parsers.NemesisRef(self.directory / "nemesis.prf")
# Reset the directory and clear tmp
for fp in self.directory.glob("*"):
if fp.is_file():
fp.unlink()
self.directory = old_dir
nemesis_prf.data.height = np.round(nemesis_prf.data.height * 1000).astype(int)
aerosol_prf.data.height = np.round(aerosol_prf.data.height * 1000).astype(int)
merged = pd.merge(nemesis_prf.data, aerosol_prf.data, on="height")
merged.height /= 1000
return merged
[docs]
def get_aerosol_profile(self, id=None):
"""Given an aerosol ID (as used by NEMESIS) return the corresponding AerosolProfile object.
Args:
id (int): The aerosol ID
label (str): The label associated with the AerosolProfile object
Returns:
AerosolProfile: The corresponding aerosol profile"""
id = abs(id)
if id > self.num_aerosol_modes:
raise IndexError(f"Aerosol index is too large! ({id} vs {self.num_aerosol_modes} max.)")
for profile in self.profiles.values():
if isinstance(profile, profiles_.AerosolProfile):
if profile.aerosol_id == id:
return profile
[docs]
def add_profile(self, profile):
"""Add a profile to retrieve. It's recommended to do this during instantiation using the profiles argument.
Args:
profile: profiles.Profile object to add
Returns:
None
"""
self.profiles[profile.label] = profile
profile.core = self
if isinstance(profile, profiles_.AerosolProfile):
# Increment aerosol mode counter
self.num_aerosol_modes += 1
# Assign aerosol IDs
profile.aerosol_id = self.num_aerosol_modes
[docs]
def remove_profile(self, profile_label):
"""Remove a profile. WIP - DOES NOT WORK FOR AEROSOL PROFILES
Args:
profile_label (str): The label of the profile to remove
Returns:
None
"""
profile = self.profiles[profile_label]
if isinstance(profile, profiles_.AerosolProfile):
# Decrement aerosol mode counter
self.num_aerosol_modes -= 1
del self.profiles[profile_label]
[docs]
def fix_peak(self, central_wavelength, width, error=1e-15):
"""Set a region of the spectra to be fixed (ie. NEMESIS will be forced to match it). This is done internally by setting the
error on that region to be extrmemely small.
Args:
central_wavelength (float): The central wavelength of the peak
width (float): The width of the spectral feature (this is a full width, so the region being fixed is centre-width/2 to centre+width/2)
error (float): Optionally, give the error to be assigned to the region. This should be very small and there is not much need to change this
Returns:
None
"""
self.fixed_peaks.append(FixedPeak(central_wavelength, width, error))
[docs]
def get_ktable_gas_names(self):
"""Return the names of the gases used in the filenames of the ktables. Assumes a format of '[molecule].combi.kta: eg. ch4.combi.kta -> "ch4" """
gas_names = []
for ktable_path in self.ktable_path.glob("*.kta"):
name = re.search(r'([^/]+)(?=\.combi\.kta)', str(ktable_path))
gas_names.append(name.group(1))
return gas_names
[docs]
def set_pressure_limits(self, max_pressure, min_pressure):
"""Change the pressure limits of the core. Also sets the bottom layer height to the closest value in the .ref file
Args:
max_pressure (float): The new maximum pressure in bar. If None then use the maximum pressure in the .ref file
min_pressure (float): The new minimum pressure in bar. If None then use the minimum pressure in the .ref file
Returns:
None"""
if min_pressure is None:
self.min_pressure = self.reference.data.pressure.min()
else:
self.min_pressure = min_pressure
if max_pressure is None:
self.max_pressure = self.reference.data.pressure.max()
else:
self.max_pressure = max_pressure
if min_pressure >= max_pressure:
raise ValueError("min_pressure must be less than max_pressure!")
i, _ = utils.find_nearest(self.reference.data.pressure, self.max_pressure)
self.bottom_layer_height = self.reference.data.height.iloc[i]
[docs]
def get_height_limits(self):
"""Get the heights of the top and bottom layers of the atmosphere in km.
Args:
None
Returns:
top_height: Height of the top of the atmosphere
bottom_height: Height at the bottom of the atmosphere"""
heights = self.reference.data.height
return min(heights), max(heights)
[docs]
def set_random_priors(self, parameters):
"""For each eleos.mcmc.Parameter object given, set the values randomly based on a
normal distribution with mean and standard deviation given by the current param and param_error
Args:
parameters (List[mcmc.Parameters]): The parameters to randomise
Returns:
None
"""
for param in parameters:
profile = self.profiles[param.profile_name]
val = getattr(profile, param.param_name)
err = getattr(profile, param.param_name+"_error")
for i in range(500):
new_val = np.random.normal(val, err)
setattr(profile, param.param_name, new_val)
if profile.check_validity():
break
else:
raise ValueError(f"Could not find valid value for {param}")
[docs]
def get_profile_variable_names(self):
"""Return the names of the variables (ie the parameters that NEMESIS can vary) in each profile.
See also: `get_profile_constant_names` and get_profile_parameter_names`
Args:
None
Returns:
dict: Dictionary of the form {profile_label: [variable names, ...]}"""
out = dict()
for label, profile in self.profiles.items():
out[label] = profile.VARIABLES
return out
[docs]
def get_profile_constant_names(self):
"""Return the names of the constants (ie. the parameters that NEMESIS doesnt fit) in each profile.
See also: `get_profile_constant_names` and get_profile_parameter_names`
Args:
None
Returns:
dict: Dictionary of the form {profile_label: [variable names, ...]}"""
out = dict()
for label, profile in self.profiles.items():
out[label] = profile.CONSTANTS
return out
[docs]
def get_profile_parameter_names(self):
"""Return the names of the parameters (variables and constants) in each profile.
See also: `get_profile_constant_names` and get_profile_parameter_names`
Args:
None
Returns:
dict: Dictionary of the form {profile_label: [variable names, ...]}"""
out = dict()
for label, profile in self.profiles.items():
out[label] = profile.VARIABLES
out[label] += profile.CONSTANTS
return out
[docs]
def get_spx_wavelength_grid(self):
"""Return the wavelength grid of the spx file associated with the core.
Args:
None
Returns:
np.ndarray: The wavelength grid in microns"""
return self.spectrum.wavelengths
[docs]
def copy_core(self, new_parent_directory=None):
"""Create a copy of the current core object with a new ID.
Args:
new_parent_directory (pathlike): Optionally, specify a new parent directory for the copied core.
If None then use the same parent directory as the original core.
Returns:
NemesisCore: The copied core object"""
new_core = copy.deepcopy(self)
if new_parent_directory is not None:
new_core.parent_directory = new_parent_directory
new_core.get_new_id()
return new_core
[docs]
class FixedPeak:
"""Used internally to specify if any spectral regions should be fixed so that NEMESIS always fits it there. Don't instantiate, instead use :meth:`~eleos.cores.NemesisCore.fix_peak`"""
[docs]
def __init__(self, central_wavelength, width, error):
self.central_wavelength = central_wavelength
self.width = width
self.error = error
def __str__(self):
return f"<FixedPeak at {self.central_wavelength}±{self.width/2:.4f}um>"
[docs]
def isin(self, wl):
"""Check whether a specific wavelength is within the region.
Args:
wl (float): The wavelength to check
"""
return (wl > self.central_wavelength - self.width/2) & (wl < self.central_wavelength + self.width/2)
# NEMESIS extension functions
@functools.cache
def _sparsify_single_ktable(wavelengths, ktable_path, output_path):
"""
Reduce the size of the ktables using the Bracketing Neighbors method to select only relevant
wavelengths. TODO: trim the pressure grid to align with that of the model atmosphere? TODO:
reduce the number of g-ordinates?
Args:
wavelengths (np.ndarray): Wavelengths in the .spx file
ktable_path (str or Path): Path to the .kta file
output_path (str or Path): Path of the output .kta file to be wrtten
Returns:
None
Credit: Joe Penn (Oxford University)
"""
record_length = 4
target_wavelengths = wavelengths
with open(ktable_path, 'rb') as infile, open(output_path, 'wb') as outfile:
# --- Read the entire original header and metadata ---
infile.seek(0)
header_data = infile.read(10 * record_length)
(irec0_in, npoint_in, vmin_in, delv_in, fwhm, np_val, nt_val, ng_val, idgas, isogas) = \
struct.unpack('iifffiiiii', header_data)
# Read metadata arrays
current_pos = 10 * record_length
infile.seek(current_pos)
g_ord = np.fromfile(infile, dtype=np.float32, count=ng_val)
current_pos += ng_val * record_length
infile.seek(current_pos)
del_g = np.fromfile(infile, dtype=np.float32, count=ng_val)
current_pos += ng_val * record_length
current_pos += 2 * record_length
infile.seek(current_pos)
press1 = np.fromfile(infile, dtype=np.float32, count=np_val)
current_pos += np_val * record_length
infile.seek(current_pos)
temp1 = np.fromfile(infile, dtype=np.float32, count=nt_val)
current_pos += nt_val * record_length
if delv_in > 0.0:
k_wavelengths = vmin_in + np.arange(npoint_in) * delv_in
else:
infile.seek(current_pos)
k_wavelengths = np.fromfile(infile, dtype=np.float32, count=npoint_in)
min_k, max_k = k_wavelengths[0], k_wavelengths[-1]
if np.any(target_wavelengths < min_k) or np.any(target_wavelengths > max_k):
# Use raise to pass the error up to the main loop
raise ValueError(f"Targets are outside the k-table range ({min_k:.4f} to {max_k:.4f}).")
# --- Bracketing Neighbors logic ---
indices_right = np.searchsorted(k_wavelengths, target_wavelengths, side='right')
indices_right = np.clip(indices_right, 1, len(k_wavelengths) - 1)
indices_left = indices_right - 1
all_indices = np.concatenate((indices_left, indices_right))
unique_indices = np.unique(all_indices) # Finds unique and sorts
npoint_out = len(unique_indices)
vcen_out = k_wavelengths[unique_indices]
#print(f"Found {npoint_out} unique bracketing points to extract.")
# --- Calculate the new IREC0 and write the header ---
header_records_out = 10 + ng_val + ng_val + 2 + np_val + nt_val
if delv_in <= 0.0:
header_records_out += npoint_out
new_irec0 = header_records_out + 1
#print(f"Calculated new starting record for output data (IREC0): {new_irec0}")
vmin_out = vcen_out[0] if npoint_out > 0 else 0.0
delv_out = -1.0
new_header = struct.pack('iifffiiiii', new_irec0, npoint_out, vmin_out, delv_out, fwhm, np_val, nt_val, ng_val, idgas, isogas)
outfile.write(new_header)
outfile.write(g_ord.astype(np.float32).tobytes())
outfile.write(del_g.astype(np.float32).tobytes())
outfile.write(b'\x00\x00\x00\x00' * 2)
outfile.write(press1.astype(np.float32).tobytes())
outfile.write(temp1.astype(np.float32).tobytes())
if delv_in <= 0.0:
outfile.write(vcen_out.astype(np.float32).tobytes())
# --- Extract and write data blocks ---
block_size_count = np_val * nt_val * ng_val
for i, idx in enumerate(unique_indices):
start_byte = (irec0_in - 1 + block_size_count * idx) * record_length
infile.seek(start_byte)
data_block = np.fromfile(infile, dtype=np.float32, count=block_size_count)
data_block[np.isnan(data_block)] = 0.0
outfile.write(data_block.tobytes())
#print(f"Successfully created: {os.path.basename(output_path)}")
[docs]
def add_to_raddata(filepath):
"""Copy a file into the local raddata/ directory. Useful for choosing arbitrary solar spectrum files for example."""
return shutil.copy(filepath, constants.NEMESIS_PATH / "radtrancode/raddata")
[docs]
def get_refractive_indicies(name, start_wl, end_wl, wl_step):
"""Run Makephase to get the refractive indicies of the gases in the lookup tables.
Args:
name (str): The gas name to use (see constants.MAKEPHGASE_GASES for a list)
start_wl: Start wavelength
end_wl: End wavelength
wl_step: Wavelength step size
Returns:
pd.DataFrame: Dataframe containing the wavelength and the real and imaginary refractive indicies. Columns are: 'wavelength', 'real', and 'imag'
"""
# Make Makephase input file
with open(constants.PATH / "tmp/makephase.inp", mode="w+") as file:
file.write("1\n1\n")
utils.write_nums(file, start_wl, end_wl, wl_step)
file.write("tmp.xsc\ny\n1\n1\n1\n2\n")
file.write(str(constants.MAKEPHASE_GASES[name]))
if name == "Tholins":
file.write("\n1")
# Run Makephase
proc = subprocess.run(f"Makephase < makephase.inp",
shell=True,
cwd=constants.PATH.resolve() / "tmp",
capture_output=True)
# Extract refractive indicies from stdout
real, imag = [], []
for line in proc.stdout.decode("ASCII").split("\n"):
if line.startswith(" Refractive index:"):
r, i = utils.get_floats_from_string(line)
real.append(r)
imag.append(i)
# Clean up tmp/
for fp in (constants.PATH / "tmp/").glob("*"):
if fp.is_file():
fp.unlink()
wavelengths = np.arange(start_wl, end_wl+wl_step, wl_step)
data = pd.DataFrame(np.array([wavelengths, real, imag]).T, columns=["wavelength", "real", "imag"])
return data
[docs]
def run_makephase(directory,
start_wl, end_wl, norm_wl, wl_step,
profiles=list(),):
directory = Path(directory)
directory.mkdir(parents=True, exist_ok=True)
if isinstance(profiles, profiles_.AerosolProfile):
profiles = [profiles]
s = min(start_wl, norm_wl)
e = max(end_wl, norm_wl)
ni, n = utils.find_nearest(np.arange(s, e+wl_step, wl_step), norm_wl)
# Generate the makephase.inp file
with open(directory / "makephase.inp", mode="w+") as file:
file.write(str(len(profiles)) + "\n")
file.write("1\n")
utils.write_nums(file, s, e, wl_step)
file.write("nemesis\n")
file.write("y\n")
for profile in profiles:
if isinstance(profile, profiles_.AerosolProfile):
file.write(f"1\n{profile.radius} {profile.variance}\n2\n")
if not profile.lookup:
file.write(f"1\n{profile.real_n} {profile.imag_n}\n")
else:
file.write(f"{constants.MAKEPHASE_GASES[profile.n_lookup]}\n")
else:
raise ValueError("Only AerosolProfiles can be used.")
# Generate the normxsc.inp file
with open(directory / "normxsc.inp", mode="w+") as file:
file.write(f"nemesis.xsc\n{ni + 1} 1")
cwd = os.getcwd()
os.chdir(directory)
os.system("Makephase < makephase.inp > makephase.out")
os.system("Normxsc < normxsc.inp > normxsc.out")
os.chdir(cwd)
# Core creation/loading functions
[docs]
def load_core(core_directory):
"""Load a NemesisCore object saved using NemesisCore._save_core. Do not use this to load a core that has been retrieved;
use `results.NemesisResult` for that purpose
Args:
None
Returns:
NemesisCore: The unpickled core"""
core_directory = Path(core_directory)
with open(core_directory / "core.pkl", 'rb') as f:
core = pickle.load(f)
# Refresh the directory attributes in the NemesisCore object in case the folder has been moved
core.parent_directory = (core_directory / "..").resolve()
core.directory = core_directory.resolve()
return core
[docs]
def load_from_previous(previous_directory, parent_directory, use_retrieval_errors=False):
"""Load a core from a previous retrieval to use as a template for creating a new core. This method loads the core
using the core.pkl file and then calls from_previous_retrieval on all Profile objects stored, as well as resetting core ID,
parent_directory, etc...
Args:
previous_directory (str): The path to the core directory to load
parent_directory: The new parent directory to create the new core under
use_retrieval_errors (bool): Whether to use the retrieval errors from the previous retrieval as the new parameter errors.
If False, the original parameter errors from the previous core are used.
Returns:
NemesisCore: The new core object"""
core = load_core(previous_directory)
res = results.NemesisResult(previous_directory)
global CORE_ID_COUNTER
CORE_ID_COUNTER += 1
core.id_ = CORE_ID_COUNTER
core.parent_directory = Path(parent_directory)
core.directory = core.parent_directory / f"core_{core.id_}"
core.num_aerosol_modes = 0
for label, profile in core.profiles.items():
core.add_profile(profile.from_previous_retrieval(res, label, use_retrieval_errors))
return core
[docs]
def create_sensitivity_analysis(parent_directory,
template_core,
generate_cores=True,
factors=(0.80, 0.90, 0.95, 1.05, 1.10, 1.20),
forward=True,
parameters=None):
"""
Create a sensitivity analysis on the template core. This is done by creating a new core for
each parameter in each profile, and varying that parameter by a small amount.
Args:
parent_directory (str): The parent directory to create the new cores in
template_core (NemesisCore): The core to use as a template
generate_cores (bool): Whether to generate the cores or just return the list of cores to be generated
factors (tuple): The factors to vary the parameters by. Default is (0.8, 0.9, 0.95, 1.05, 1.1, 1.2). Note there is no
need to include 1.00 as a factor as this is equivalent to the baseline which is already calculated
forward (bool): Whether to run forward models or retrievals. Use forward models to see where each parameter affects each
spectrum and retrievals to test if different priors converge on different solutions
parameters (List[mcmc.Parameter]): Optionally, a list of specific parameters to vary.
If None, all variable parameters will be varied.
Returns:
List[NemesisCore]: A list of new cores with the parameters varied
"""
global CORE_ID_COUNTER
reset_core_numbering()
parent_directory = Path(parent_directory)
parent_directory.mkdir(parents=True)
# Create a file that stored which parameters were varied and by how much for each core
file = open(f"{parent_directory}/sensitivity_analysis.txt", "w+")
file.write("Core ID,Profile Label,Parameter,Base Value,Shifted Value,Factor\n")
CORE_ID_COUNTER += 1
template_core.parent_directory = parent_directory
template_core.directory = parent_directory / f"core_{CORE_ID_COUNTER}"
template_core.forward = forward
cores = [template_core]
# Generate the template core if requested
if generate_cores:
template_core.forward = True
template_core.generate_core()
# Iterate through every parameter in every profile in the template core
for param in parameters:
base_value = getattr(template_core.profiles[param.profile_name], param.param_name)
for factor in factors:
core = copy.deepcopy(template_core)
core.get_new_id()
core.forward = forward
setattr(core.profiles[param.profile_name], param.param_name, base_value * factor)
cores.append(core)
file.write(f"{core.id_},{param.profile_name},{param.param_name},{base_value},{base_value * factor},{factor}\n")
file.flush()
if generate_cores:
core.generate_core()
# for label, params in template_core.get_profile_parameter_names().items():
# file.flush()
# for param in params:
# # Vary the parameter value by 80% to 120%
# for factor in factors:
# # Copy the core (deep copy to make sure nothing is passed by reference)
# core = copy.deepcopy(template_core)
# core.parent_directory = parent_directory
# core.get_new_id()
# core.forward = forward
# # Get and modify the parameter value and log it in the parameter file
# base_value = getattr(core.profiles[label], param)
# setattr(core.profiles[label], param, base_value * factor)
# cores.append(core)
# file.write(f"{core.id_},{label},{param},{base_value},{base_value * factor},{factor}\n")
# # Generate the cores if requested
# if generate_cores:
# core.generate_core()
# Always close your files, kids!
file.close()
return cores
[docs]
def create_gas_analysis_cores(parent_directory, template_core, generate_cores=True, new_spx=None):
"""Create a set of cores where each core has a single gas excluded. Useful for determining where in
the spectrum each gas contributes.
Args:
parent_directory (str): The parent directory to create the new cores in
template_core (NemesisCore): The core to use as a template
generate_cores (bool): Whether to generate the cores or just return the list of cores to be generated
new_spx (str): The path to a new spx file to use instead of the one in the template core. This is useful if you want to use a different resolution or wavelength range
Returns:
List[NemesisCore]: A list of new cores with the gases excluded"""
clear_parent_directory(parent_directory)
reset_core_numbering()
for gas in template_core.use_gases:
print(gas)
return None
[docs]
def reset_core(core_directory):
"""Delete all the files that NEMESIS generates and leave only the input files.
Useful for re-running a retrieval from scratch."""
core_directory = Path(core_directory)
to_delete = [
"kk.dat",
"kk.out",
"nemesis.chi",
"nemesis.cov",
"nemesis.drv",
"nemesis.itr",
"nemesis.log",
"nemesis.mre",
"nemesis.pat",
"nemesis.prc",
"nemesis.prf",
"nemesis.raw",
"nemesis.sca",
"aerosol.prf",
"fcloud.prf",
"*slurm*",
"parah2.prf",
"refindex4.dat",
"refindex6.dat"
]
for pattern in to_delete:
for file_path in core_directory.glob(pattern):
if file_path.is_file():
file_path.unlink()
print(f"Deleted: {file_path.name}")
[docs]
def parallelise_forward(core, n_chunks=None, chunk_size=None, spx_save_dir=None):
"""Split a single forward model into multiple by distrubuting chunks of the spectrum between each core.
Useful for running high spectral resolution cores. You must specify ONE of n_chunks or chunk_size.
Args:
core (NemesisCore): The core to run
n_chunks (int): Number of chunks to split into
chunk_size (int): Alternatively, specify the size of each chunk
Returns:
list[NemesisCore]: The cores to run in parallel
"""
raise NotImplementedError("This is broken for some reason")
# check the core is running in forward mode
assert core.forward
# get the NemesisSpx object
spx = core.spectrum
# calculate indicies of each chunk
if n_chunks is None:
n_chunks = (len(spx.wavelengths) + chunk_size - 1) // chunk_size
idxs_arr = np.array_split(np.arange(len(spx.wavelengths)), n_chunks)
print(f"Creating {n_chunks} cores, each with ~{len(idxs_arr[0])} wavelengths")
# create new core for each chunk
reset_core_numbering()
allcores = []
for i, idxs in enumerate(idxs_arr):
wls = spx.wavelength[idxs]
spc = spx.spectrum[idxs]
err = spx.error[idxs]
new_spx = parsers.NemesisSpx.from_lists(wls, spc, err, **spx.get_angles(), convert=False)
new_spx.write(Path(spx_save_dir) / f"chunk_{i}.spx")
new_core = copy.deepcopy(core)
new_core.get_new_id()
new_core.change_spectrum(new_spx)
allcores.append(new_core)
return allcores
# Utility functions
[docs]
def reset_core_numbering():
"""Reset the automatic core numbering. Useful for creating multiple sets of cores in one program.
Args:
None
Returns:
None"""
global CORE_ID_COUNTER
CORE_ID_COUNTER = 0
[docs]
def renumber_cores(cores):
reset_core_numbering()
for core in cores:
core.get_new_id()
return cores
[docs]
def clear_parent_directory(parent_directory, confirm=True):
"""Attempt to delete all the child files/directories from the specified folder. It is recommended to call this function at the start
of every core generation script as it ensures that the script is fully stateless. If a script generates N cores when first run
and is subsequently modified to produce N/2 cores, then the remaining N/2 core directories will remain in the parent directory.
This should not matter as the SLURM script will only run the correct subset of cores, but it can get confusing. If the parent directory
specified does not exist then it will print a warning and return.
Args:
parent_directory (str): The directory to clear
confirm (bool): Whether to confirm with the user before deleting the directory
Returns:
None"""
parent_directory = Path(parent_directory)
if os.path.exists(parent_directory):
if confirm:
x = input(f"{parent_directory.resolve()} already exists - are you sure you want to erase and continue? Y/N ")
if x.lower() == "n":
print("Quitting")
exit()
else:
shutil.rmtree(parent_directory)
else:
shutil.rmtree(parent_directory)
[docs]
def generate_alice_job(parent_directory,
python_env_name,
memory=16,
hours=0,
minutes=0,
username=None,
notify=("end", "fail"),
type_="normal"):
"""Generate an sbatch submission script for use on ALICE. The job is an array job over all the specified cores.
After running NEMESIS it will run Eleos to create some summary plots in the parent/core_N/plots directory
Args:
parent_directory (str): The directory containing the NEMESIS core directories to be run
python_env_name (str): The name of a conda environment which has Eleos installed
memory (int): The amount of memory to use as a string, formatted like '100mb' or '16gb'
hours (int): The number of hours to schedule the job for
minutes (int): The number of minutes to schedule the job for
username (str): The username of the user running the job (eg, scat2, lnf2)
notify (List[str] or str): What email notifications to send. Can be any combination of 'begin', 'end', 'fail'
type_ (str): The type of job to run. Can be 'normal' or 'sensitivity'. This will determine how Eleos is run at the end of the job.
Returns:
None
Creates:
submitjob.sh in the parent directory of the cores"""
parent_directory = Path(parent_directory)
script_path = parent_directory / "submitjob.sh"
ncores = len(list(parent_directory.glob("core_*")))
# Coerce notify to an iterable and create the string
try:
len(notify)
except:
notify = [notify]
notify_str = ",".join(notify).upper()
# Assert that hours,minutes and memory are correct
assert hours >= 0
assert memory.endswith("mb") or memory.endswith("gb")
assert 0 <= minutes < 60
# Read the submission script and replace template fields
with open(constants.PATH / "data/statics/template.job", mode="r") as file:
out = file.read()
out = out.replace("<MEMORY>", memory)
out = out.replace("<HOURS>", f"{hours:02}")
out = out.replace("<MINUTES>", f"{minutes:02}")
out = out.replace("<N_CORES>", str(ncores))
out = out.replace("<CORE_DIR>", str(parent_directory.resolve()))
out = out.replace("<USERNAME>", str(username))
out = out.replace("<PYTHON_ENV>", python_env_name)
out = out.replace("<NOTIFY>", notify_str)
if type_ == "sensitivity":
out += f"\npython -m eleos {parent_directory.resolve()} --make-sensitivity-summary --run-if-finished"
# Write the filled template
with open(script_path, mode="w+") as file:
file.write(out)
[docs]
def run_alice_job(parent_directory, print_queue_delay=2):
"""Run the script created by generate_alice_job
Args:
parent_directory (str): Path to the directory containing all the cores
print_queue_delay (float): Duration to wait (in seconds) before printing the current job queue
Returns:
None
"""
parent_directory = Path(parent_directory)
os.system(f"sbatch {parent_directory / 'submitjob.sh'}")
time.sleep(print_queue_delay)
os.system("squeue --me")