Source code for CIME.case.case_cmpgen_namelists

"""
Library for case.cmpgen_namelists.
case_cmpgen_namelists is a member of Class case from file case.py
"""

from CIME.XML.standard_module_setup import *

from CIME.compare_namelists import is_namelist_file, compare_namelist_files
from CIME.simple_compare import compare_files, compare_runconfigfiles
from CIME.utils import safe_copy, SharedArea
from CIME.status import append_status
from CIME.test_status import *

import os, shutil, traceback, stat, glob

logger = logging.getLogger(__name__)


def _do_full_nl_comp(case, test, compare_name, baseline_root=None):
    test_dir = case.get_value("CASEROOT")
    casedoc_dir = os.path.join(test_dir, "CaseDocs")
    baseline_root = (
        case.get_value("BASELINE_ROOT") if baseline_root is None else baseline_root
    )

    all_match = True
    baseline_dir = os.path.join(baseline_root, compare_name, test)
    baseline_casedocs = os.path.join(baseline_dir, "CaseDocs")

    # Start off by comparing everything in CaseDocs except a few arbitrary files (ugh!)
    # TODO: Namelist files should have consistent suffix
    all_items_to_compare = [
        item
        for item in glob.glob("{}/*".format(casedoc_dir))
        if "README" not in os.path.basename(item)
        and not item.endswith("doc")
        and not item.endswith("prescribed")
        and not os.path.basename(item).startswith(".")
    ]

    comments = "NLCOMP\n"
    for item in all_items_to_compare:
        baseline_counterpart = os.path.join(
            baseline_casedocs
            if os.path.dirname(item).endswith("CaseDocs")
            else baseline_dir,
            os.path.basename(item),
        )
        if not os.path.exists(baseline_counterpart):
            comments += "Missing baseline namelist '{}'\n".format(baseline_counterpart)
            all_match = False
        else:
            if item.endswith("runconfig") or item.endswith("runseq"):
                success, current_comments = compare_runconfigfiles(
                    baseline_counterpart, item, test
                )
            elif is_namelist_file(item):
                success, current_comments = compare_namelist_files(
                    baseline_counterpart, item, test
                )
            else:
                success, current_comments = compare_files(
                    baseline_counterpart, item, test
                )

            all_match &= success
            if not success:
                comments += "Comparison failed between '{}' with '{}'\n".format(
                    item, baseline_counterpart
                )

            comments += current_comments

    logging.info(comments)
    return all_match, comments


def _do_full_nl_gen_impl(case, test, generate_name, baseline_root=None):
    test_dir = case.get_value("CASEROOT")
    casedoc_dir = os.path.join(test_dir, "CaseDocs")
    baseline_root = (
        case.get_value("BASELINE_ROOT") if baseline_root is None else baseline_root
    )

    baseline_dir = os.path.join(baseline_root, generate_name, test)
    baseline_casedocs = os.path.join(baseline_dir, "CaseDocs")

    if not os.path.isdir(baseline_dir):
        os.makedirs(
            baseline_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IXOTH | stat.S_IROTH
        )

    if os.path.isdir(baseline_casedocs):
        shutil.rmtree(baseline_casedocs)

    shutil.copytree(casedoc_dir, baseline_casedocs)

    for item in glob.glob(os.path.join(test_dir, "user_nl*")):
        preexisting_baseline = os.path.join(baseline_dir, os.path.basename(item))
        if os.path.exists(preexisting_baseline):
            os.remove(preexisting_baseline)

        safe_copy(item, baseline_dir, preserve_meta=False)


def _do_full_nl_gen(case, test, generate_name, baseline_root=None):
    with SharedArea():
        _do_full_nl_gen_impl(case, test, generate_name, baseline_root=baseline_root)


[docs] def case_cmpgen_namelists( self, compare=False, generate=False, compare_name=None, generate_name=None, baseline_root=None, logfile_name="TestStatus.log", ): expect(self.get_value("TEST"), "Only makes sense to run this for a test case") caseroot, casebaseid = self.get_value("CASEROOT"), self.get_value("CASEBASEID") if not compare: compare = self.get_value("COMPARE_BASELINE") if not generate: generate = self.get_value("GENERATE_BASELINE") if not compare and not generate: logging.debug("No namelists compares requested") return True # create namelists for case if they haven't been already casedocs = os.path.join(caseroot, "CaseDocs") if not os.path.exists(os.path.join(casedocs, "drv_in")): self.create_namelists() test_name = casebaseid if casebaseid is not None else self.get_value("CASE") with TestStatus(test_dir=caseroot, test_name=test_name) as ts: try: # Inside this try are where we catch non-fatal errors, IE errors involving # baseline operations which may not directly impact the functioning of the viability of this case if compare and not compare_name: compare_name = self.get_value("BASELINE_NAME_CMP") expect( compare_name, "Was asked to do baseline compare but unable to determine baseline name", ) logging.info( "Comparing namelists with baselines '{}'".format(compare_name) ) if generate and not generate_name: generate_name = self.get_value("BASELINE_NAME_GEN") expect( generate_name, "Was asked to do baseline generation but unable to determine baseline name", ) logging.info( "Generating namelists to baselines '{}'".format(generate_name) ) success = True output = "" if compare: success, output = _do_full_nl_comp( self, test_name, compare_name, baseline_root ) if not success and ts.get_status(RUN_PHASE) is not None: run_warn = """NOTE: It is not necessarily safe to compare namelists after RUN phase has completed. Running a case can pollute namelists. The namelists kept in the baselines are pre-RUN namelists.""" output += run_warn logging.info(run_warn) if generate: _do_full_nl_gen(self, test_name, generate_name, baseline_root) except Exception: success = False ts.set_status(NAMELIST_PHASE, TEST_FAIL_STATUS) warn = "Exception during namelist operations:\n{}\n{}".format( sys.exc_info()[1], traceback.format_exc() ) output += warn logging.warning(warn) finally: ts.set_status( NAMELIST_PHASE, TEST_PASS_STATUS if success else TEST_FAIL_STATUS ) try: append_status( output, logfile_name, caseroot=caseroot, ) except IOError: pass return success