"""Class for generating component namelists."""
# Typically ignore this.
# pylint: disable=invalid-name
# Disable these because this is our standard setup
# pylint: disable=wildcard-import,unused-wildcard-import
import datetime
import re
import hashlib
from CIME.XML.standard_module_setup import *
from CIME.namelist import Namelist, parse, \
character_literal_to_string, string_to_character_literal, \
expand_literal_list, compress_literal_list, merge_literal_lists
from CIME.XML.namelist_definition import NamelistDefinition
from CIME.utils import expect, safe_copy
from CIME.XML.stream import Stream
logger = logging.getLogger(__name__)

# Matches $VAR or ${VAR} references to case XML variables inside default
# values; the conditional group "(?(1)\})" requires the closing brace only
# when the opening brace was present.
_var_ref_re = re.compile(r"\$(\{)?(?P<name>\w+)(?(1)\})")

# Matches the date indicators %y, %ym and %ymd used in stream file-name
# templates, with an optional leading digit count (e.g. %2ymd); consumed by
# `_sub_paths` below.
_ymd_re = re.compile(r"%(?P<digits>[1-9][0-9]*)?y(?P<month>m(?P<day>d)?)?")

# Template for the pseudo-XML stream description file consumed by the MCT
# data models; filled in by `create_stream_file_and_update_shr_strdata_nml`.
_stream_mct_file_template = """<?xml version="1.0"?>
<file id="stream" version="1.0">
<dataSource>
GENERIC
</dataSource>
<domainInfo>
<variableNames>
{domain_varnames}
</variableNames>
<filePath>
{domain_filepath}
</filePath>
<fileNames>
{domain_filenames}
</fileNames>
</domainInfo>
<fieldInfo>
<variableNames>
{data_varnames}
</variableNames>
<filePath>
{data_filepath}
</filePath>
<fileNames>
{data_filenames}
</fileNames>
<offset>
{offset}
</offset>
</fieldInfo>
</file>
"""
class NamelistGenerator(object):
    """Utility class for generating namelists for a given component."""

    # Class-level fallback; __init__ replaces this with the definition's
    # per-stream entries for each instance.
    _streams_variables = []

    # pylint: disable=too-many-arguments
def __init__(self, case, definition_files, files=None):
    """Construct a namelist generator.
    Arguments:
    `case` - `Case` object corresponding to the current case.
    `definition_files` - List of XML files containing namelist definitions.
        NOTE(review): only `definition_files[0]` is actually read below --
        confirm additional entries are intentionally ignored.
    `files` - Optional `Files` object forwarded to `NamelistDefinition`.
    """
    # Save off important information from inputs.
    self._case = case
    self._din_loc_root = case.get_value('DIN_LOC_ROOT')
    # Create definition object - this will validate the xml schema in the definition file
    self._definition = NamelistDefinition(definition_files[0], files=files)
    # Determine array of _stream_variables from definition object
    # This is only applicable to data models
    self._streams_namelists = {"streams": []}
    self._streams_variables = self._definition.get_per_stream_entries()
    for variable in self._streams_variables:
        self._streams_namelists[variable] = []
    # Create namelist object.
    self._namelist = Namelist()
# Define __enter__ and __exit__ so that we can use this as a context manager
def __enter__(self):
return self
def __exit__(self, *_):
return False
[docs] def init_defaults(self, infiles, config, skip_groups=None, skip_entry_loop=False):
"""Return array of names of all definition nodes
"""
# first clean out any settings left over from previous calls
self.new_instance()
self._definition.set_nodes(skip_groups=skip_groups)
# Determine the array of entry nodes that will be acted upon
entry_nodes = self._definition.set_nodes(skip_groups=skip_groups)
# Add attributes to definition object
self._definition.add_attributes(config)
# Parse the infile and create namelist settings for the contents of infile
# this will override all other settings in add_defaults
for file_ in infiles:
# Parse settings in "groupless" mode.
nml_dict = parse(in_file=file_, groupless=True)
# Add groups using the namelist definition.
new_namelist = self._definition.dict_to_namelist(nml_dict, filename=file_)
# Make sure that the input is actually valid.
self._definition.validate(new_namelist, filename=file_)
# Merge into existing settings (earlier settings have precedence
# over later settings).
self._namelist.merge_nl(new_namelist)
if not skip_entry_loop:
for entry in entry_nodes:
self.add_default(self._definition.get(entry, "id"))
return [self._definition.get(entry, "id") for entry in entry_nodes]
[docs] @staticmethod
def quote_string(string):
"""Convert a string to a quoted Fortran literal.
Does nothing if the string appears to be quoted already.
"""
if string == "" or \
(string[0] not in ('"', "'") or string[0] != string[-1]):
string = string_to_character_literal(string)
return string
def _to_python_value(self, name, literals):
    """Convert a literal list into the form returned by `get_value`."""
    var_kind, _, size = self._definition.split_type_string(name)
    # An empty literal list maps to an empty string, not an empty list.
    if not literals:
        return ""
    values = expand_literal_list(literals)
    for index, scalar in enumerate(values):
        if scalar == '':
            values[index] = None
        elif var_kind == 'character':
            values[index] = character_literal_to_string(scalar)
    # Variables declared scalar in the definition are unwrapped.
    return values[0] if size == 1 else values
def _to_namelist_literals(self, name, values):
    """Convert a `set_value`-style input into a compressed literal list.
    Inverse of `_to_python_value`, except that many of the conversions may
    already have been applied. Note: when `values` is a list it is
    modified in place.
    """
    var_kind, _, size = self._definition.split_type_string(name)
    if size == 1 and not isinstance(values, list):
        values = [values]
    for index, scalar in enumerate(values):
        if scalar is None:
            values[index] = ""
        elif var_kind == 'character':
            expect(not isinstance(scalar, list), name)
            values[index] = self.quote_string(scalar)
    return compress_literal_list(values)
def get_value(self, name):
    """Return the current value of namelist variable `name`.
    The result is always a string or a list of strings; scalar vs. array
    is decided by the size recorded in the namelist definition file (e.g.
    a scalar logical comes back as `".false."`, an array of two as
    `[".false.", ".false."]`). Null values become `None`, repeated values
    such as `['2*3']` are expanded to `['3', '3', '3']`, and character
    values are converted to plain Python strings (quotes stripped). All
    other literals are returned exactly as they would be written to the
    namelist.
    """
    literals = self._namelist.get_value(name)
    return self._to_python_value(name, literals)
def set_value(self, name, value):
    """Overwrite the current value of namelist variable `name`.
    `value` is a list of strings (a bare string is accepted for scalars,
    so `value='.false.'` and `value=['.false.']` are equivalent).
    Character values missing quotes are quoted automatically, and `None`
    entries are written as null values.
    Any previous setting -- including a user-specified one -- is discarded
    completely, even when `value` contains nulls; prefer `add_default`
    for merging behavior.
    """
    group = self._definition.get_group(name)
    literals = self._to_namelist_literals(name, value)
    _, _, size = self._definition.split_type_string(name)
    self._namelist.set_variable_value(group, name, literals, size)
def get_default(self, name, config=None, allow_none=False):
    """Get the value of a variable from the namelist definition file.
    The `config` argument is passed through to the underlying
    `NamelistDefaults.get_value` call as the `attribute` argument.
    The return value of this function is a list of values that were found in
    the defaults file. If there is no matching default, this function
    returns `None` if `allow_none=True` is passed, otherwise an error is
    raised.
    Note that we perform some translation of the values, since there are a
    few differences between Fortran namelist literals and values in the
    defaults file:
    1) In the defaults file, whitespace is ignored except within strings, so
       the output of this function strips out most whitespace. (This implies
       that commas are the only way to separate array elements in the
       defaults file.)
    2) In the defaults file, quotes around character literals (strings) are
       optional, as long as the literal does not contain whitespace, commas,
       or (single or double) quotes. If a setting for a character variable
       does not seem to have quotes (and is not a null value), this function
       will add them.
    3) Default values may refer to variables in a case's `env_*.xml` files.
       This function replaces references of the form `$VAR` or `${VAR}` with
       the value of the variable `VAR` in an env file, if that variable
       exists. This behavior is suppressed within single-quoted strings
       (similar to parameter expansion in shell scripts).
    """
    default = self._definition.get_value_match(name, attributes=config, exact_match=False)
    if default is None:
        # Either error out or report "no default" depending on allow_none.
        expect(allow_none, "No default value found for {}.".format(name))
        return None
    default = expand_literal_list(default)
    var_type, _, _ = self._definition.split_type_string(name)
    for i, scalar in enumerate(default):
        # Skip single-quoted strings (no $VAR expansion inside them).
        if var_type == 'character' and scalar != '' and \
           scalar[0] == scalar[-1] == "'":
            continue
        # Replace $VAR / ${VAR} references one at a time until none remain;
        # each reference must resolve to an existing case XML variable.
        match = _var_ref_re.search(scalar)
        while match:
            env_val = self._case.get_value(match.group('name'))
            expect(env_val is not None,
                   "Namelist default for variable {} refers to unknown XML variable {}.".
                   format(name, match.group('name')))
            scalar = scalar.replace(match.group(0), str(env_val), 1)
            match = _var_ref_re.search(scalar)
        default[i] = scalar
    # Deal with missing quotes.
    if var_type == 'character':
        for i, scalar in enumerate(default):
            # Preserve null values (empty strings).
            if scalar != '':
                default[i] = self.quote_string(scalar)
    default = self._to_python_value(name, default)
    return default
[docs] def get_streams(self):
"""Get a list of all streams used for the current data model mode."""
return self.get_default("streamslist")
[docs] def clean_streams(self):
for variable in self._streams_variables:
self._streams_namelists[variable] = []
self._streams_namelists["streams"] = []
[docs] def new_instance(self):
""" Clean the object just enough to introduce a new instance """
self.clean_streams()
self._namelist.clean_groups()
def _sub_fields(self, varnames):
"""Substitute indicators with given values in a list of fields.
Replace any instance of the following substring indicators with the
appropriate values:
%glc = two-digit GLC elevation class from 00 through glc_nec
The difference between this function and `_sub_paths` is that this
function is intended to be used for variable names (especially from the
`strm_datvar` defaults), whereas `_sub_paths` is intended for use on
input data file paths.
Returns a string.
Example: If `_sub_fields` is called with an array containing two
elements, each of which contains two strings, and glc_nec=3:
foo bar
s2x_Ss_tsrf%glc tsrf%glc
then the returned array will be:
foo bar
s2x_Ss_tsrf00 tsrf00
s2x_Ss_tsrf01 tsrf01
s2x_Ss_tsrf02 tsrf02
s2x_Ss_tsrf03 tsrf03
"""
lines = varnames.split("\n")
new_lines = []
for line in lines:
if not line:
continue
if "%glc" in line:
if self._case.get_value('GLC_NEC') == 0:
glc_nec_indices = []
else:
glc_nec_indices = range(self._case.get_value('GLC_NEC')+1)
for i in glc_nec_indices:
new_lines.append(line.replace("%glc", "{:02d}".format(i)))
else:
new_lines.append(line)
return "\n".join(new_lines)
@staticmethod
def _days_in_month(month, year=1):
"""Number of days in the given month (specified as an int, 1-12).
The `year` argument gives the year for which to request the number of
days, in a Gregorian calendar. Defaults to `1` (not a leap year).
"""
month_start = datetime.date(year, month, 1)
if month == 12:
next_year = year+1
next_month = 1
else:
next_year = year
next_month = month + 1
next_month_start = datetime.date(next_year, next_month, 1)
return (next_month_start - month_start).days
def _sub_paths(self, filenames, year_start, year_end):
    """Substitute date indicators with concrete dates in a list of filenames.
    Replace any instance of the following substring indicators with the
    appropriate values:
    %y   = year from the range year_start to year_end
    %ym  = year-month from that range, with all 12 months
    %ymd = year-month-day from that range, with all 12 months
    The year may be prefixed with a digit count (the default is 4), e.g.
    `%2ymd` uses two year digits. We assume no mixing and matching of
    indicators on a single line (e.g. `%4ymd` and `%2y` together), and a
    no-leap calendar: every month has the same number of days every year.
    The difference between this function and `_sub_fields` is that this
    function is intended for file names (especially from the `strm_datfil`
    defaults), whereas `_sub_fields` is for variable names.
    Returns a string (filenames separated by newlines).
    """
    lines = [line for line in filenames.split("\n") if line]
    new_lines = []
    for line in lines:
        # Bug fix: search this line, not the whole `filenames` string.
        # Searching the full string caused lines with no date indicator to
        # be emitted once per year whenever any other line contained one.
        match = _ymd_re.search(line)
        if match is None:
            new_lines.append(line)
            continue
        if match.group('digits'):
            year_format = "{:0" + match.group('digits') + "d}"
        else:
            year_format = "{:04d}"
        for year in range(year_start, year_end + 1):
            if match.group('day'):
                for month in range(1, 13):
                    days = self._days_in_month(month)
                    for day in range(1, days + 1):
                        date_string = (year_format + "-{:02d}-{:02d}").format(year, month, day)
                        new_lines.append(line.replace(match.group(0), date_string))
            elif match.group('month'):
                for month in range(1, 13):
                    date_string = (year_format + "-{:02d}").format(year, month)
                    new_lines.append(line.replace(match.group(0), date_string))
            else:
                # Year-only indicator.
                new_lines.append(line.replace(match.group(0), year_format.format(year)))
    return "\n".join(new_lines)
@staticmethod
def _add_xml_delimiter(list_to_deliminate, delimiter):
    """Wrap each entry in <delimiter>...</delimiter> tags and join them.
    The entries are modified in place (stripped and tagged); the joined
    multi-line string is returned.
    """
    expect(delimiter and " " not in delimiter, "Missing or badly formed delimiter")
    opening = "<{}>".format(delimiter)
    closing = "</{}>".format(delimiter)
    for index, entry in enumerate(list_to_deliminate):
        list_to_deliminate[index] = opening + entry.strip() + closing
    return "\n ".join(list_to_deliminate)
def create_stream_file_and_update_shr_strdata_nml(self, config, caseroot, #pylint:disable=too-many-locals
                                                  stream, stream_path, data_list_path):
    """Write the pseudo-XML file corresponding to a given stream.
    Arguments:
    `config` - Used to look up namelist defaults. This is used *in addition*
               to the `config` used to construct the namelist generator. The
               main reason to supply additional configuration options here
               is to specify stream-specific settings.
    `caseroot` - Case root directory, searched for a "user_<stream file>"
               override.
    `stream` - Name of the stream.
    `stream_path` - Path to write the stream file to.
    `data_list_path` - Path of file to append input data information to.
    """
    # Always start from a clean slate for the generated stream file.
    if os.path.exists(stream_path):
        os.unlink(stream_path)
    user_stream_path = os.path.join(caseroot, "user_"+os.path.basename(stream_path))
    # Use the user's stream file, or create one if necessary.
    # Work on a copy so the caller's config dict is not mutated.
    config = config.copy()
    config["stream"] = stream
    # Stream-specific configuration.
    if os.path.exists(user_stream_path):
        # A user-provided stream file wins: copy it into place and read the
        # paths/file names back out of it.
        safe_copy(user_stream_path, stream_path)
        strmobj = Stream(infile=stream_path)
        domain_filepath = strmobj.get_value("domainInfo/filePath")
        data_filepath = strmobj.get_value("fieldInfo/filePath")
        domain_filenames = strmobj.get_value("domainInfo/fileNames")
        data_filenames = strmobj.get_value("fieldInfo/fileNames")
    else:
        # Figure out the details of this stream.
        if stream in ("prescribed", "copyall"):
            # Assume only one file for prescribed mode!
            grid_file = self.get_default("strm_grid_file", config)
            domain_filepath, domain_filenames = os.path.split(grid_file)
            data_file = self.get_default("strm_data_file", config)
            data_filepath, data_filenames = os.path.split(data_file)
        else:
            domain_filepath = self.get_default("strm_domdir", config)
            domain_filenames = self.get_default("strm_domfil", config)
            data_filepath = self.get_default("strm_datdir", config)
            data_filenames = self.get_default("strm_datfil", config)
        # Expand %glc in variable names and %y/%ym/%ymd in file names.
        domain_varnames = self._sub_fields(self.get_default("strm_domvar", config))
        data_varnames = self._sub_fields(self.get_default("strm_datvar", config))
        offset = self.get_default("strm_offset", config)
        year_start = int(self.get_default("strm_year_start", config))
        year_end = int(self.get_default("strm_year_end", config))
        data_filenames = self._sub_paths(data_filenames, year_start, year_end)
        domain_filenames = self._sub_paths(domain_filenames, year_start, year_end)
        # Overwrite domain_file if should be set from stream data
        if domain_filenames == 'null':
            domain_filepath = data_filepath
            domain_filenames = data_filenames.splitlines()[0]
        stream_file_text = _stream_mct_file_template.format(
            domain_varnames=domain_varnames,
            domain_filepath=domain_filepath,
            domain_filenames=domain_filenames,
            data_varnames=data_varnames,
            data_filepath=data_filepath,
            data_filenames=data_filenames,
            offset=offset,
        )
        with open(stream_path, 'w') as stream_file:
            stream_file.write(stream_file_text)
    # Append any new input-data entries to the *.input_data_list file,
    # skipping lines whose md5 is already present.
    lines_hash = self._get_input_file_hash(data_list_path)
    with open(data_list_path, 'a') as input_data_list:
        for i, filename in enumerate(domain_filenames.split("\n")):
            if filename.strip() == '':
                continue
            filepath, filename = os.path.split(filename)
            if not filepath:
                filepath = os.path.join(domain_filepath, filename.strip())
            # NOTE(review): when the entry already carries a directory
            # component, only that directory (not the file name) is written
            # here -- confirm this is intended.
            string = "domain{:d} = {}\n".format(i+1, filepath)
            hashValue = hashlib.md5(string.rstrip().encode('utf-8')).hexdigest()
            if hashValue not in lines_hash:
                input_data_list.write(string)
        for i, filename in enumerate(data_filenames.split("\n")):
            if filename.strip() == '':
                continue
            filepath = os.path.join(data_filepath, filename.strip())
            string = "file{:d} = {}\n".format(i+1, filepath)
            hashValue = hashlib.md5(string.rstrip().encode('utf-8')).hexdigest()
            if hashValue not in lines_hash:
                input_data_list.write(string)
    # Record this stream in the shr_strdata_nml bookkeeping.
    self.update_shr_strdata_nml(config, stream, stream_path)
def update_shr_strdata_nml(self, config, stream, stream_path):
    """Updates values for the `shr_strdata_nml` namelist group.
    This should be done once per stream, and it shouldn't usually be called
    directly, since `create_stream_file` calls this method itself.
    """
    # NOTE(review): validation via `assert` is stripped under `python -O`;
    # the file's convention elsewhere is `expect` -- confirm intentional.
    assert config['stream'] == stream, \
        "config stream is {}, but input stream is {}".format(config['stream'], stream)
    # Double-check the years for sanity.
    year_start = int(self.get_default("strm_year_start", config))
    year_end = int(self.get_default("strm_year_end", config))
    year_align = int(self.get_default("strm_year_align", config))
    expect(year_end >= year_start,
           "Stream {} starts at year {:d}, but ends at earlier year {:d}.".format(stream, year_start, year_end))
    # Add to streams file: "<stream file basename> <align> <start> <end>".
    stream_string = "{} {:d} {:d} {:d}".format(os.path.basename(stream_path),
                                               year_align, year_start,
                                               year_end)
    self._streams_namelists["streams"].append(stream_string)
    # Accumulate the per-stream variable settings; each default must be a
    # single value.
    for variable in self._streams_variables:
        default = self.get_default(variable, config)
        expect(len(default) == 1,
               "Stream {} had multiple settings for variable {}.".format(stream, variable))
        self._streams_namelists[variable].append(default[0])
[docs] def set_abs_file_path(self, file_path):
"""If `file_path` is relative, make it absolute using `DIN_LOC_ROOT`.
If an absolute path is input, it is returned unchanged.
"""
if os.path.isabs(file_path):
return file_path
else:
fullpath = os.path.join(self._din_loc_root, file_path)
return fullpath
def add_default(self, name, value=None, ignore_abs_path=None):
    """Add a value for the specified variable to the namelist.
    If the specified variable is already defined in the object, the existing
    value is preserved. Otherwise, the `value` argument, if provided, will
    be used to set the value. If no such value is found, the defaults file
    will be consulted. If null values are present in any of the above, the
    result will be a merged array of values.
    If no value for the variable is found via any of the above, this method
    will raise an exception.
    `ignore_abs_path`, when not None, suppresses the prepending of
    `DIN_LOC_ROOT` to character values flagged as absolute input paths.
    """
    # pylint: disable=protected-access
    group = self._definition.get_group(name)
    # Use this to see if we need to raise an error when nothing is found.
    have_value = False
    # Check for existing value.
    current_literals = self._namelist.get_variable_value(group, name)
    # Check for input argument.
    if value is not None:
        have_value = True
        # if compression were to occur, this is where it does
        literals = self._to_namelist_literals(name, value)
        # Existing settings win over the supplied value (merge order).
        current_literals = merge_literal_lists(literals, current_literals)
    # Check for default value.
    default = self.get_default(name, allow_none=True)
    if default is not None:
        have_value = True
        default_literals = self._to_namelist_literals(name, default)
        # Defaults only fill in slots still unset after the merge above.
        current_literals = merge_literal_lists(default_literals, current_literals)
    expect(have_value, "No default value found for {}.".format(name))
    # Go through file names and prepend input data root directory for
    # absolute pathnames.
    var_type, _, var_size = self._definition.split_type_string(name)
    if var_type == "character" and ignore_abs_path is None:
        var_input_pathname = self._definition.get_input_pathname(name)
        if var_input_pathname == 'abs':
            current_literals = expand_literal_list(current_literals)
            for i, literal in enumerate(current_literals):
                if literal == '':
                    continue
                file_path = character_literal_to_string(literal)
                # NOTE - these are hard-coded here and a better way is to make these extensible
                if file_path == 'UNSET' or file_path == 'idmap' or file_path == 'idmap_ignore' or file_path == 'unset':
                    continue
                if file_path in ('null','create_mesh'):
                    continue
                file_path = self.set_abs_file_path(file_path)
                if not os.path.exists(file_path):
                    # Missing files are not fatal here; check_input_data
                    # gets a chance to fetch them later.
                    logger.warning("File not found: {} = {}, will attempt to download in check_input_data phase".format(name, literal))
                current_literals[i] = string_to_character_literal(file_path)
            current_literals = compress_literal_list(current_literals)
    # Set the new value.
    self._namelist.set_variable_value(group, name, current_literals, var_size)
[docs] def create_shr_strdata_nml(self):
"""Set defaults for `shr_strdata_nml` variables other than the variable domainfile """
self.add_default("datamode")
if self.get_value("datamode") != 'NULL':
self.add_default("streams",
value=self._streams_namelists["streams"])
for variable in self._streams_variables:
self.add_default(variable,
value=self._streams_namelists[variable])
[docs] def get_group_variables(self, group_name):
return self._namelist.get_group_variables(group_name)
def _get_input_file_hash(self, data_list_path):
lines_hash = set()
if os.path.isfile(data_list_path):
with open(data_list_path, "r") as input_data_list:
for line in input_data_list:
hashValue = hashlib.md5(line.rstrip().encode('utf-8')).hexdigest()
logger.debug( "Found line {} with hash {}".format(line,hashValue))
lines_hash.add(hashValue)
return lines_hash
def _write_input_files(self, data_list_path):
    """Append this namelist's input data files to `data_list_path`.
    Lines whose md5 digest already appears in the file are skipped, so
    repeated calls do not duplicate entries.
    """
    # append to input_data_list file
    lines_hash = self._get_input_file_hash(data_list_path)
    with open(data_list_path, "a") as input_data_list:
        for group_name in self._namelist.get_group_names():
            for variable_name in self._namelist.get_variable_names(group_name):
                input_pathname = self._definition.get_node_element_info(variable_name, "input_pathname")
                if input_pathname is not None:
                    # This is where we end up for all variables that are paths
                    # to input data files.
                    literals = self._namelist.get_variable_value(group_name, variable_name)
                    for literal in literals:
                        file_path = character_literal_to_string(literal)
                        # NOTE - these are hard-coded here and a better way is to make these extensible
                        if file_path == 'UNSET' or file_path == 'idmap' or file_path == 'idmap_ignore':
                            continue
                        if input_pathname == 'abs':
                            # No further mangling needed for absolute paths.
                            # At this point, there are overwrites that should be ignored
                            if not os.path.isabs(file_path):
                                continue
                            else:
                                pass
                        elif input_pathname.startswith('rel:'):
                            # The part past "rel" is the name of a variable that
                            # this variable specifies its path relative to.
                            root_var = input_pathname[4:]
                            root_dir = self.get_value(root_var)
                            file_path = os.path.join(root_dir, file_path)
                        else:
                            expect(False,
                                   "Bad input_pathname value: {}.".format(input_pathname))
                        # Write to the input data list (deduplicated by md5).
                        string = "{} = {}".format(variable_name, file_path)
                        hashValue = hashlib.md5(string.rstrip().encode('utf-8')).hexdigest()
                        if hashValue not in lines_hash:
                            logger.debug("Adding line {} with hash {}".format(string,hashValue))
                            input_data_list.write(string+"\n")
                        else:
                            logger.debug("Line already in file {}".format(string))
[docs] def write_output_file(self, namelist_file, data_list_path=None, groups=None, sorted_groups=True):
"""Write out the namelists and input data files.
The `namelist_file` and `modelio_file` are the locations to which the
component and modelio namelists will be written, respectively. The
`data_list_path` argument is the location of the `*.input_data_list`
file, which will have the input data files added to it.
"""
self._definition.validate(self._namelist)
if groups is None:
groups = self._namelist.get_group_names()
# remove groups that are never in namelist file
if "modelio" in groups:
groups.remove("modelio")
if "seq_maps" in groups:
groups.remove("seq_maps")
# write namelist file
self._namelist.write(namelist_file, groups=groups, sorted_groups=sorted_groups)
if data_list_path is not None:
self._write_input_files(data_list_path)
# For MCT
[docs] def add_nmlcontents(self, filename, group, append=True, format_="nmlcontents", sorted_groups=True):
""" Write only contents of nml group """
self._namelist.write(filename, groups=[group], append=append, format_=format_, sorted_groups=sorted_groups)
[docs] def write_seq_maps(self, filename):
""" Write mct out seq_maps.rc"""
self._namelist.write(filename, groups=["seq_maps"], format_="rc")
[docs] def write_modelio_file(self, filename):
""" Write mct component modelio files"""
self._namelist.write(filename, groups=["modelio", "pio_inparm"], format_="nml")
# For NUOPC
[docs] def write_nuopc_modelio_file(self, filename):
""" Write nuopc component modelio files"""
self._namelist.write(filename, groups=["pio_inparm"], format_="nml")
[docs] def write_nuopc_config_file(self, filename, data_list_path=None ):
""" Write the nuopc config file"""
self._definition.validate(self._namelist)
groups = self._namelist.get_group_names()
# write the config file
self._namelist.write_nuopc(filename, groups=groups, sorted_groups=False)
# append to input_data_list file
if data_list_path is not None:
self._write_input_files(data_list_path)