Source code for CIME.XML.archive_base

"""
Base class for archive files.  This class inherits from generic_xml.py
"""
from CIME.XML.standard_module_setup import *
from CIME.XML.generic_xml import GenericXML
from CIME.utils import convert_to_type

logger = logging.getLogger(__name__)


[docs] class ArchiveBase(GenericXML):
[docs] def exclude_testing(self, compname): """ Checks if component should be excluded from testing. """ value = self._get_attribute(compname, "exclude_testing") if value is None: return False return convert_to_type(value, "logical")
def _get_attribute(self, compname, attr_name): attrib = self.get_entry_attributes(compname) if attrib is None: return None return attrib.get(attr_name, None)
[docs] def get_entry_attributes(self, compname): entry = self.get_entry(compname) if entry is None: return None return self.attrib(entry)
[docs] def get_entry(self, compname): """ Returns an xml node corresponding to compname in comp_archive_spec """ return self.scan_optional_child( "comp_archive_spec", attributes={"compname": compname} )
def _get_file_node_text(self, attnames, archive_entry): """ get the xml text associated with each of the attnames based at root archive_entry returns a list of text entries or an empty list if no entries are found """ nodes = [] textvals = [] for attname in attnames: nodes.extend(self.get_children(attname, root=archive_entry)) for node in nodes: textvals.append(self.text(node)) return textvals
[docs] def get_rest_file_extensions(self, archive_entry): """ get the xml text associated with each of the rest_file_extensions based at root archive_entry (root is based on component name) returns a list of text entries or an empty list if no entries are found """ return self._get_file_node_text(["rest_file_extension"], archive_entry)
[docs] def get_hist_file_extensions(self, archive_entry): """ get the xml text associated with each of the hist_file_extensions based at root archive_entry (root is based on component name) returns a list of text entries or an empty list if no entries are found """ return self._get_file_node_text(["hist_file_extension"], archive_entry)
[docs] def get_hist_file_ext_regexes(self, archive_entry): """ get the xml text associated with each of the hist_file_ext_regex entries based at root archive_entry (root is based on component name) returns a list of text entries or an empty list if no entries are found """ return self._get_file_node_text(["hist_file_ext_regex"], archive_entry)
[docs] def get_entry_value(self, name, archive_entry): """ get the xml text associated with name under root archive_entry returns None if no entry is found, expects only one entry """ node = self.get_optional_child(name, root=archive_entry) if node is not None: return self.text(node) return None
[docs] def get_latest_hist_files( self, casename, model, from_dir, suffix="", ref_case=None ): """ get the most recent history files in directory from_dir with suffix if provided """ test_hists = self.get_all_hist_files( casename, model, from_dir, suffix=suffix, ref_case=ref_case ) ext_regexes = self.get_hist_file_ext_regexes( self.get_entry(self._get_compname(model)) ) latest_files = {} histlist = [] for hist in test_hists: ext = _get_extension(model, hist, ext_regexes) latest_files[ext] = hist for key in latest_files.keys(): histlist.append(latest_files[key]) return histlist
[docs] def get_all_hist_files(self, casename, model, from_dir, suffix="", ref_case=None): """ gets all history files in directory from_dir with suffix (if provided) ignores files with ref_case in the name if ref_case is provided """ dmodel = self._get_compname(model) # remove when component name is changed if model == "fv3gfs": model = "fv3" if model == "cice5": model = "cice" if model == "ww3dev": model = "ww3" hist_files = [] extensions = self.get_hist_file_extensions(self.get_entry(dmodel)) if suffix and len(suffix) > 0: has_suffix = True else: has_suffix = False # Strip any trailing $ if suffix is present and add it back after the suffix for ext in extensions: if ext.endswith("$") and has_suffix: ext = ext[:-1] string = model + r"\d?_?(\d{4})?\." + ext if has_suffix: if not suffix in string: string += r"\." + suffix + "$" if not string.endswith("$"): string += "$" logger.debug("Regex is {}".format(string)) pfile = re.compile(string) hist_files.extend( [ f for f in os.listdir(from_dir) if pfile.search(f) and ( (f.startswith(casename) or f.startswith(model)) and not f.endswith("cprnc.out") ) ] ) if ref_case: expect( ref_case not in casename, "ERROR: ref_case name {} conflicts with casename {}".format( ref_case, casename ), ) hist_files = [ h for h in hist_files if not (ref_case in os.path.basename(h)) ] hist_files = list(set(hist_files)) hist_files.sort() logger.debug( "get_all_hist_files returns {} for model {}".format(hist_files, model) ) return hist_files
@staticmethod def _get_compname(model): """ Given a model name, return a possibly-modified name for use as the compname argument to get_entry """ if model == "cpl": return "drv" return model
def _get_extension(model, filepath, ext_regexes): r""" For a hist file for the given model, return what we call the "extension" model - The component model filepath - The path of the hist file ext_regexes - A list of model-specific regexes that are matched before falling back on the general-purpose regex, r'\w+'. In many cases this will be an empty list, signifying that we should just use the general-purpose regex. >>> _get_extension("cpl", "cpl.hi.nc", []) 'hi' >>> _get_extension("cpl", "cpl.h.nc", []) 'h' >>> _get_extension("cpl", "cpl.h1.nc.base", []) 'h1' >>> _get_extension("cpl", "TESTRUNDIFF.cpl.hi.0.nc.base", []) 'hi' >>> _get_extension("cpl", "TESTRUNDIFF_Mmpi-serial.f19_g16_rx1.A.melvin_gnu.C.fake_testing_only_20160816_164150-20160816_164240.cpl.h.nc", []) 'h' >>> _get_extension("clm","clm2_0002.h0.1850-01-06-00000.nc", []) '0002.h0' >>> _get_extension("pop","PFS.f09_g16.B1850.cheyenne_intel.allactive-default.GC.c2_0_b1f2_int.pop.h.ecosys.nday1.0001-01-02.nc", []) 'h' >>> _get_extension("mom", "ga0xnw.mom6.frc._0001_001.nc", []) 'frc' >>> _get_extension("mom", "ga0xnw.mom6.sfc.day._0001_001.nc", []) 'sfc.day' >>> _get_extension("mom", "bixmc5.mom6.prog._0001_01_05_84600.nc", []) 'prog' >>> _get_extension("mom", "bixmc5.mom6.hm._0001_01_03_42300.nc", []) 'hm' >>> _get_extension("mom", "bixmc5.mom6.hmz._0001_01_03_42300.nc", []) 'hmz' >>> _get_extension("pop", "casename.pop.dd.0001-01-02-00000", []) 'dd' >>> _get_extension("cism", "casename.cism.gris.h.0002-01-01-0000.nc", [r"\w+\.\w+"]) 'gris.h' """ # Remove with component namechange if model == "fv3gfs": model = "fv3" if model == "cice5": model = "cice" if model == "ww3dev": model = "ww3" basename = os.path.basename(filepath) m = None if ext_regexes is None: ext_regexes = [] # First add any model-specific extension regexes; these will be checked before the # general regex if model == "mom": # Need to check 'sfc.day' specially: the embedded '.' messes up the # general-purpose regex ext_regexes.append(r"sfc\.day") # Now add the general-purpose extension regex ext_regexes.append(r"\w+") for ext_regex in ext_regexes: full_regex_str = model + r"\d?_?(\d{4})?\.(" + ext_regex + r")[-\w\.]*" full_regex = re.compile(full_regex_str) m = full_regex.search(basename) if m is not None: if m.group(1) is not None: result = m.group(1) + "." + m.group(2) else: result = m.group(2) return result expect(m, "Failed to get extension for file '{}'".format(filepath)) return result