Source code for CIME.case.case_run

"""
case_run is a member of class Case
"""
from CIME.XML.standard_module_setup import *
from CIME.config import Config
from CIME.utils import gzip_existing_file, new_lid
from CIME.utils import run_sub_or_cmd, safe_copy, model_log, CIMEError
from CIME.utils import batch_jobid, is_comp_standalone
from CIME.status import append_status, run_and_log_case_status
from CIME.get_timing import get_timing
from CIME.locked_files import check_lockedfiles

import shutil, time, sys, os, glob

# Strings whose presence in a coupler/driver log indicates successful model
# completion (checked in _post_run_check)
TERMINATION_TEXT = ("HAS ENDED", "END OF MODEL RUN", "SUCCESSFUL TERMINATION")

logger = logging.getLogger(__name__)


###############################################################################
def _pre_run_check(case, lid, skip_pnl=False, da_cycle=0):
    ###############################################################################

    # Pre-run initialization code.
    if da_cycle > 0:
        case.create_namelists(component="cpl")
        return

    caseroot = case.get_value("CASEROOT")
    din_loc_root = case.get_value("DIN_LOC_ROOT")
    rundir = case.get_value("RUNDIR")

    if case.get_value("TESTCASE") == "PFS":
        for filename in ("env_mach_pes.xml", "software_environment.txt"):
            fullpath = os.path.join(caseroot, filename)
            safe_copy(fullpath, "{}.{}".format(filename, lid))

    # check for locked files, may impact BUILD_COMPLETE
    skip = None

    if case.get_value("EXTERNAL_WORKFLOW"):
        skip = "env_batch"

    check_lockedfiles(case, skip=skip)

    logger.debug("check_lockedfiles OK")

    build_complete = case.get_value("BUILD_COMPLETE")

    # check that build is done
    expect(
        build_complete,
        "BUILD_COMPLETE is not true\nPlease rebuild the model interactively",
    )
    logger.debug("build complete is {} ".format(build_complete))

    # load the module environment...
    case.load_env(reset=True)

    # create the timing directories, removing any previous ones first
    if os.path.isdir(os.path.join(rundir, "timing")):
        shutil.rmtree(os.path.join(rundir, "timing"))

    os.makedirs(os.path.join(rundir, "timing", "checkpoints"))

    # This needs to be done every time the LID changes in order for log files to be set up correctly
    # The following also needs to be called in case a user changes a user_nl_xxx file OR an
    # env_run.xml variable while the job is in the queue
    model_log(
        "e3sm",
        logger,
        "{} NAMELIST CREATION BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
    )
    if skip_pnl:
        case.create_namelists(component="cpl")
    else:
        logger.info("Generating namelists for {}".format(caseroot))
        case.create_namelists()

    model_log(
        "e3sm",
        logger,
        "{} NAMELIST CREATION HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
    )

    logger.info(
        "-------------------------------------------------------------------------"
    )
    logger.info(" - Prestage required restarts into {}".format(rundir))
    logger.info(
        " - Case input data directory (DIN_LOC_ROOT) is {} ".format(din_loc_root)
    )
    logger.info(" - Checking for required input datasets in DIN_LOC_ROOT")
    logger.info(
        "-------------------------------------------------------------------------"
    )


###############################################################################
def _run_model_impl(case, lid, skip_pnl=False, da_cycle=0):
    ###############################################################################

    model_log(
        "e3sm",
        logger,
        "{} PRE_RUN_CHECK BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
    )
    _pre_run_check(case, lid, skip_pnl=skip_pnl, da_cycle=da_cycle)
    model_log(
        "e3sm",
        logger,
        "{} PRE_RUN_CHECK HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
    )

    model = case.get_value("MODEL")

    # Set OMP_NUM_THREADS
    os.environ["OMP_NUM_THREADS"] = str(case.thread_count)

    # Run the model
    cmd = case.get_mpirun_cmd(allow_unresolved_envvars=False)
    logger.info("run command is {} ".format(cmd))

    rundir = case.get_value("RUNDIR")

    # MPIRUN_RETRY_REGEX allows the mpi command to be reattempted if the
    # failure described by that regular expression is matched in the model log
    # case.spare_nodes is overloaded and may also represent the number of
    # retries to attempt if ALLOCATE_SPARE_NODES is False
    retry_run_re = case.get_value("MPIRUN_RETRY_REGEX")
    node_fail_re = case.get_value("NODE_FAIL_REGEX")
    retry_count = 0
    if retry_run_re:
        retry_run_regex = re.compile(re.escape(retry_run_re))
        retry_count = case.get_value("MPIRUN_RETRY_COUNT")
    if node_fail_re:
        node_fail_regex = re.compile(re.escape(node_fail_re))
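    # NOTE: both patterns are passed through re.escape above, so the XML
    # values are matched as literal strings rather than interpreted as
    # regular expressions.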

    is_batch = case.get_value("BATCH_SYSTEM") is not None
    msg_func = None

    if is_batch:
        # record the batch job id (if known) in the CaseStatus start/success
        # messages emitted by run_and_log_case_status below
        jobid = batch_jobid()
        msg_func = lambda *args: jobid if jobid else ""

    loop = True
    while loop:
        loop = False

        model_log(
            "e3sm",
            logger,
            "{} SAVE_PRERUN_PROVENANCE BEGINS HERE".format(
                time.strftime("%Y-%m-%d %H:%M:%S")
            ),
        )
        try:
            Config.instance().save_prerun_provenance(case)
        except AttributeError:
            logger.debug("No hook for saving prerun provenance was executed")
        model_log(
            "e3sm",
            logger,
            "{} SAVE_PRERUN_PROVENANCE HAS FINISHED".format(
                time.strftime("%Y-%m-%d %H:%M:%S")
            ),
        )

        model_log(
            "e3sm",
            logger,
            "{} MODEL EXECUTION BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        run_func = lambda: run_cmd_no_fail(cmd, from_dir=rundir)
        case.flush()

        try:
            run_and_log_case_status(
                run_func,
                "model execution",
                custom_starting_msg_functor=msg_func,
                custom_success_msg_functor=msg_func,
                caseroot=case.get_value("CASEROOT"),
                is_batch=is_batch,
                gitinterface=case._gitinterface,
            )
            cmd_success = True
        except CIMEError:
            cmd_success = False

        # The run will potentially take a very long time. We need to
        # allow the user to xmlchange things in their case.
        #
        # WARNING: All case variables are reloaded after this call to get the
        # new values of any variables that may have been changed by
        # the user during model execution. Thus, any local variables
        # set from case variables before this point may be
        # inconsistent with their latest values in the xml files, so
        # should generally be reloaded (via case.get_value(XXX)) if they are still needed.
        case.read_xml()

        model_log(
            "e3sm",
            logger,
            "{} MODEL EXECUTION HAS FINISHED".format(
                time.strftime("%Y-%m-%d %H:%M:%S")
            ),
        )

        model_logfile = os.path.join(rundir, model + ".log." + lid)
        # Determine if the failure was due to a failed node; if so, try to restart
        if retry_run_re or node_fail_re:
            if os.path.exists(model_logfile):
                num_node_fails = 0
                num_retry_fails = 0
                # read the log once and close it promptly
                with open(model_logfile, "r") as fd:
                    model_logtext = fd.read()
                if node_fail_re:
                    num_node_fails = len(node_fail_regex.findall(model_logtext))
                if retry_run_re:
                    num_retry_fails = len(retry_run_regex.findall(model_logtext))
                logger.debug(
                    "RETRY: num_retry_fails {} spare_nodes {} retry_count {}".format(
                        num_retry_fails, case.spare_nodes, retry_count
                    )
                )
                if num_node_fails > 0 and case.spare_nodes >= num_node_fails:
                    # We failed due to node failure!
                    logger.warning(
                        "Detected model run failed due to node failure, restarting"
                    )
                    case.spare_nodes -= num_node_fails
                    loop = True
                    case.set_value(
                        "CONTINUE_RUN", case.get_value("RESUBMIT_SETS_CONTINUE_RUN")
                    )
                elif num_retry_fails > 0 and retry_count >= num_retry_fails:
                    logger.warning("Detected model run failed, restarting")
                    retry_count -= 1
                    loop = True

                if loop:
                    # Archive the last consistent set of restart files and restore them
                    if case.get_value("DOUT_S"):
                        case.case_st_archive(resubmit=False)
                        case.restore_from_archive()

                    lid = new_lid(case=case)
                    case.create_namelists()

        if not cmd_success and not loop:
            # We failed and we're not restarting
            expect(
                False,
                "RUN FAIL: Command '{}' failed\nSee log file for details: {}".format(
                    cmd, model_logfile
                ),
            )

    model_log(
        "e3sm",
        logger,
        "{} POST_RUN_CHECK BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
    )
    _post_run_check(case, lid)
    model_log(
        "e3sm",
        logger,
        "{} POST_RUN_CHECK HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
    )

    return lid
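
# Example (hypothetical syntax and values): the retry machinery above is
# driven by XML variables that a machine definition may provide, e.g.:
#
#   MPIRUN_RETRY_REGEX = "srun: error: Node failure"
#   MPIRUN_RETRY_COUNT = 2
#   NODE_FAIL_REGEX    = "Node failure on"
#
# A NODE_FAIL_REGEX match in the model log consumes spare nodes (see
# ALLOCATE_SPARE_NODES), while MPIRUN_RETRY_REGEX matches are limited by
# MPIRUN_RETRY_COUNT; either path restarts the model with a fresh LID.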


###############################################################################
def _run_model(case, lid, skip_pnl=False, da_cycle=0):
    ###############################################################################
    functor = lambda: _run_model_impl(case, lid, skip_pnl=skip_pnl, da_cycle=da_cycle)

    is_batch = case.get_value("BATCH_SYSTEM") is not None
    msg_func = None

    if is_batch:
        jobid = batch_jobid()
        msg_func = lambda *args: jobid if jobid is not None else ""

    return run_and_log_case_status(
        functor,
        "case.run",
        custom_starting_msg_functor=msg_func,
        custom_success_msg_functor=msg_func,
        caseroot=case.get_value("CASEROOT"),
        is_batch=is_batch,
        gitinterface=case._gitinterface,
    )


###############################################################################
def _post_run_check(case, lid):
    ###############################################################################

    rundir = case.get_value("RUNDIR")
    driver = case.get_value("COMP_INTERFACE")

    comp_standalone, model = is_comp_standalone(case)

    if driver == "nuopc":
        if comp_standalone:
            file_prefix = model
        else:
            file_prefix = "med"
    else:
        file_prefix = "cpl"

    cpl_ninst = 1
    if case.get_value("MULTI_DRIVER"):
        cpl_ninst = case.get_value("NINST_MAX")
    cpl_logs = []

    if cpl_ninst > 1:
        for inst in range(cpl_ninst):
            cpl_logs.append(
                os.path.join(rundir, file_prefix + "_%04d.log." % (inst + 1) + lid)
            )
            if driver == "nuopc" and comp_standalone:
                cpl_logs.append(
                    os.path.join(rundir, "med_%04d.log." % (inst + 1) + lid)
                )
    else:
        cpl_logs = [os.path.join(rundir, file_prefix + ".log." + lid)]
        if driver == "nuopc" and comp_standalone:
            cpl_logs.append(os.path.join(rundir, "med.log." + lid))
    cpl_logfile = cpl_logs[0]
    # find the last model.log and cpl.log
    model_logfile = os.path.join(rundir, model + ".log." + lid)
    if not os.path.isfile(model_logfile):
        expect(False, "Model did not complete, no {} log file ".format(model_logfile))
    elif os.stat(model_logfile).st_size == 0:
        expect(False, "Run FAILED")
    else:
        count_ok = 0
        for cpl_logfile in cpl_logs:
            if not os.path.isfile(cpl_logfile):
                break
            with open(cpl_logfile, "r") as fd:
                logfile = fd.read()
                if any([x in logfile for x in TERMINATION_TEXT]):
                    count_ok += 1
        if count_ok < cpl_ninst:
            expect(False, "Model did not complete - see {} \n ".format(cpl_logfile))
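
# Log naming, as constructed above: single-driver runs produce
# "<prefix>.log.<lid>" (e.g. cpl.log.<lid>, plus med.log.<lid> for nuopc
# component-standalone runs); multi-driver runs (MULTI_DRIVER) produce one
# numbered log per instance, "<prefix>_0001.log.<lid>" through
# "<prefix>_NNNN.log.<lid>".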


###############################################################################
def _save_logs(case, lid):
    ###############################################################################
    rundir = case.get_value("RUNDIR")
    logfiles = glob.glob(os.path.join(rundir, "*.log.{}".format(lid)))
    for logfile in logfiles:
        if os.path.isfile(logfile):
            gzip_existing_file(logfile)


###############################################################################
def _resubmit_check(case):
    ###############################################################################
    """
    Check to see if we need to do resubmission from this particular job.
    Note that Mira requires special logic.
    """
    dout_s = case.get_value("DOUT_S")
    logger.warning("dout_s {} ".format(dout_s))
    mach = case.get_value("MACH")
    logger.warning("mach {} ".format(mach))
    resubmit_num = case.get_value("RESUBMIT")
    logger.warning("resubmit_num {}".format(resubmit_num))
    # If dout_s is True then short-term archiving handles the resubmit
    # If dout_s is True and machine is mira, submit the st_archive script
    resubmit = False
    if not dout_s and resubmit_num > 0:
        resubmit = True
    elif dout_s and mach == "mira":
        caseroot = case.get_value("CASEROOT")
        cimeroot = case.get_value("CIMEROOT")
        cmd = "ssh cooleylogin1 'cd {case}; CIMEROOT={root} ./case.submit {case} --job case.st_archive'".format(
            case=caseroot, root=cimeroot
        )
        run_cmd(cmd, verbose=True)

    if resubmit:
        job = case.get_primary_job()

        case.submit(job=job, resubmit=True)

    logger.debug("resubmit after check is {}".format(resubmit))


###############################################################################
def _do_external(script_name, caseroot, rundir, lid, prefix):
    ###############################################################################
    expect(
        os.path.isfile(script_name), "External script {} not found".format(script_name)
    )
    filename = "{}.external.log.{}".format(prefix, lid)
    outfile = os.path.join(rundir, filename)
    append_status("Starting script {}".format(script_name), "CaseStatus")
    run_sub_or_cmd(
        script_name,
        [caseroot],
        (os.path.basename(script_name).split(".", 1))[0],
        [caseroot],
        logfile=outfile,
    )  # For sub, use case?
    append_status("Completed script {}".format(script_name), "CaseStatus")
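
# Example (hypothetical path): a PRERUN_SCRIPT or POSTRUN_SCRIPT is invoked
# with the caseroot as its single argument, either as a python subroutine or
# as a shell command, e.g. setting
#
#   PRERUN_SCRIPT=/path/to/prepare_inputs.sh
#
# runs "prepare_inputs.sh <caseroot>" with output captured in
# <rundir>/prerun.external.log.<lid>.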


###############################################################################
def _do_data_assimilation(da_script, caseroot, cycle, lid, rundir):
    ###############################################################################
    expect(
        os.path.isfile(da_script),
        "Data Assimilation script {} not found".format(da_script),
    )
    filename = "da.log.{}".format(lid)
    outfile = os.path.join(rundir, filename)
    run_sub_or_cmd(
        da_script,
        [caseroot, cycle],
        os.path.basename(da_script),
        [caseroot, cycle],
        logfile=outfile,
    )  # For sub, use case?
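
# Example (hypothetical name): a DATA_ASSIMILATION_SCRIPT receives the
# caseroot and the current cycle number as its arguments, i.e. it is invoked
# roughly as
#
#   my_da_script.sh <caseroot> <cycle>
#
# with output captured in <rundir>/da.log.<lid>.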


###############################################################################
def case_run(self, skip_pnl=False, set_continue_run=False, submit_resubmits=False):
    ###############################################################################
    model_log(
        "e3sm",
        logger,
        "{} CASE.RUN BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
    )
    # Set up the run, run the model, do the postrun steps

    # set up the LID
    lid = new_lid(case=self)

    prerun_script = self.get_value("PRERUN_SCRIPT")
    if prerun_script:
        model_log(
            "e3sm",
            logger,
            "{} PRERUN_SCRIPT BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        self.flush()
        _do_external(
            prerun_script,
            self.get_value("CASEROOT"),
            self.get_value("RUNDIR"),
            lid,
            prefix="prerun",
        )
        self.read_xml()
        model_log(
            "e3sm",
            logger,
            "{} PRERUN_SCRIPT HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
        )

    # We might need to tweak these if we want to allow the user to change them
    data_assimilation_cycles = self.get_value("DATA_ASSIMILATION_CYCLES")
    data_assimilation_script = self.get_value("DATA_ASSIMILATION_SCRIPT")
    data_assimilation = (
        data_assimilation_cycles > 0
        and len(data_assimilation_script) > 0
        and os.path.isfile(data_assimilation_script)
    )

    for cycle in range(data_assimilation_cycles):
        # After the first DA cycle, runs are restart runs
        if cycle > 0:
            lid = new_lid()
            self.set_value(
                "CONTINUE_RUN", self.get_value("RESUBMIT_SETS_CONTINUE_RUN")
            )

        # WARNING: All case variables are reloaded during run_model to get
        # new values of any variables that may have been changed by
        # the user during model execution. Thus, any local variables
        # set from case variables before this point may be
        # inconsistent with their latest values in the xml files, so
        # should generally be reloaded (via case.get_value(XXX)) if they are still needed.
        model_log(
            "e3sm",
            logger,
            "{} RUN_MODEL BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        lid = _run_model(self, lid, skip_pnl, da_cycle=cycle)
        model_log(
            "e3sm",
            logger,
            "{} RUN_MODEL HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
        )

        if self.get_value("CHECK_TIMING") or self.get_value("SAVE_TIMING"):
            model_log(
                "e3sm",
                logger,
                "{} GET_TIMING BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
            )
            get_timing(self, lid)  # Run the getTiming script
            model_log(
                "e3sm",
                logger,
                "{} GET_TIMING HAS FINISHED".format(
                    time.strftime("%Y-%m-%d %H:%M:%S")
                ),
            )

        if data_assimilation:
            model_log(
                "e3sm",
                logger,
                "{} DO_DATA_ASSIMILATION BEGINS HERE".format(
                    time.strftime("%Y-%m-%d %H:%M:%S")
                ),
            )
            self.flush()
            _do_data_assimilation(
                data_assimilation_script,
                self.get_value("CASEROOT"),
                cycle,
                lid,
                self.get_value("RUNDIR"),
            )
            self.read_xml()
            model_log(
                "e3sm",
                logger,
                "{} DO_DATA_ASSIMILATION HAS FINISHED".format(
                    time.strftime("%Y-%m-%d %H:%M:%S")
                ),
            )

        _save_logs(self, lid)  # Copy log files back to caseroot

        model_log(
            "e3sm",
            logger,
            "{} SAVE_POSTRUN_PROVENANCE BEGINS HERE".format(
                time.strftime("%Y-%m-%d %H:%M:%S")
            ),
        )
        try:
            Config.instance().save_postrun_provenance(self, lid)
        except AttributeError:
            logger.debug("No hook for saving postrun provenance was executed")
        model_log(
            "e3sm",
            logger,
            "{} SAVE_POSTRUN_PROVENANCE HAS FINISHED".format(
                time.strftime("%Y-%m-%d %H:%M:%S")
            ),
        )

    postrun_script = self.get_value("POSTRUN_SCRIPT")
    if postrun_script:
        model_log(
            "e3sm",
            logger,
            "{} POSTRUN_SCRIPT BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        self.flush()
        _do_external(
            postrun_script,
            self.get_value("CASEROOT"),
            self.get_value("RUNDIR"),
            lid,
            prefix="postrun",
        )
        self.read_xml()
        _save_logs(self, lid)
        model_log(
            "e3sm",
            logger,
            "{} POSTRUN_SCRIPT HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
        )

    if set_continue_run:
        self.set_value("CONTINUE_RUN", self.get_value("RESUBMIT_SETS_CONTINUE_RUN"))

    external_workflow = self.get_value("EXTERNAL_WORKFLOW")
    if not external_workflow:
        logger.warning("check for resubmit")
        logger.debug("submit_resubmits is {}".format(submit_resubmits))
        if submit_resubmits:
            _resubmit_check(self)

    model_log(
        "e3sm",
        logger,
        "{} CASE.RUN HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
    )

    return True
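
# Example usage (sketch, path hypothetical): case_run is attached to the Case
# class as a method, so a driver script can run a case roughly like this:
#
#   from CIME.case import Case
#
#   with Case("/path/to/caseroot", read_only=False) as case:
#       success = case.case_run(skip_pnl=False, submit_resubmits=True)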