"""
case_run is a member of class Case
"""
from CIME.XML.standard_module_setup import *
from CIME.config import Config
from CIME.utils import gzip_existing_file, new_lid
from CIME.utils import run_sub_or_cmd, safe_copy, model_log, CIMEError
from CIME.utils import batch_jobid, is_comp_standalone
from CIME.status import append_status, run_and_log_case_status
from CIME.get_timing import get_timing
from CIME.locked_files import check_lockedfiles
import shutil, time, sys, os, glob
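# Strings whose presence in a coupler/mediator log indicates a successful run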
TERMINATION_TEXT = ("HAS ENDED", "END OF MODEL RUN", "SUCCESSFUL TERMINATION")
logger = logging.getLogger(__name__)
###############################################################################
def _pre_run_check(case, lid, skip_pnl=False, da_cycle=0):
###############################################################################
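    """
    Pre-run setup: check locked files and BUILD_COMPLETE, reload the module
    environment, reset the timing directories, and regenerate namelists so
    that any user changes made while the job sat in the queue are picked up.

    For data-assimilation cycles after the first (da_cycle > 0), only the
    coupler namelist is regenerated.
    """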
    # Pre-run initialization code.
if da_cycle > 0:
case.create_namelists(component="cpl")
return
caseroot = case.get_value("CASEROOT")
din_loc_root = case.get_value("DIN_LOC_ROOT")
rundir = case.get_value("RUNDIR")
if case.get_value("TESTCASE") == "PFS":
for filename in ("env_mach_pes.xml", "software_environment.txt"):
fullpath = os.path.join(caseroot, filename)
safe_copy(fullpath, "{}.{}".format(filename, lid))
# check for locked files, may impact BUILD_COMPLETE
skip = None
if case.get_value("EXTERNAL_WORKFLOW"):
skip = "env_batch"
check_lockedfiles(case, skip=skip)
logger.debug("check_lockedfiles OK")
build_complete = case.get_value("BUILD_COMPLETE")
# check that build is done
expect(
build_complete,
"BUILD_COMPLETE is not true\nPlease rebuild the model interactively",
)
logger.debug("build complete is {} ".format(build_complete))
# load the module environment...
case.load_env(reset=True)
    # Create the timing directories, removing any existing ones first
if os.path.isdir(os.path.join(rundir, "timing")):
shutil.rmtree(os.path.join(rundir, "timing"))
os.makedirs(os.path.join(rundir, "timing", "checkpoints"))
    # This needs to be done every time the LID changes in order for log files to be set up correctly.
    # The following also needs to be called in case a user changes a user_nl_xxx file OR an env_run.xml
    # variable while the job is in the queue.
model_log(
"e3sm",
logger,
"{} NAMELIST CREATION BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
if skip_pnl:
case.create_namelists(component="cpl")
else:
logger.info("Generating namelists for {}".format(caseroot))
case.create_namelists()
model_log(
"e3sm",
logger,
"{} NAMELIST CREATION HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
logger.info(
"-------------------------------------------------------------------------"
)
logger.info(" - Prestage required restarts into {}".format(rundir))
logger.info(
" - Case input data directory (DIN_LOC_ROOT) is {} ".format(din_loc_root)
)
logger.info(" - Checking for required input datasets in DIN_LOC_ROOT")
logger.info(
"-------------------------------------------------------------------------"
)
###############################################################################
def _run_model_impl(case, lid, skip_pnl=False, da_cycle=0):
###############################################################################
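    """
    Run the model executable once per LID, saving pre-run provenance first
    and running the post-run check afterwards.

    On failure, the model log is scanned against NODE_FAIL_REGEX and
    MPIRUN_RETRY_REGEX; a match triggers a restart (consuming spare nodes for
    node failures) under a new LID with regenerated namelists. Returns the
    LID of the final attempt.
    """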
model_log(
"e3sm",
logger,
"{} PRE_RUN_CHECK BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
_pre_run_check(case, lid, skip_pnl=skip_pnl, da_cycle=da_cycle)
model_log(
"e3sm",
logger,
"{} PRE_RUN_CHECK HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
model = case.get_value("MODEL")
# Set OMP_NUM_THREADS
os.environ["OMP_NUM_THREADS"] = str(case.thread_count)
# Run the model
cmd = case.get_mpirun_cmd(allow_unresolved_envvars=False)
logger.info("run command is {} ".format(cmd))
rundir = case.get_value("RUNDIR")
# MPIRUN_RETRY_REGEX allows the mpi command to be reattempted if the
# failure described by that regular expression is matched in the model log
# case.spare_nodes is overloaded and may also represent the number of
# retries to attempt if ALLOCATE_SPARE_NODES is False
retry_run_re = case.get_value("MPIRUN_RETRY_REGEX")
node_fail_re = case.get_value("NODE_FAIL_REGEX")
retry_count = 0
if retry_run_re:
retry_run_regex = re.compile(re.escape(retry_run_re))
retry_count = case.get_value("MPIRUN_RETRY_COUNT")
if node_fail_re:
node_fail_regex = re.compile(re.escape(node_fail_re))
is_batch = case.get_value("BATCH_SYSTEM") is not None
msg_func = None
if is_batch:
jobid = batch_jobid()
msg_func = lambda *args: jobid if jobid else ""
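    # Retry loop: the model is re-run when a node failure or retryable error
    # is detected in the model log (see the checks after model execution below)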
loop = True
while loop:
loop = False
model_log(
"e3sm",
logger,
"{} SAVE_PRERUN_PROVENANCE BEGINS HERE".format(
time.strftime("%Y-%m-%d %H:%M:%S")
),
)
try:
Config.instance().save_prerun_provenance(case)
except AttributeError:
logger.debug("No hook for saving prerun provenance was executed")
model_log(
"e3sm",
logger,
"{} SAVE_PRERUN_PROVENANCE HAS FINISHED".format(
time.strftime("%Y-%m-%d %H:%M:%S")
),
)
model_log(
"e3sm",
logger,
"{} MODEL EXECUTION BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
run_func = lambda: run_cmd_no_fail(cmd, from_dir=rundir)
case.flush()
try:
run_and_log_case_status(
run_func,
"model execution",
custom_starting_msg_functor=msg_func,
custom_success_msg_functor=msg_func,
caseroot=case.get_value("CASEROOT"),
is_batch=is_batch,
gitinterface=case._gitinterface,
)
cmd_success = True
except CIMEError:
cmd_success = False
# The run will potentially take a very long time. We need to
# allow the user to xmlchange things in their case.
#
# WARNING: All case variables are reloaded after this call to get the
# new values of any variables that may have been changed by
# the user during model execution. Thus, any local variables
# set from case variables before this point may be
# inconsistent with their latest values in the xml files, so
# should generally be reloaded (via case.get_value(XXX)) if they are still needed.
case.read_xml()
model_log(
"e3sm",
logger,
"{} MODEL EXECUTION HAS FINISHED".format(
time.strftime("%Y-%m-%d %H:%M:%S")
),
)
        model_logfile = os.path.join(rundir, model + ".log." + lid)
        # Determine if the failure was due to a failed node; if so, try to restart
        if retry_run_re or node_fail_re:
            if os.path.exists(model_logfile):
                num_node_fails = 0
                num_retry_fails = 0
                # Read the log once and scan it for both failure patterns
                with open(model_logfile, "r") as fd:
                    model_log_text = fd.read()
                if node_fail_re:
                    num_node_fails = len(node_fail_regex.findall(model_log_text))
                if retry_run_re:
                    num_retry_fails = len(retry_run_regex.findall(model_log_text))
logger.debug(
"RETRY: num_retry_fails {} spare_nodes {} retry_count {}".format(
num_retry_fails, case.spare_nodes, retry_count
)
)
if num_node_fails > 0 and case.spare_nodes >= num_node_fails:
# We failed due to node failure!
logger.warning(
"Detected model run failed due to node failure, restarting"
)
case.spare_nodes -= num_node_fails
loop = True
case.set_value(
"CONTINUE_RUN", case.get_value("RESUBMIT_SETS_CONTINUE_RUN")
)
elif num_retry_fails > 0 and retry_count >= num_retry_fails:
logger.warning("Detected model run failed, restarting")
retry_count -= 1
loop = True
if loop:
# Archive the last consistent set of restart files and restore them
if case.get_value("DOUT_S"):
case.case_st_archive(resubmit=False)
case.restore_from_archive()
lid = new_lid(case=case)
case.create_namelists()
if not cmd_success and not loop:
# We failed and we're not restarting
expect(
False,
"RUN FAIL: Command '{}' failed\nSee log file for details: {}".format(
cmd, model_logfile
),
)
model_log(
"e3sm",
logger,
"{} POST_RUN_CHECK BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
_post_run_check(case, lid)
model_log(
"e3sm",
logger,
"{} POST_RUN_CHECK HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
return lid
###############################################################################
def _run_model(case, lid, skip_pnl=False, da_cycle=0):
###############################################################################
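    """
    Wrapper around _run_model_impl that records "case.run" status in
    CaseStatus, tagging messages with the batch job id when one is available.
    """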
functor = lambda: _run_model_impl(case, lid, skip_pnl=skip_pnl, da_cycle=da_cycle)
is_batch = case.get_value("BATCH_SYSTEM") is not None
msg_func = None
if is_batch:
jobid = batch_jobid()
msg_func = lambda *args: jobid if jobid is not None else ""
return run_and_log_case_status(
functor,
"case.run",
custom_starting_msg_functor=msg_func,
custom_success_msg_functor=msg_func,
caseroot=case.get_value("CASEROOT"),
is_batch=is_batch,
gitinterface=case._gitinterface,
)
###############################################################################
def _post_run_check(case, lid):
###############################################################################
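    """
    Verify that the run completed: the model log must exist and be non-empty,
    and each coupler/mediator log must contain one of the TERMINATION_TEXT
    strings.
    """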
rundir = case.get_value("RUNDIR")
driver = case.get_value("COMP_INTERFACE")
comp_standalone, model = is_comp_standalone(case)
if driver == "nuopc":
if comp_standalone:
file_prefix = model
else:
file_prefix = "med"
else:
file_prefix = "cpl"
cpl_ninst = 1
if case.get_value("MULTI_DRIVER"):
cpl_ninst = case.get_value("NINST_MAX")
cpl_logs = []
if cpl_ninst > 1:
for inst in range(cpl_ninst):
cpl_logs.append(
os.path.join(rundir, file_prefix + "_%04d.log." % (inst + 1) + lid)
)
if driver == "nuopc" and comp_standalone:
cpl_logs.append(
os.path.join(rundir, "med_%04d.log." % (inst + 1) + lid)
)
else:
cpl_logs = [os.path.join(rundir, file_prefix + ".log." + lid)]
if driver == "nuopc" and comp_standalone:
cpl_logs.append(os.path.join(rundir, "med.log." + lid))
cpl_logfile = cpl_logs[0]
    # Check the model and coupler/mediator logs for this LID
model_logfile = os.path.join(rundir, model + ".log." + lid)
if not os.path.isfile(model_logfile):
expect(False, "Model did not complete, no {} log file ".format(model_logfile))
elif os.stat(model_logfile).st_size == 0:
expect(False, "Run FAILED")
else:
count_ok = 0
for cpl_logfile in cpl_logs:
if not os.path.isfile(cpl_logfile):
break
with open(cpl_logfile, "r") as fd:
logfile = fd.read()
if any([x in logfile for x in TERMINATION_TEXT]):
count_ok += 1
if count_ok < cpl_ninst:
expect(False, "Model did not complete - see {} \n ".format(cpl_logfile))
###############################################################################
def _save_logs(case, lid):
###############################################################################
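    """Gzip every log file for this LID in RUNDIR."""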
rundir = case.get_value("RUNDIR")
logfiles = glob.glob(os.path.join(rundir, "*.log.{}".format(lid)))
for logfile in logfiles:
if os.path.isfile(logfile):
gzip_existing_file(logfile)
###############################################################################
def _resubmit_check(case):
###############################################################################
"""
check to see if we need to do resubmission from this particular job,
Note that Mira requires special logic
"""
dout_s = case.get_value("DOUT_S")
logger.warning("dout_s {} ".format(dout_s))
mach = case.get_value("MACH")
logger.warning("mach {} ".format(mach))
resubmit_num = case.get_value("RESUBMIT")
logger.warning("resubmit_num {}".format(resubmit_num))
    # If dout_s is True, then short-term archiving handles the resubmit
    # If dout_s is True and the machine is mira, submit the st_archive script
resubmit = False
if not dout_s and resubmit_num > 0:
resubmit = True
elif dout_s and mach == "mira":
caseroot = case.get_value("CASEROOT")
cimeroot = case.get_value("CIMEROOT")
cmd = "ssh cooleylogin1 'cd {case}; CIMEROOT={root} ./case.submit {case} --job case.st_archive'".format(
case=caseroot, root=cimeroot
)
run_cmd(cmd, verbose=True)
if resubmit:
job = case.get_primary_job()
case.submit(job=job, resubmit=True)
logger.debug("resubmit after check is {}".format(resubmit))
###############################################################################
def _do_external(script_name, caseroot, rundir, lid, prefix):
###############################################################################
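    """
    Run a user-supplied PRERUN_SCRIPT or POSTRUN_SCRIPT, recording start and
    completion in CaseStatus and capturing output in
    <prefix>.external.log.<lid> under RUNDIR.
    """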
expect(
os.path.isfile(script_name), "External script {} not found".format(script_name)
)
filename = "{}.external.log.{}".format(prefix, lid)
outfile = os.path.join(rundir, filename)
append_status("Starting script {}".format(script_name), "CaseStatus")
run_sub_or_cmd(
script_name,
[caseroot],
(os.path.basename(script_name).split(".", 1))[0],
[caseroot],
logfile=outfile,
) # For sub, use case?
append_status("Completed script {}".format(script_name), "CaseStatus")
###############################################################################
def _do_data_assimilation(da_script, caseroot, cycle, lid, rundir):
###############################################################################
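    """
    Run the DATA_ASSIMILATION_SCRIPT for one cycle, capturing its output in
    da.log.<lid> under RUNDIR.
    """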
expect(
os.path.isfile(da_script),
"Data Assimilation script {} not found".format(da_script),
)
filename = "da.log.{}".format(lid)
outfile = os.path.join(rundir, filename)
run_sub_or_cmd(
da_script,
[caseroot, cycle],
os.path.basename(da_script),
[caseroot, cycle],
logfile=outfile,
) # For sub, use case?
###############################################################################
def case_run(self, skip_pnl=False, set_continue_run=False, submit_resubmits=False):
###############################################################################
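    """
    Set up the run, run the model (once per data-assimilation cycle), and
    perform the post-run steps: timing, provenance, log archiving, the
    optional POSTRUN_SCRIPT, and an optional resubmit check.

    skip_pnl=True regenerates only the coupler namelist; set_continue_run=True
    sets CONTINUE_RUN (per RESUBMIT_SETS_CONTINUE_RUN) after the run completes;
    submit_resubmits=True enables the resubmit check. Returns True on success.

    Illustrative usage, a sketch assuming a writable CIME Case object:

        with Case(caseroot, read_only=False) as case:
            case.case_run(skip_pnl=True, submit_resubmits=True)
    """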
model_log(
"e3sm",
logger,
"{} CASE.RUN BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
# Set up the run, run the model, do the postrun steps
# set up the LID
lid = new_lid(case=self)
prerun_script = self.get_value("PRERUN_SCRIPT")
if prerun_script:
model_log(
"e3sm",
logger,
"{} PRERUN_SCRIPT BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
self.flush()
_do_external(
prerun_script,
self.get_value("CASEROOT"),
self.get_value("RUNDIR"),
lid,
prefix="prerun",
)
self.read_xml()
model_log(
"e3sm",
logger,
"{} PRERUN_SCRIPT HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
# We might need to tweak these if we want to allow the user to change them
data_assimilation_cycles = self.get_value("DATA_ASSIMILATION_CYCLES")
data_assimilation_script = self.get_value("DATA_ASSIMILATION_SCRIPT")
data_assimilation = (
data_assimilation_cycles > 0
and len(data_assimilation_script) > 0
and os.path.isfile(data_assimilation_script)
)
for cycle in range(data_assimilation_cycles):
# After the first DA cycle, runs are restart runs
if cycle > 0:
            lid = new_lid(case=self)
self.set_value("CONTINUE_RUN", self.get_value("RESUBMIT_SETS_CONTINUE_RUN"))
# WARNING: All case variables are reloaded during run_model to get
# new values of any variables that may have been changed by
# the user during model execution. Thus, any local variables
# set from case variables before this point may be
# inconsistent with their latest values in the xml files, so
# should generally be reloaded (via case.get_value(XXX)) if they are still needed.
model_log(
"e3sm",
logger,
"{} RUN_MODEL BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
lid = _run_model(self, lid, skip_pnl, da_cycle=cycle)
model_log(
"e3sm",
logger,
"{} RUN_MODEL HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
if self.get_value("CHECK_TIMING") or self.get_value("SAVE_TIMING"):
model_log(
"e3sm",
logger,
"{} GET_TIMING BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
get_timing(self, lid) # Run the getTiming script
model_log(
"e3sm",
logger,
"{} GET_TIMING HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
if data_assimilation:
model_log(
"e3sm",
logger,
"{} DO_DATA_ASSIMILATION BEGINS HERE".format(
time.strftime("%Y-%m-%d %H:%M:%S")
),
)
self.flush()
_do_data_assimilation(
data_assimilation_script,
self.get_value("CASEROOT"),
cycle,
lid,
self.get_value("RUNDIR"),
)
self.read_xml()
model_log(
"e3sm",
logger,
"{} DO_DATA_ASSIMILATION HAS FINISHED".format(
time.strftime("%Y-%m-%d %H:%M:%S")
),
)
        _save_logs(self, lid)  # Gzip the log files in RUNDIR
model_log(
"e3sm",
logger,
"{} SAVE_POSTRUN_PROVENANCE BEGINS HERE".format(
time.strftime("%Y-%m-%d %H:%M:%S")
),
)
try:
Config.instance().save_postrun_provenance(self, lid)
except AttributeError:
logger.debug("No hook for saving postrun provenance was executed")
model_log(
"e3sm",
logger,
"{} SAVE_POSTRUN_PROVENANCE HAS FINISHED".format(
time.strftime("%Y-%m-%d %H:%M:%S")
),
)
postrun_script = self.get_value("POSTRUN_SCRIPT")
if postrun_script:
model_log(
"e3sm",
logger,
"{} POSTRUN_SCRIPT BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
self.flush()
_do_external(
postrun_script,
self.get_value("CASEROOT"),
self.get_value("RUNDIR"),
lid,
prefix="postrun",
)
self.read_xml()
_save_logs(self, lid)
model_log(
"e3sm",
logger,
"{} POSTRUN_SCRIPT HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
if set_continue_run:
self.set_value("CONTINUE_RUN", self.get_value("RESUBMIT_SETS_CONTINUE_RUN"))
external_workflow = self.get_value("EXTERNAL_WORKFLOW")
if not external_workflow:
logger.warning("check for resubmit")
logger.debug("submit_resubmits is {}".format(submit_resubmits))
if submit_resubmits:
_resubmit_check(self)
model_log(
"e3sm",
logger,
"{} CASE.RUN HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")),
)
return True