# Source code for CIME.nmlgen

"""Class for generating component namelists."""

# Typically ignore this.
# pylint: disable=invalid-name

# Disable these because this is our standard setup
# pylint: disable=wildcard-import,unused-wildcard-import

import datetime
import re
import hashlib

from CIME.XML.standard_module_setup import *
from CIME.namelist import (
    Namelist,
    parse,
    character_literal_to_string,
    string_to_character_literal,
    expand_literal_list,
    compress_literal_list,
    merge_literal_lists,
)
from CIME.XML.namelist_definition import NamelistDefinition
from CIME.utils import expect, safe_copy
from CIME.XML.stream import Stream
from CIME.XML.grids import GRID_SEP

# Module-level logger. `logging` is not imported explicitly in this file;
# presumably it is supplied by the wildcard import from
# CIME.XML.standard_module_setup -- verify if that import is ever narrowed.
logger = logging.getLogger(__name__)

# Matches a variable reference of the form `$NAME` or `${NAME}`.
# Group 1 captures the optional opening brace; the conditional `(?(1)\})`
# then requires a closing brace only when the opening brace was present.
# The variable name itself (one or more word characters) is in group "name".
_var_ref_re = re.compile(r"\$(\{)?(?P<name>\w+)(?(1)\})")

# Matches a date indicator: `%`, an optional positive digit count (group
# "digits", no leading zero), a literal `y`, then optionally `m` (group
# "month") which may itself be followed by `d` (group "day").
# Examples that match: `%y`, `%ym`, `%ymd`, `%2ymd`.
_ymd_re = re.compile(r"%(?P<digits>[1-9][0-9]*)?y(?P<month>m(?P<day>d)?)?")

# Text template for a stream description file. It is filled in with
# `str.format` using the named fields `domain_varnames`, `domain_filepath`,
# `domain_filenames`, `data_varnames`, `data_filepath`, `data_filenames` and
# `offset`. The whitespace inside the template is part of the emitted file.
_stream_mct_file_template = """<?xml version="1.0"?>
<file id="stream" version="1.0">
<dataSource>
   GENERIC
</dataSource>
<domainInfo>
  <variableNames>
     {domain_varnames}
  </variableNames>
  <filePath>
     {domain_filepath}
  </filePath>
  <fileNames>
     {domain_filenames}
  </fileNames>
</domainInfo>
<fieldInfo>
   <variableNames>
     {data_varnames}
   </variableNames>
   <filePath>
     {data_filepath}
   </filePath>
   <fileNames>
    {data_filenames}
   </fileNames>
   <offset>
      {offset}
   </offset>
</fieldInfo>
</file>
"""


class NamelistGenerator(object):

    """Utility class for generating namelists for a given component."""

    _streams_variables = []

    # pylint:disable=too-many-arguments
    def __init__(self, case, definition_files, files=None):
        """Construct a namelist generator.

        Arguments:
        `case`             - `Case` object corresponding to the current case.
        `definition_files` - List of XML files containing namelist
                             definitions. Only the first entry is used to
                             build the definition object.
        `files`            - Optional value passed through unchanged as the
                             `files` argument of `NamelistDefinition`.
        """
        # Save off important information from inputs.
        self._case = case
        self._din_loc_root = case.get_value("DIN_LOC_ROOT")

        # Create definition object - this will validate the xml schema in the
        # definition file
        self._definition = NamelistDefinition(definition_files[0], files=files)

        # Determine array of _stream_variables from definition object
        # This is only applicable to data models
        self._streams_namelists = {"streams": []}
        self._streams_variables = self._definition.get_per_stream_entries()
        for variable in self._streams_variables:
            self._streams_namelists[variable] = []

        # Create namelist object.
        self._namelist = Namelist()

        # entries for which we should potentially call add_default (variables
        # that do not set skip_default_entry)
        self._default_nodes = []

    # Define __enter__ and __exit__ so that we can use this as a context manager
    def __enter__(self):
        return self

    def __exit__(self, *_):
        # Returning False means exceptions raised inside the `with` body are
        # not suppressed.
        return False

    def init_defaults(
        self,
        infiles,
        config,
        skip_groups=None,
        skip_entry_loop=False,
        skip_default_for_groups=None,
        set_group_name=None,
    ):
        """Reset the namelist, read user input files and add defaults.

        Returns a list with the "id" of every definition node selected by
        `set_nodes(skip_groups=skip_groups)`.

        infiles should be a list of file paths, each one giving namelist
        settings that take precedence over the default values. Often there
        will be only one file in this list. If there are multiple files,
        earlier files take precedence over later files.

        `config` is handed to the definition object via `add_attributes`.

        If `skip_entry_loop` is true, `add_default` is not called for any
        entry; only the user input files are merged.

        If skip_default_for_groups is provided, it should be a list of
        namelist group names; the add_default call will not be done for any
        variables in these groups. This is often paired with later conditional
        calls to add_defaults_for_group.

        If `set_group_name` is given, it is used in place of each entry's own
        group name when checking against `skip_default_for_groups`.
        """
        if skip_default_for_groups is None:
            skip_default_for_groups = []

        # first clean out any settings left over from previous calls
        self.new_instance()

        # Determine the array of entry nodes that will be acted upon
        self._default_nodes = self._definition.set_nodes(skip_groups=skip_groups)

        # Add attributes to definition object
        self._definition.add_attributes(config)

        # Parse the infile and create namelist settings for the contents of
        # infile this will override all other settings in add_defaults
        for file_ in infiles:
            # Parse settings in "groupless" mode.
            nml_dict = parse(in_file=file_, groupless=True)

            # Add groups using the namelist definition.
            new_namelist = self._definition.dict_to_namelist(nml_dict, filename=file_)

            # Make sure that the input is actually valid.
            self._definition.validate(new_namelist, filename=file_)

            # Merge into existing settings (earlier settings have precedence
            # over later settings).
            self._namelist.merge_nl(new_namelist)

        if not skip_entry_loop:
            for entry in self._default_nodes:
                if set_group_name:
                    group_name = set_group_name
                else:
                    group_name = self._definition.get_group_name(entry)

                if group_name not in skip_default_for_groups:
                    self.add_default(self._definition.get(entry, "id"))

        return [self._definition.get(entry, "id") for entry in self._default_nodes]

    def rename_group(self, group, newgroup):
        """Pass through to namelist definition"""
        return self._definition.rename_group(group, newgroup)

    def add_defaults_for_group(self, group):
        """Call add_default for namelist variables in the given group

        This still skips variables that have attributes of skip_default_entry
        or per_stream_entry.

        This must be called after init_defaults. It is often paired with use
        of skip_default_for_groups in the init_defaults call.
        """
        for entry in self._default_nodes:
            group_name = self._definition.get_group_name(entry)
            if group_name == group:
                self.add_default(self._definition.get(entry, "id"))

    def confirm_group_is_empty(self, group_name, errmsg):
        """Confirms that no values have been added to the given group

        If any values HAVE been added to this group, aborts with the given
        error message.

        This is often paired with use of skip_default_for_groups in the
        init_defaults call and add_defaults_for_group, as in:

            if nmlgen.get_value("enable_frac_overrides") == ".true.":
                nmlgen.add_defaults_for_group("glc_override_nml")
            else:
                nmlgen.confirm_group_is_empty("glc_override_nml", "some message")

        Args:
        group_name: string - name of namelist group
        errmsg: string - error message to print if group is not empty
        """
        variables_in_group = self._namelist.get_variable_names(group_name)
        fullmsg = "{}\nOffending variables: {}".format(errmsg, variables_in_group)
        expect(len(variables_in_group) == 0, fullmsg)

    @staticmethod
    def quote_string(string):
        """Convert a string to a quoted Fortran literal.

        Does nothing if the string appears to be quoted already, i.e. it
        starts with a single or double quote and ends with the same character.
        """
        if string == "" or (string[0] not in ('"', "'") or string[0] != string[-1]):
            string = string_to_character_literal(string)
        return string

    def _to_python_value(self, name, literals):
        """Transform a literal list as needed for `get_value`.

        Returns "" when `literals` is empty or its first element is None.
        Otherwise the list is expanded, empty literals become None, and
        character literals are converted to Python strings. A variable whose
        definition size is 1 yields a single value; any other size yields the
        whole list.
        """
        (
            var_type,
            _,
            var_size,
        ) = self._definition.split_type_string(name)
        if len(literals) > 0 and literals[0] is not None:
            values = expand_literal_list(literals)
        else:
            return ""

        for i, scalar in enumerate(values):
            if scalar == "":
                values[i] = None
            elif var_type == "character":
                values[i] = character_literal_to_string(scalar)

        if var_size == 1:
            return values[0]
        else:
            return values

    def _to_namelist_literals(self, name, values):
        """Transform a literal list as needed for `set_value`.

        This is the inverse of `_to_python_value`, except that many of the
        changes have potentially already been performed.

        Note that when `values` is a list it is modified in place before
        being compressed.
        """
        (
            var_type,
            _,
            var_size,
        ) = self._definition.split_type_string(name)
        if var_size == 1 and not isinstance(values, list):
            values = [values]

        for i, scalar in enumerate(values):
            if scalar is None:
                values[i] = ""
            elif var_type == "character":
                expect(not isinstance(scalar, list), name)
                values[i] = self.quote_string(scalar)

        return compress_literal_list(values)

    def get_value(self, name):
        """Get the current value of a given namelist variable.

        Note that the return value of this function is always a string or a
        list of strings. E.g. the scalar logical value .false. will be
        returned as `".false."`, while an array of two .false. values will be
        returned as `[".false.", ".false."]`. Whether or not a value is scalar
        is determined by checking the array size in the namelist definition
        file.

        Null values are converted to `None`, and repeated values are expanded,
        e.g. `['2*3']` is converted to `['3', '3']`.

        For character variables, the value is converted to a Python string
        (e.g. quotation marks are removed).

        All other literals are returned as the raw string values that will be
        written to the namelist.
        """
        return self._to_python_value(name, self._namelist.get_value(name))

    def set_value(self, name, value):
        """Set the current value of a given namelist variable.

        Usually, you should use `add_default` instead of this function.

        The `name` argument is the name of the variable to set, and the
        `value` is a list of strings to use as settings. If the variable is
        scalar, the list is optional; i.e. a scalar logical can be set using
        either `value='.false.'` or `value=['.false.']`. If the variable is of
        type character, and the input is missing quotes, quotes will be added
        automatically. If `None` is provided in place of a string, this will
        be translated to a null value.

        Note that this function will overwrite the current value, which may
        hold a user-specified setting. Even if `value` is (or contains) a null
        value, the old setting for the variable will be thrown out completely.
        """
        var_group = self._definition.get_group(name)
        literals = self._to_namelist_literals(name, value)
        (
            _,
            _,
            var_size,
        ) = self._definition.split_type_string(name)
        # Nothing is written when the literal list is empty or starts with None.
        if len(literals) > 0 and literals[0] is not None:
            self._namelist.set_variable_value(var_group, name, literals, var_size)

    def get_default(self, name, config=None, allow_none=False):
        """Get the value of a variable from the namelist definition file.

        The `config` argument is passed through to the underlying
        `get_value_match` call of the definition object as the `attributes`
        argument.

        The return value of this function is a list of values that were found
        in the defaults file. If there is no matching default, this function
        returns `None` if `allow_none=True` is passed, otherwise an error is
        raised.

        Note that we perform some translation of the values, since there are a
        few differences between Fortran namelist literals and values in the
        defaults file:
        1) In the defaults file, whitespace is ignored except within strings,
           so the output of this function strips out most whitespace. (This
           implies that commas are the only way to separate array elements in
           the defaults file.)
        2) In the defaults file, quotes around character literals (strings)
           are optional, as long as the literal does not contain whitespace,
           commas, or (single or double) quotes. If a setting for a character
           variable does not seem to have quotes (and is not a null value),
           this function will add them.
        3) Default values may refer to variables in a case's `env_*.xml`
           files. This function replaces references of the form `$VAR` or
           `${VAR}` with the value of the variable `VAR` in an env file, if
           that variable exists. This behavior is suppressed within
           single-quoted strings (similar to parameter expansion in shell
           scripts).
        """
        default = self._definition.get_value_match(
            name, attributes=config, exact_match=False
        )
        if default is None:
            expect(allow_none, "No default value found for {}.".format(name))
            return None
        default = expand_literal_list(default)

        var_type, _, _ = self._definition.split_type_string(name)

        for i, scalar in enumerate(default):
            # Skip single-quoted strings.
            if (
                var_type == "character"
                and scalar != ""
                and scalar[0] == scalar[-1] == "'"
            ):
                continue
            match = _var_ref_re.search(scalar)
            while match:
                env_val = self._case.get_value(match.group("name"))
                if env_val is not None:
                    # Replace one reference at a time, then rescan; the
                    # substituted text may itself contain references.
                    scalar = scalar.replace(match.group(0), str(env_val), 1)
                    match = _var_ref_re.search(scalar)
                else:
                    scalar = None
                    logger.warning(
                        "Namelist default for variable {} refers to unknown XML variable {}.".format(
                            name, match.group("name")
                        )
                    )
                    match = None
            default[i] = scalar

        # Deal with missing quotes.
        if var_type == "character":
            for i, scalar in enumerate(default):
                # Preserve null values.
                # NOTE(review): a scalar set to None above (unknown XML
                # variable) is not "" and is therefore passed to quote_string,
                # which indexes it -- confirm whether None should be skipped.
                if scalar != "":
                    default[i] = self.quote_string(scalar)

        default = self._to_python_value(name, default)

        return default

    def get_streams(self):
        """Get a list of all streams used for the current data model mode."""
        return self.get_default("streamslist")

    def clean_streams(self):
        """Empty every per-stream value list, including the "streams" list."""
        for variable in self._streams_variables:
            self._streams_namelists[variable] = []
        self._streams_namelists["streams"] = []

    def new_instance(self):
        """Clean the object just enough to introduce a new instance"""
        self.clean_streams()
        self._namelist.clean_groups()

    def _sub_fields(self, varnames):
        """Substitute indicators with given values in a list of fields.

        Replace any instance of the following substring indicators with the
        appropriate values:
            %glc = two-digit GLC elevation class from 00 through glc_nec

        The difference between this function and `_sub_paths` is that this
        function is intended to be used for variable names (especially from
        the `strm_datvar` defaults), whereas `_sub_paths` is intended for use
        on input data file paths.

        Empty lines are dropped. A line containing `%glc` is dropped entirely
        when the case's `GLC_NEC` is 0.

        Returns a string (lines separated by newlines).

        Example: If `_sub_fields` is called with a string containing two
        lines, each of which contains two fields, and glc_nec=3:
             foo               bar
             s2x_Ss_tsrf%glc   tsrf%glc
        then the returned string will contain:
             foo               bar
             s2x_Ss_tsrf00     tsrf00
             s2x_Ss_tsrf01     tsrf01
             s2x_Ss_tsrf02     tsrf02
             s2x_Ss_tsrf03     tsrf03
        """
        new_lines = []
        for line in varnames.split("\n"):
            if not line:
                continue
            if "%glc" not in line:
                new_lines.append(line)
                continue
            # Look the value up only when a line actually needs it.
            glc_nec = self._case.get_value("GLC_NEC")
            glc_nec_indices = [] if glc_nec == 0 else range(glc_nec + 1)
            for i in glc_nec_indices:
                new_lines.append(line.replace("%glc", "{:02d}".format(i)))
        return "\n".join(new_lines)

    @staticmethod
    def _days_in_month(month, year=1):
        """Number of days in the given month (specified as an int, 1-12).

        The `year` argument gives the year for which to request the number of
        days, in a Gregorian calendar. Defaults to `1` (not a leap year).
        """
        month_start = datetime.date(year, month, 1)
        if month == 12:
            next_year = year + 1
            next_month = 1
        else:
            next_year = year
            next_month = month + 1
        next_month_start = datetime.date(next_year, next_month, 1)
        return (next_month_start - month_start).days

    def _sub_paths(self, filenames, year_start, year_end):
        """Substitute indicators with given values in a list of filenames.

        Replace any instance of the following substring indicators with the
        appropriate values:
            %y    = year from the range year_start to year_end
            %ym   = year-month from the range year_start to year_end with all
                    12 months
            %ymd  = year-month-day from the range year_start to year_end with
                    all 12 months

        For the date indicators, the year may be prefixed with a number of
        digits to use (the default is 4). E.g. `%2ymd` can be used to change
        the number of year digits from 4 to 2.

        Note that we assume that there is no mixing and matching of date
        indicators, i.e. you cannot use `%4ymd` and `%2y` in the same line.
        Note also that we use a no-leap calendar, i.e. every month has the
        same number of days every year.

        Each line is examined on its own: a line without any date indicator
        is passed through once, unchanged. Empty lines are dropped.

        The difference between this function and `_sub_fields` is that this
        function is intended to be used for file names (especially from the
        `strm_datfil` defaults), whereas `_sub_fields` is intended for use on
        variable names.

        Returns a string (filenames separated by newlines).
        """
        lines = [line for line in filenames.split("\n") if line]
        new_lines = []
        for line in lines:
            # Search the current line, not the whole input, so that each line
            # is expanded according to its own indicator (or not at all).
            match = _ymd_re.search(line)
            if match is None:
                new_lines.append(line)
                continue
            if match.group("digits"):
                year_format = "{:0" + match.group("digits") + "d}"
            else:
                year_format = "{:04d}"
            for year in range(year_start, year_end + 1):
                if match.group("day"):
                    for month in range(1, 13):
                        # Default year (1) is used on purpose: no-leap calendar.
                        days = self._days_in_month(month)
                        for day in range(1, days + 1):
                            date_string = (year_format + "-{:02d}-{:02d}").format(
                                year, month, day
                            )
                            new_line = line.replace(match.group(0), date_string)
                            new_lines.append(new_line)
                elif match.group("month"):
                    for month in range(1, 13):
                        date_string = (year_format + "-{:02d}").format(year, month)
                        new_line = line.replace(match.group(0), date_string)
                        new_lines.append(new_line)
                else:
                    date_string = year_format.format(year)
                    new_line = line.replace(match.group(0), date_string)
                    new_lines.append(new_line)
        return "\n".join(new_lines)

    @staticmethod
    def _add_xml_delimiter(list_to_deliminate, delimiter):
        """Wrap every list element in `<delimiter>...</delimiter>` tags.

        Each element is stripped of surrounding whitespace before wrapping.
        The input list is updated in place, and the wrapped elements are also
        returned joined into a single string. Aborts if `delimiter` is empty
        or contains a space.
        """
        expect(delimiter and " " not in delimiter, "Missing or badly formed delimiter")
        pred = "<{}>".format(delimiter)
        postd = "</{}>".format(delimiter)
        for n, _ in enumerate(list_to_deliminate):
            list_to_deliminate[n] = pred + list_to_deliminate[n].strip() + postd
        return "\n ".join(list_to_deliminate)

    def create_stream_file_and_update_shr_strdata_nml(
        self,
        config,
        caseroot,  # pylint:disable=too-many-locals
        stream,
        stream_path,
        data_list_path,
    ):
        """Write the pseudo-XML file corresponding to a given stream.

        Arguments:
        `config` - Used to look up namelist defaults. This is used *in
                   addition* to the `config` used to construct the namelist
                   generator. The main reason to supply additional
                   configuration options here is to specify stream-specific
                   settings. A copy is made; the caller's dict is not changed.
        `caseroot` - Directory in which a user-provided stream file named
                   "user_" + basename of `stream_path` is looked for.
        `stream` - Name of the stream.
        `stream_path` - Path to write the stream file to. Any existing file at
                   this path is removed first.
        `data_list_path` - Path of file to append input data information to.
        """
        if os.path.exists(stream_path):
            os.unlink(stream_path)
        user_stream_path = os.path.join(
            caseroot, "user_" + os.path.basename(stream_path)
        )

        # Use the user's stream file, or create one if necessary.
        config = config.copy()
        config["stream"] = stream

        # Stream-specific configuration.
        if os.path.exists(user_stream_path):
            safe_copy(user_stream_path, stream_path)
            strmobj = Stream(infile=stream_path)
            domain_filepath = strmobj.get_value("domainInfo/filePath")
            data_filepath = strmobj.get_value("fieldInfo/filePath")
            domain_filenames = strmobj.get_value("domainInfo/fileNames")
            data_filenames = strmobj.get_value("fieldInfo/fileNames")
        else:
            # Figure out the details of this stream.
            if stream in ("prescribed", "copyall"):
                # Assume only one file for prescribed mode!
                grid_file = self.get_default("strm_grid_file", config)
                domain_filepath, domain_filenames = os.path.split(grid_file)
                data_file = self.get_default("strm_data_file", config)
                data_filepath, data_filenames = os.path.split(data_file)
            else:
                domain_filepath = self.get_default("strm_domdir", config)
                domain_filenames = self.get_default("strm_domfil", config)
                data_filepath = self.get_default("strm_datdir", config)
                data_filenames = self.get_default("strm_datfil", config)

            domain_varnames = self._sub_fields(self.get_default("strm_domvar", config))
            data_varnames = self._sub_fields(self.get_default("strm_datvar", config))
            offset = self.get_default("strm_offset", config)
            year_start = int(self.get_default("strm_year_start", config))
            year_end = int(self.get_default("strm_year_end", config))
            data_filenames = self._sub_paths(data_filenames, year_start, year_end)
            domain_filenames = self._sub_paths(domain_filenames, year_start, year_end)

            # Overwrite domain_file if should be set from stream data
            if domain_filenames == "null":
                domain_filepath = data_filepath
                domain_filenames = data_filenames.splitlines()[0]

            stream_file_text = _stream_mct_file_template.format(
                domain_varnames=domain_varnames,
                domain_filepath=domain_filepath,
                domain_filenames=domain_filenames,
                data_varnames=data_varnames,
                data_filepath=data_filepath,
                data_filenames=data_filenames,
                offset=offset,
            )

            with open(stream_path, "w") as stream_file:
                stream_file.write(stream_file_text)

        # Only lines whose hash is not already in the list file are appended.
        lines_hash = self._get_input_file_hash(data_list_path)
        with open(data_list_path, "a") as input_data_list:
            for i, filename in enumerate(domain_filenames.split("\n")):
                if filename.strip() == "":
                    continue
                filepath, filename = os.path.split(filename)
                # NOTE(review): when the entry already carries a directory
                # part, only that directory (without the file name) is
                # recorded below -- confirm this is intended.
                if not filepath:
                    filepath = os.path.join(domain_filepath, filename.strip())
                string = "domain{:d} = {}\n".format(i + 1, filepath)
                hashValue = hashlib.md5(string.rstrip().encode("utf-8")).hexdigest()
                if hashValue not in lines_hash:
                    input_data_list.write(string)
            for i, filename in enumerate(data_filenames.split("\n")):
                if filename.strip() == "":
                    continue
                filepath = os.path.join(data_filepath, filename.strip())
                string = "file{:d} = {}\n".format(i + 1, filepath)
                hashValue = hashlib.md5(string.rstrip().encode("utf-8")).hexdigest()
                if hashValue not in lines_hash:
                    input_data_list.write(string)
        self.update_shr_strdata_nml(config, stream, stream_path)

    def update_shr_strdata_nml(self, config, stream, stream_path):
        """Updates values for the `shr_strdata_nml` namelist group.

        This should be done once per stream, and it shouldn't usually be
        called directly, since
        `create_stream_file_and_update_shr_strdata_nml` calls this method
        itself.

        `config["stream"]` must equal `stream`; this is checked with an
        `assert`.
        """
        assert (
            config["stream"] == stream
        ), "config stream is {}, but input stream is {}".format(
            config["stream"], stream
        )

        # Double-check the years for sanity.
        year_start = int(self.get_default("strm_year_start", config))
        year_end = int(self.get_default("strm_year_end", config))
        year_align = int(self.get_default("strm_year_align", config))
        expect(
            year_end >= year_start,
            "Stream {} starts at year {:d}, but ends at earlier year {:d}.".format(
                stream, year_start, year_end
            ),
        )

        # Add to streams file.
        stream_string = "{} {:d} {:d} {:d}".format(
            os.path.basename(stream_path), year_align, year_start, year_end
        )
        self._streams_namelists["streams"].append(stream_string)
        for variable in self._streams_variables:
            default = self.get_default(variable, config)
            expect(
                len(default) == 1,
                "Stream {} had multiple settings for variable {}.".format(
                    stream, variable
                ),
            )
            self._streams_namelists[variable].append(default[0])

    def set_abs_file_path(self, file_path):
        """If `file_path` is relative, make it absolute using `DIN_LOC_ROOT`.

        If an absolute path is input, it is returned unchanged.
        """
        if os.path.isabs(file_path):
            return file_path
        else:
            fullpath = os.path.join(self._din_loc_root, file_path)
            return fullpath

    def add_default(self, name, value=None, ignore_abs_path=None):
        """Add a value for the specified variable to the namelist.

        If the specified variable is already defined in the object, the
        existing value is preserved. Otherwise, the `value` argument, if
        provided, will be used to set the value. If no such value is found,
        the defaults file will be consulted. If null values are present in any
        of the above, the result will be a merged array of values.

        If no value for the variable is found via any of the above, this
        method will raise an exception.

        Unless `ignore_abs_path` is given (anything other than None),
        character variables whose input pathname setting is "abs" have each
        non-null entry converted with `_convert_to_abs_file_path`.
        """
        # pylint: disable=protected-access
        group = self._definition.get_group(name)

        # Use this to see if we need to raise an error when nothing is found.
        have_value = False
        # Check for existing value.
        current_literals = self._namelist.get_variable_value(group, name)
        if current_literals != [""]:
            have_value = True

        # Check for input argument.
        if value is not None:
            have_value = True
            # if compression were to occur, this is where it does
            literals = self._to_namelist_literals(name, value)
            current_literals = merge_literal_lists(literals, current_literals)

        # Check for default value.
        default = self.get_default(name, allow_none=True)
        if default is not None:
            have_value = True
            default_literals = self._to_namelist_literals(name, default)
            current_literals = merge_literal_lists(default_literals, current_literals)
        expect(
            have_value,
            "No default value found for {} with attributes {}.".format(
                name, self._definition.get_attributes()
            ),
        )

        # Go through file names and prepend input data root directory for
        # absolute pathnames.
        var_type, _, var_size = self._definition.split_type_string(name)
        if var_type == "character" and ignore_abs_path is None:
            var_input_pathname = self._definition.get_input_pathname(name)
            if var_input_pathname == "abs":
                current_literals = expand_literal_list(current_literals)
                for i, literal in enumerate(current_literals):
                    if literal == "":
                        continue
                    file_path = character_literal_to_string(literal)
                    abs_file_path = self._convert_to_abs_file_path(file_path, name)
                    current_literals[i] = string_to_character_literal(abs_file_path)
                current_literals = compress_literal_list(current_literals)

        # Set the new value.
        self._namelist.set_variable_value(group, name, current_literals, var_size)

    def _convert_to_abs_file_path(self, file_path, name):
        """Convert the given file_path to an abs file path and return the result

        It's possible that file_path actually contains multiple files
        delimited by GRID_SEP. (This is the case when a component has multiple
        grids, and so has a file for each grid.) In this case, we split it on
        GRID_SEP and handle each separated portion as a separate file, then
        return a new GRID_SEP-delimited string.

        A warning (not an error) is logged for every resulting path that does
        not exist; `name` is only used in that message.
        """
        abs_file_paths = []
        # In most cases, the list created by the following split will only
        # contain a single element, but this split is needed to handle
        # grid-related files for components with multiple grids (e.g., GLC).
        for one_file_path in file_path.split(GRID_SEP):
            # NOTE - these are hard-coded here and a better way is to make
            # these extensible
            if one_file_path in (
                "UNSET",
                "idmap",
                "idmap_ignore",
                "unset",
                "null",
                "create_mesh",
            ):
                # Placeholder values are passed through untouched.
                abs_file_paths.append(one_file_path)
            else:
                one_abs_file_path = self.set_abs_file_path(one_file_path)
                if not os.path.exists(one_abs_file_path):
                    logger.warning(
                        "File not found: {} = {}, will attempt to download in check_input_data phase".format(
                            name, one_abs_file_path
                        )
                    )
                abs_file_paths.append(one_abs_file_path)

        return GRID_SEP.join(abs_file_paths)

    def create_shr_strdata_nml(self):
        """Set defaults for `shr_strdata_nml` variables other than the variable domainfile"""
        self.add_default("datamode")
        if self.get_value("datamode") != "NULL":
            self.add_default("streams", value=self._streams_namelists["streams"])
            for variable in self._streams_variables:
                self.add_default(variable, value=self._streams_namelists[variable])

    def get_group_variables(self, group_name):
        """Pass through to the namelist object's `get_group_variables`."""
        return self._namelist.get_group_variables(group_name)

    def _get_input_file_hash(self, data_list_path):
        """Return the set of MD5 hex digests of the lines in `data_list_path`.

        Each line is hashed after trailing whitespace is removed. An empty
        set is returned when the file does not exist.
        """
        lines_hash = set()
        if os.path.isfile(data_list_path):
            with open(data_list_path, "r") as input_data_list:
                for line in input_data_list:
                    hashValue = hashlib.md5(line.rstrip().encode("utf-8")).hexdigest()
                    logger.debug("Found line {} with hash {}".format(line, hashValue))
                    lines_hash.add(hashValue)
        return lines_hash

    def _write_input_files(self, data_list_path):
        """Write input data files to list."""
        # append to input_data_list file
        lines_hash = self._get_input_file_hash(data_list_path)
        with open(data_list_path, "a") as input_data_list:
            for group_name in self._namelist.get_group_names():
                for variable_name in self._namelist.get_variable_names(group_name):
                    input_pathname = self._definition.get_node_element_info(
                        variable_name, "input_pathname"
                    )
                    if input_pathname is not None:
                        # This is where we end up for all variables that are
                        # paths to input data files.
                        literals = self._namelist.get_variable_value(
                            group_name, variable_name
                        )
                        for literal in literals:
                            file_path = character_literal_to_string(literal)
                            self._add_file_to_input_data_list(
                                input_data_list=input_data_list,
                                variable_name=variable_name,
                                file_path=file_path,
                                input_pathname=input_pathname,
                                lines_hash=lines_hash,
                            )

    def _add_file_to_input_data_list(
        self, input_data_list, variable_name, file_path, input_pathname, lines_hash
    ):
        """Add one file to the input data list, if needed

        It's possible that file_path actually contains multiple files
        delimited by GRID_SEP. (This is the case when a component has multiple
        grids, and so has a file for each grid.) In this case, we split it on
        GRID_SEP and handle each separated portion as a separate file.

        Args:
        - input_data_list: file handle
        - variable_name (string): name of variable to add
        - file_path (string): path to file
        - input_pathname (string): whether this is an absolute or relative path
        - lines_hash (set): set of hashes of lines already in the given input
          data list
        """
        for one_file_path in file_path.split(GRID_SEP):
            # NOTE - these are hard-coded here and a better way is to make
            # these extensible
            if one_file_path in ("UNSET", "idmap", "idmap_ignore"):
                continue
            if input_pathname == "abs":
                # No further mangling needed for absolute paths.
                # At this point, there are overwrites that should be ignored
                if not os.path.isabs(one_file_path):
                    continue
            elif input_pathname.startswith("rel:"):
                # The part past "rel" is the name of a variable that
                # this variable specifies its path relative to.
                root_var = input_pathname[4:]
                root_dir = self.get_value(root_var)
                one_file_path = os.path.join(root_dir, one_file_path)
            else:
                expect(False, "Bad input_pathname value: {}.".format(input_pathname))

            # Write to the input data list.
            #
            # Note that the same variable name is repeated for each file. This
            # currently seems okay for check_input_data, but if it becomes a
            # problem, we could change this, e.g., appending an index to the
            # end of variable_name.
            string = "{} = {}".format(variable_name, one_file_path)
            hashValue = hashlib.md5(string.rstrip().encode("utf-8")).hexdigest()
            if hashValue not in lines_hash:
                logger.debug("Adding line {} with hash {}".format(string, hashValue))
                input_data_list.write(string + "\n")
            else:
                logger.debug("Line already in file {}".format(string))

    def write_output_file(
        self, namelist_file, data_list_path=None, groups=None, sorted_groups=True
    ):
        """Write out the namelists and input data files.

        The `namelist_file` is the location to which the component namelist
        will be written. The "modelio" and "seq_maps" groups are never written
        to it.

        `groups` optionally restricts which groups are written; by default all
        groups of the namelist are used. The list passed in is not modified.

        The `data_list_path` argument is the location of the
        `*.input_data_list` file, which will have the input data files added
        to it.
        """
        self._definition.validate(self._namelist)
        if groups is None:
            groups = self._namelist.get_group_names()

        # remove groups that are never in namelist file; build a new list so
        # that a caller-supplied `groups` list is left untouched
        groups = [group for group in groups if group not in ("modelio", "seq_maps")]

        # write namelist file
        self._namelist.write(namelist_file, groups=groups, sorted_groups=sorted_groups)

        if data_list_path is not None:
            self._write_input_files(data_list_path)

    # For MCT
    def add_nmlcontents(
        self, filename, group, append=True, format_="nmlcontents", sorted_groups=True
    ):
        """Write only contents of nml group"""
        self._namelist.write(
            filename,
            groups=[group],
            append=append,
            format_=format_,
            sorted_groups=sorted_groups,
        )

    def write_seq_maps(self, filename):
        """Write mct out seq_maps.rc"""
        self._namelist.write(filename, groups=["seq_maps"], format_="rc")

    def write_modelio_file(self, filename):
        """Write mct component modelio files"""
        self._namelist.write(filename, groups=["modelio", "pio_inparm"], format_="nml")

    # For NUOPC
    def write_nuopc_modelio_file(self, filename):
        """Write nuopc component modelio files"""
        self._namelist.write(filename, groups=["pio_inparm"], format_="nml")

    def write_nuopc_config_file(
        self, filename, data_list_path=None, sorted_groups=False
    ):
        """Write the nuopc config file"""
        self._definition.validate(self._namelist)
        groups = self._namelist.get_group_names()

        # write the config file
        self._namelist.write_nuopc(filename, groups=groups, sorted_groups=sorted_groups)

        # append to input_data_list file
        if data_list_path is not None:
            self._write_input_files(data_list_path)