Source code for CIME.nmlgen

"""Class for generating component namelists."""

# Typically ignore this.
# pylint: disable=invalid-name

# Disable these because this is our standard setup
# pylint: disable=wildcard-import,unused-wildcard-import

import datetime
import re
import hashlib

from CIME.XML.standard_module_setup import *
from CIME.namelist import Namelist, parse, \
    character_literal_to_string, string_to_character_literal, \
    expand_literal_list, compress_literal_list, merge_literal_lists
from CIME.XML.namelist_definition import NamelistDefinition
from CIME.utils import expect, safe_copy
from CIME.XML.stream import Stream

logger = logging.getLogger(__name__)

_var_ref_re = re.compile(r"\$(\{)?(?P<name>\w+)(?(1)\})")

_ymd_re = re.compile(r"%(?P<digits>[1-9][0-9]*)?y(?P<month>m(?P<day>d)?)?")

_stream_file_template = """<?xml version="1.0"?>
<file id="stream" version="1.0">
<dataSource>
   GENERIC
</dataSource>
<domainInfo>
  <variableNames>
     {domain_varnames}
  </variableNames>
  <filePath>
     {domain_filepath}
  </filePath>
  <fileNames>
     {domain_filenames}
  </fileNames>
</domainInfo>
<fieldInfo>
   <variableNames>
     {data_varnames}
   </variableNames>
   <filePath>
     {data_filepath}
   </filePath>
   <fileNames>
    {data_filenames}
   </fileNames>
   <offset>
      {offset}
   </offset>
</fieldInfo>
</file>
"""

[docs]class NamelistGenerator(object): """Utility class for generating namelists for a given component.""" _streams_variables = [] #pylint:disable=too-many-arguments def __init__(self, case, definition_files, files=None): """Construct a namelist generator. Arguments: `case` - `Case` object corresponding to the current case. `infiles` - List of files with user namelist options. `definition_files` - List of XML files containing namelist definitions. `config` - A dictionary of attributes for matching defaults. """ # Save off important information from inputs. self._case = case self._din_loc_root = case.get_value('DIN_LOC_ROOT') # Create definition object - this will validate the xml schema in the definition file self._definition = NamelistDefinition(definition_files[0], files=files) # Determine array of _stream_variables from definition object # This is only applicable to data models self._streams_namelists = {"streams": []} self._streams_variables = self._definition.get_per_stream_entries() for variable in self._streams_variables: self._streams_namelists[variable] = [] # Create namelist object. self._namelist = Namelist() # Define __enter__ and __exit__ so that we can use this as a context manager def __enter__(self): return self def __exit__(self, *_): return False
[docs] def init_defaults(self, infiles, config, skip_groups=None, skip_entry_loop=False): """Return array of names of all definition nodes """ # first clean out any settings left over from previous calls self.new_instance() self._definition.set_nodes(skip_groups=skip_groups) # Determine the array of entry nodes that will be acted upon entry_nodes = self._definition.set_nodes(skip_groups=skip_groups) # Add attributes to definition object self._definition.add_attributes(config) # Parse the infile and create namelist settings for the contents of infile # this will override all other settings in add_defaults for file_ in infiles: # Parse settings in "groupless" mode. nml_dict = parse(in_file=file_, groupless=True) # Add groups using the namelist definition. new_namelist = self._definition.dict_to_namelist(nml_dict, filename=file_) # Make sure that the input is actually valid. self._definition.validate(new_namelist, filename=file_) # Merge into existing settings (earlier settings have precedence # over later settings). self._namelist.merge_nl(new_namelist) if not skip_entry_loop: for entry in entry_nodes: self.add_default(self._definition.get(entry, "id")) return [self._definition.get(entry, "id") for entry in entry_nodes]
[docs] @staticmethod def quote_string(string): """Convert a string to a quoted Fortran literal. Does nothing if the string appears to be quoted already. """ if string == "" or \ (string[0] not in ('"', "'") or string[0] != string[-1]): string = string_to_character_literal(string) return string
def _to_python_value(self, name, literals): """Transform a literal list as needed for `get_value`.""" var_type, _, var_size, = self._definition.split_type_string(name) if len(literals) > 0: values = expand_literal_list(literals) else: return "" for i, scalar in enumerate(values): if scalar == '': values[i] = None elif var_type == 'character': values[i] = character_literal_to_string(scalar) if var_size == 1: return values[0] else: return values def _to_namelist_literals(self, name, values): """Transform a literal list as needed for `set_value`. This is the inverse of `_to_python_value`, except that many of the changes have potentially already been performed. """ var_type, _, var_size, = self._definition.split_type_string(name) if var_size == 1 and not isinstance(values, list): values = [values] for i, scalar in enumerate(values): if scalar is None: values[i] = "" elif var_type == 'character': expect(not isinstance(scalar, list), name) values[i] = self.quote_string(scalar) return compress_literal_list(values)
[docs] def get_value(self, name): """Get the current value of a given namelist variable. Note that the return value of this function is always a string or a list of strings. E.g. the scalar logical value .false. will be returned as `".false."`, while an array of two .false. values will be returned as `[".false.", ".false."]`. Whether or not a value is scalar is determined by checking the array size in the namelist definition file. Null values are converted to `None`, and repeated values are expanded, e.g. `['2*3']` is converted to `['3', '3', '3']`. For character variables, the value is converted to a Python string (e.g. quotation marks are removed). All other literals are returned as the raw string values that will be written to the namelist. """ return self._to_python_value(name, self._namelist.get_value(name))
[docs] def set_value(self, name, value): """Set the current value of a given namelist variable. Usually, you should use `add_default` instead of this function. The `name` argument is the name of the variable to set, and the `value` is a list of strings to use as settings. If the variable is scalar, the list is optional; i.e. a scalar logical can be set using either `value='.false.'` or `value=['.false.']`. If the variable is of type character, and the input is missing quotes, quotes will be added automatically. If `None` is provided in place of a string, this will be translated to a null value. Note that this function will overwrite the current value, which may hold a user-specified setting. Even if `value` is (or contains) a null value, the old setting for the variable will be thrown out completely. """ var_group = self._definition.get_group(name) literals = self._to_namelist_literals(name, value) _, _, var_size, = self._definition.split_type_string(name) self._namelist.set_variable_value(var_group, name, literals, var_size)
[docs] def get_default(self, name, config=None, allow_none=False): """Get the value of a variable from the namelist definition file. The `config` argument is passed through to the underlying `NamelistDefaults.get_value` call as the `attribute` argument. The return value of this function is a list of values that were found in the defaults file. If there is no matching default, this function returns `None` if `allow_none=True` is passed, otherwise an error is raised. Note that we perform some translation of the values, since there are a few differences between Fortran namelist literals and values in the defaults file: 1) In the defaults file, whitespace is ignored except within strings, so the output of this function strips out most whitespace. (This implies that commas are the only way to separate array elements in the defaults file.) 2) In the defaults file, quotes around character literals (strings) are optional, as long as the literal does not contain whitespace, commas, or (single or double) quotes. If a setting for a character variable does not seem to have quotes (and is not a null value), this function will add them. 3) Default values may refer to variables in a case's `env_*.xml` files. This function replaces references of the form `$VAR` or `${VAR}` with the value of the variable `VAR` in an env file, if that variable exists. This behavior is suppressed within single-quoted strings (similar to parameter expansion in shell scripts). """ default = self._definition.get_value_match(name, attributes=config, exact_match=False) if default is None: expect(allow_none, "No default value found for {}.".format(name)) return None default = expand_literal_list(default) var_type,_,_ = self._definition.split_type_string(name) for i, scalar in enumerate(default): # Skip single-quoted strings. if var_type == 'character' and scalar != '' and \ scalar[0] == scalar[-1] == "'": continue match = _var_ref_re.search(scalar) while match: env_val = self._case.get_value(match.group('name')) expect(env_val is not None, "Namelist default for variable {} refers to unknown XML variable {}.".format(name, match.group('name'))) scalar = scalar.replace(match.group(0), str(env_val), 1) match = _var_ref_re.search(scalar) default[i] = scalar # Deal with missing quotes. if var_type == 'character': for i, scalar in enumerate(default): # Preserve null values. if scalar != '': default[i] = self.quote_string(scalar) default = self._to_python_value(name, default) return default
[docs] def get_streams(self): """Get a list of all streams used for the current data model mode.""" return self.get_default("streamslist")
[docs] def clean_streams(self): for variable in self._streams_variables: self._streams_namelists[variable] = [] self._streams_namelists["streams"] = []
[docs] def new_instance(self): """ Clean the object just enough to introduce a new instance """ self.clean_streams() self._namelist.clean_groups()
def _sub_fields(self, varnames): """Substitute indicators with given values in a list of fields. Replace any instance of the following substring indicators with the appropriate values: %glc = two-digit GLC elevation class from 00 through glc_nec The difference between this function and `_sub_paths` is that this function is intended to be used for variable names (especially from the `strm_datvar` defaults), whereas `_sub_paths` is intended for use on input data file paths. Returns a string. Example: If `_sub_fields` is called with an array containing two elements, each of which contains two strings, and glc_nec=3: foo bar s2x_Ss_tsrf%glc tsrf%glc then the returned array will be: foo bar s2x_Ss_tsrf00 tsrf00 s2x_Ss_tsrf01 tsrf01 s2x_Ss_tsrf02 tsrf02 s2x_Ss_tsrf03 tsrf03 """ lines = varnames.split("\n") new_lines = [] for line in lines: if not line: continue if "%glc" in line: if self._case.get_value('GLC_NEC') == 0: glc_nec_indices = [] else: glc_nec_indices = range(self._case.get_value('GLC_NEC')+1) for i in glc_nec_indices: new_lines.append(line.replace("%glc", "{:02d}".format(i))) else: new_lines.append(line) return "\n".join(new_lines) @staticmethod def _days_in_month(month, year=1): """Number of days in the given month (specified as an int, 1-12). The `year` argument gives the year for which to request the number of days, in a Gregorian calendar. Defaults to `1` (not a leap year). """ month_start = datetime.date(year, month, 1) if month == 12: next_year = year+1 next_month = 1 else: next_year = year next_month = month + 1 next_month_start = datetime.date(next_year, next_month, 1) return (next_month_start - month_start).days def _sub_paths(self, filenames, year_start, year_end): """Substitute indicators with given values in a list of filenames. Replace any instance of the following substring indicators with the appropriate values: %y = year from the range year_start to year_end %ym = year-month from the range year_start to year_end with all 12 months %ymd = year-month-day from the range year_start to year_end with all 12 months For the date indicators, the year may be prefixed with a number of digits to use (the default is 4). E.g. `%2ymd` can be used to change the number of year digits from 4 to 2. Note that we assume that there is no mixing and matching of date indicators, i.e. you cannot use `%4ymd` and `%2y` in the same line. Note also that we use a no-leap calendar, i.e. every month has the same number of days every year. The difference between this function and `_sub_fields` is that this function is intended to be used for file names (especially from the `strm_datfil` defaults), whereas `_sub_fields` is intended for use on variable names. Returns a string (filenames separated by newlines). """ lines = [line for line in filenames.split("\n") if line] new_lines = [] for line in lines: match = _ymd_re.search(filenames) if match is None: new_lines.append(line) continue if match.group('digits'): year_format = "{:0"+match.group('digits')+"d}" else: year_format = "{:04d}" for year in range(year_start, year_end+1): if match.group('day'): for month in range(1, 13): days = self._days_in_month(month) for day in range(1, days+1): date_string = (year_format + "-{:02d}-{:02d}").format(year, month, day) new_line = line.replace(match.group(0), date_string) new_lines.append(new_line) elif match.group('month'): for month in range(1, 13): date_string = (year_format + "-{:02d}").format(year, month) new_line = line.replace(match.group(0), date_string) new_lines.append(new_line) else: date_string = year_format.format(year) new_line = line.replace(match.group(0), date_string) new_lines.append(new_line) return "\n".join(new_lines)
[docs] def create_stream_file_and_update_shr_strdata_nml(self, config, caseroot, #pylint:disable=too-many-locals stream, stream_path, data_list_path): """Write the pseudo-XML file corresponding to a given stream. Arguments: `config` - Used to look up namelist defaults. This is used *in addition* to the `config` used to construct the namelist generator. The main reason to supply additional configuration options here is to specify stream-specific settings. `stream` - Name of the stream. `stream_path` - Path to write the stream file to. `data_list_path` - Path of file to append input data information to. """ if os.path.exists(stream_path): os.unlink(stream_path) user_stream_path = os.path.join(caseroot, "user_"+os.path.basename(stream_path)) # Use the user's stream file, or create one if necessary. config = config.copy() config["stream"] = stream # Stream-specific configuration. if os.path.exists(user_stream_path): safe_copy(user_stream_path, stream_path) strmobj = Stream(infile=stream_path) domain_filepath = strmobj.get_value("domainInfo/filePath") data_filepath = strmobj.get_value("fieldInfo/filePath") domain_filenames = strmobj.get_value("domainInfo/fileNames") data_filenames = strmobj.get_value("fieldInfo/fileNames") else: # Figure out the details of this stream. if stream in ("prescribed", "copyall"): # Assume only one file for prescribed mode! grid_file = self.get_default("strm_grid_file", config) domain_filepath, domain_filenames = os.path.split(grid_file) data_file = self.get_default("strm_data_file", config) data_filepath, data_filenames = os.path.split(data_file) else: domain_filepath = self.get_default("strm_domdir", config) domain_filenames = self.get_default("strm_domfil", config) data_filepath = self.get_default("strm_datdir", config) data_filenames = self.get_default("strm_datfil", config) domain_varnames = self._sub_fields(self.get_default("strm_domvar", config)) data_varnames = self._sub_fields(self.get_default("strm_datvar", config)) offset = self.get_default("strm_offset", config) year_start = int(self.get_default("strm_year_start", config)) year_end = int(self.get_default("strm_year_end", config)) data_filenames = self._sub_paths(data_filenames, year_start, year_end) domain_filenames = self._sub_paths(domain_filenames, year_start, year_end) # Overwrite domain_file if should be set from stream data if domain_filenames == 'null': domain_filepath = data_filepath domain_filenames = data_filenames.splitlines()[0] stream_file_text = _stream_file_template.format( domain_varnames=domain_varnames, domain_filepath=domain_filepath, domain_filenames=domain_filenames, data_varnames=data_varnames, data_filepath=data_filepath, data_filenames=data_filenames, offset=offset, ) with open(stream_path, 'w') as stream_file: stream_file.write(stream_file_text) lines_hash = self._get_input_file_hash(data_list_path) with open(data_list_path, 'a') as input_data_list: for i, filename in enumerate(domain_filenames.split("\n")): if filename.strip() == '': continue filepath, filename = os.path.split(filename) if not filepath: filepath = os.path.join(domain_filepath, filename.strip()) string = "domain{:d} = {}\n".format(i+1, filepath) hashValue = hashlib.md5(string.rstrip().encode('utf-8')).hexdigest() if hashValue not in lines_hash: input_data_list.write(string) for i, filename in enumerate(data_filenames.split("\n")): if filename.strip() == '': continue filepath = os.path.join(data_filepath, filename.strip()) string = "file{:d} = {}\n".format(i+1, filepath) hashValue = hashlib.md5(string.rstrip().encode('utf-8')).hexdigest() if hashValue not in lines_hash: input_data_list.write(string) self.update_shr_strdata_nml(config, stream, stream_path)
[docs] def update_shr_strdata_nml(self, config, stream, stream_path): """Updates values for the `shr_strdata_nml` namelist group. This should be done once per stream, and it shouldn't usually be called directly, since `create_stream_file` calls this method itself. """ assert config['stream'] == stream, \ "config stream is {}, but input stream is {}".format(config['stream'], stream) # Double-check the years for sanity. year_start = int(self.get_default("strm_year_start", config)) year_end = int(self.get_default("strm_year_end", config)) year_align = int(self.get_default("strm_year_align", config)) expect(year_end >= year_start, "Stream {} starts at year {:d}, but ends at earlier year {:d}.".format(stream, year_start, year_end)) # Add to streams file. stream_string = "{} {:d} {:d} {:d}".format(os.path.basename(stream_path), year_align, year_start, year_end) self._streams_namelists["streams"].append(stream_string) for variable in self._streams_variables: default = self.get_default(variable, config) expect(len(default) == 1, "Stream {} had multiple settings for variable {}.".format(stream, variable)) self._streams_namelists[variable].append(default[0])
[docs] def set_abs_file_path(self, file_path): """If `file_path` is relative, make it absolute using `DIN_LOC_ROOT`. If an absolute path is input, it is returned unchanged. """ if os.path.isabs(file_path): return file_path else: fullpath = os.path.join(self._din_loc_root, file_path) return fullpath
[docs] def add_default(self, name, value=None, ignore_abs_path=None): """Add a value for the specified variable to the namelist. If the specified variable is already defined in the object, the existing value is preserved. Otherwise, the `value` argument, if provided, will be used to set the value. If no such value is found, the defaults file will be consulted. If null values are present in any of the above, the result will be a merged array of values. If no value for the variable is found via any of the above, this method will raise an exception. """ # pylint: disable=protected-access group = self._definition.get_group(name) # Use this to see if we need to raise an error when nothing is found. have_value = False # Check for existing value. current_literals = self._namelist.get_variable_value(group, name) # Check for input argument. if value is not None: have_value = True # if compression were to occur, this is where it does literals = self._to_namelist_literals(name, value) current_literals = merge_literal_lists(literals, current_literals) # Check for default value. default = self.get_default(name, allow_none=True) if default is not None: have_value = True default_literals = self._to_namelist_literals(name, default) current_literals = merge_literal_lists(default_literals, current_literals) expect(have_value, "No default value found for {}.".format(name)) # Go through file names and prepend input data root directory for # absolute pathnames. var_type, _, var_size = self._definition.split_type_string(name) if var_type == "character" and ignore_abs_path is None: var_input_pathname = self._definition.get_input_pathname(name) if var_input_pathname == 'abs': current_literals = expand_literal_list(current_literals) for i, literal in enumerate(current_literals): if literal == '': continue file_path = character_literal_to_string(literal) # NOTE - these are hard-coded here and a better way is to make these extensible if file_path == 'UNSET' or file_path == 'idmap' or file_path == 'idmap_ignore' or file_path == 'unset': continue if file_path == 'null': continue file_path = self.set_abs_file_path(file_path) if not os.path.exists(file_path): logger.warning("File not found: {} = {}, will attempt to download in check_input_data phase".format(name, literal)) current_literals[i] = string_to_character_literal(file_path) current_literals = compress_literal_list(current_literals) # Set the new value. self._namelist.set_variable_value(group, name, current_literals, var_size)
[docs] def create_shr_strdata_nml(self): """Set defaults for `shr_strdata_nml` variables other than the variable domainfile """ self.add_default("datamode") if self.get_value("datamode") != 'NULL': self.add_default("streams", value=self._streams_namelists["streams"]) for variable in self._streams_variables: self.add_default(variable, value=self._streams_namelists[variable])
[docs] def get_group_variables(self, group_name): return self._namelist.get_group_variables(group_name)
def _get_input_file_hash(self, data_list_path): lines_hash = set() if os.path.isfile(data_list_path): with open(data_list_path, "r") as input_data_list: for line in input_data_list: hashValue = hashlib.md5(line.rstrip().encode('utf-8')).hexdigest() logger.debug( "Found line {} with hash {}".format(line,hashValue)) lines_hash.add(hashValue) return lines_hash def _write_input_files(self, data_list_path): """Write input data files to list.""" # append to input_data_list file lines_hash = self._get_input_file_hash(data_list_path) with open(data_list_path, "a") as input_data_list: for group_name in self._namelist.get_group_names(): for variable_name in self._namelist.get_variable_names(group_name): input_pathname = self._definition.get_node_element_info(variable_name, "input_pathname") if input_pathname is not None: # This is where we end up for all variables that are paths # to input data files. literals = self._namelist.get_variable_value(group_name, variable_name) for literal in literals: file_path = character_literal_to_string(literal) # NOTE - these are hard-coded here and a better way is to make these extensible if file_path == 'UNSET' or file_path == 'idmap' or file_path == 'idmap_ignore': continue if input_pathname == 'abs': # No further mangling needed for absolute paths. # At this point, there are overwrites that should be ignored if not os.path.isabs(file_path): continue else: pass elif input_pathname.startswith('rel:'): # The part past "rel" is the name of a variable that # this variable specifies its path relative to. root_var = input_pathname[4:] root_dir = self.get_value(root_var) file_path = os.path.join(root_dir, file_path) else: expect(False, "Bad input_pathname value: {}.".format(input_pathname)) # Write to the input data list. string = "{} = {}".format(variable_name, file_path) hashValue = hashlib.md5(string.rstrip().encode('utf-8')).hexdigest() if hashValue not in lines_hash: logger.debug("Adding line {} with hash {}".format(string,hashValue)) input_data_list.write(string+"\n") else: logger.debug("Line already in file {}".format(string))
[docs] def write_output_file(self, namelist_file, data_list_path=None, groups=None, sorted_groups=True): """Write out the namelists and input data files. The `namelist_file` and `modelio_file` are the locations to which the component and modelio namelists will be written, respectively. The `data_list_path` argument is the location of the `*.input_data_list` file, which will have the input data files added to it. """ self._definition.validate(self._namelist) if groups is None: groups = self._namelist.get_group_names() # remove groups that are never in namelist file if "modelio" in groups: groups.remove("modelio") if "seq_maps" in groups: groups.remove("seq_maps") # write namelist file self._namelist.write(namelist_file, groups=groups, sorted_groups=sorted_groups) if data_list_path is not None: self._write_input_files(data_list_path)
# For MCT
[docs] def add_nmlcontents(self, filename, group, append=True, format_="nmlcontents", sorted_groups=True): """ Write only contents of nml group """ self._namelist.write(filename, groups=[group], append=append, format_=format_, sorted_groups=sorted_groups)
[docs] def write_seq_maps(self, filename): """ Write mct out seq_maps.rc""" self._namelist.write(filename, groups=["seq_maps"], format_="rc")
[docs] def write_modelio_file(self, filename): """ Write mct component modelio files""" self._namelist.write(filename, groups=["modelio", "pio_inparm"], format_="nml")
# For NUOPC
[docs] def write_nuopc_modelio_file(self, filename): """ Write nuopc component modelio files""" self._namelist.write(filename, groups=["pio_inparm"], format_="nml")
# For NUOPC
[docs] def write_nuopc_config_file(self, filename, data_list_path=None, skip_comps=None): """ Write the nuopc config file""" self._definition.validate(self._namelist) groups = self._namelist.get_group_names() self._namelist.write(filename, skip_comps=skip_comps, groups=groups, format_='nuopc', sorted_groups=False) if data_list_path is not None: # append to input_data_list file self._write_input_files(data_list_path)