Source code for CIME.case.case_cmpgen_namelists

"""
Library for case.cmpgen_namelists.
case_cmpgen_namelists is a member of Class case from file case.py
"""

from CIME.XML.standard_module_setup import *

from CIME.compare_namelists import is_namelist_file, compare_namelist_files
from CIME.simple_compare import compare_files, compare_runconfigfiles
from CIME.utils import append_status, safe_copy, SharedArea
from CIME.test_status import *

import os, shutil, traceback, stat, glob
from distutils import dir_util

logger = logging.getLogger(__name__)

def _do_full_nl_comp(case, test, compare_name, baseline_root=None):
    test_dir       = case.get_value("CASEROOT")
    casedoc_dir    = os.path.join(test_dir, "CaseDocs")
    baseline_root  = case.get_value("BASELINE_ROOT") if baseline_root is None else baseline_root

    all_match         = True
    baseline_dir      = os.path.join(baseline_root, compare_name, test)
    baseline_casedocs = os.path.join(baseline_dir, "CaseDocs")

    # Start off by comparing everything in CaseDocs except a few arbitrary files (ugh!)
    # TODO: Namelist files should have consistent suffix
    all_items_to_compare = [item for item in glob.glob("{}/*".format(casedoc_dir))\
                            if "README" not in os.path.basename(item)\
                            and not item.endswith("doc")\
                            and not item.endswith("prescribed")\
                            and not os.path.basename(item).startswith(".")]

    comments = "NLCOMP\n"
    for item in all_items_to_compare:
        baseline_counterpart = os.path.join(baseline_casedocs \
                                            if os.path.dirname(item).endswith("CaseDocs") \
                                            else baseline_dir,os.path.basename(item))
        if not os.path.exists(baseline_counterpart):
            comments += "Missing baseline namelist '{}'\n".format(baseline_counterpart)
            all_match = False
        else:
            if item.endswith("runconfig") or item.endswith("runseq"):
                success, current_comments = compare_runconfigfiles(baseline_counterpart, item, test)
            elif is_namelist_file(item):
                success, current_comments = compare_namelist_files(baseline_counterpart, item, test)
            else:
                success, current_comments = compare_files(baseline_counterpart, item, test)

            all_match &= success
            if not success:
                comments += "Comparison failed between '{}' with '{}'\n".format(item, baseline_counterpart)

            comments += current_comments

    logging.info(comments)
    return all_match, comments

def _do_full_nl_gen_impl(case, test, generate_name, baseline_root=None):
    test_dir       = case.get_value("CASEROOT")
    casedoc_dir    = os.path.join(test_dir, "CaseDocs")
    baseline_root  = case.get_value("BASELINE_ROOT") if baseline_root is None else baseline_root

    baseline_dir      = os.path.join(baseline_root, generate_name, test)
    baseline_casedocs = os.path.join(baseline_dir, "CaseDocs")

    if not os.path.isdir(baseline_dir):
        os.makedirs(baseline_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IXOTH | stat.S_IROTH)

    if os.path.isdir(baseline_casedocs):
        shutil.rmtree(baseline_casedocs)

    dir_util.copy_tree(casedoc_dir, baseline_casedocs, preserve_mode=False)

    for item in glob.glob(os.path.join(test_dir, "user_nl*")):
        preexisting_baseline = os.path.join(baseline_dir, os.path.basename(item))
        if (os.path.exists(preexisting_baseline)):
            os.remove(preexisting_baseline)

        safe_copy(item, baseline_dir, preserve_meta=False)

def _do_full_nl_gen(case, test, generate_name, baseline_root=None):
    with SharedArea():
        _do_full_nl_gen_impl(case, test, generate_name, baseline_root=baseline_root)

[docs]def case_cmpgen_namelists(self, compare=False, generate=False, compare_name=None, generate_name=None, baseline_root=None, logfile_name="TestStatus.log"): expect(self.get_value("TEST"), "Only makes sense to run this for a test case") caseroot, casebaseid = self.get_value("CASEROOT"), self.get_value("CASEBASEID") if not compare: compare = self.get_value("COMPARE_BASELINE") if not generate: generate = self.get_value("GENERATE_BASELINE") if not compare and not generate: logging.debug("No namelists compares requested") return True # create namelists for case if they haven't been already casedocs = os.path.join(caseroot, "CaseDocs") if not os.path.exists(os.path.join(casedocs, "drv_in")): self.create_namelists() test_name = casebaseid if casebaseid is not None else self.get_value("CASE") with TestStatus(test_dir=caseroot, test_name=test_name) as ts: try: # Inside this try are where we catch non-fatal errors, IE errors involving # baseline operations which may not directly impact the functioning of the viability of this case if compare and not compare_name: compare_name = self.get_value("BASELINE_NAME_CMP") expect(compare_name, "Was asked to do baseline compare but unable to determine baseline name") logging.info("Comparing namelists with baselines '{}'".format(compare_name)) if generate and not generate_name: generate_name = self.get_value("BASELINE_NAME_GEN") expect(generate_name, "Was asked to do baseline generation but unable to determine baseline name") logging.info("Generating namelists to baselines '{}'".format(generate_name)) success = True output = "" if compare: success, output = _do_full_nl_comp(self, test_name, compare_name, baseline_root) if not success and ts.get_status(RUN_PHASE) is not None: run_warn = \ """NOTE: It is not necessarily safe to compare namelists after RUN phase has completed. Running a case can pollute namelists. The namelists kept in the baselines are pre-RUN namelists.""" output += run_warn logging.info(run_warn) if generate: _do_full_nl_gen(self, test_name, generate_name, baseline_root) except Exception: success = False ts.set_status(NAMELIST_PHASE, TEST_FAIL_STATUS) warn = "Exception during namelist operations:\n{}\n{}".format(sys.exc_info()[1], traceback.format_exc()) output += warn logging.warning(warn) finally: ts.set_status(NAMELIST_PHASE, TEST_PASS_STATUS if success else TEST_FAIL_STATUS) try: append_status(output, logfile_name, caseroot=caseroot) except IOError: pass return success