"""
case_run is a member of Class Case
'"""
from CIME.XML.standard_module_setup import *
from CIME.utils import gzip_existing_file, new_lid, run_and_log_case_status
from CIME.utils import run_sub_or_cmd, append_status, safe_copy
from CIME.get_timing import get_timing
from CIME.provenance import save_prerun_provenance, save_postrun_provenance
import shutil, time, sys, os, glob
logger = logging.getLogger(__name__)
###############################################################################
def _pre_run_check(case, lid, skip_pnl=False, da_cycle=0):
###############################################################################
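    """
    Pre-run sanity checks and setup: verify locked files and a completed
    build, load the module environment, set the batch-query environment
    variables, reset the timing directory, and regenerate namelists.

    For data assimilation cycles after the first (da_cycle > 0), only the
    coupler namelists are regenerated.
    """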
    # Pre-run initialization code.
    if da_cycle > 0:
        case.create_namelists(component='cpl')
        return

    caseroot = case.get_value("CASEROOT")
    din_loc_root = case.get_value("DIN_LOC_ROOT")
    batchsubmit = case.get_value("BATCHSUBMIT")
    rundir = case.get_value("RUNDIR")
    build_complete = case.get_value("BUILD_COMPLETE")

    if case.get_value("TESTCASE") == "PFS":
        env_mach_pes = os.path.join(caseroot, "env_mach_pes.xml")
        safe_copy(env_mach_pes, "{}.{}".format(env_mach_pes, lid))

    # Check for locked files.
    skip = None
    if case.get_value("EXTERNAL_WORKFLOW"):
        skip = "env_batch"
    case.check_lockedfiles(skip=skip)
    logger.debug("check_lockedfiles OK")

    # Check that the build is complete.
    expect(build_complete,
           "BUILD_COMPLETE is not true\nPlease rebuild the model interactively")
    logger.debug("build complete is {}".format(build_complete))

    # Load the module environment.
    case.load_env(reset=True)

    # Set environment variables.
    if batchsubmit is None or batchsubmit in ("", "UNSET"):
        os.environ["LBQUERY"] = "FALSE"
        os.environ["BATCHQUERY"] = "undefined"
    else:
        os.environ["LBQUERY"] = "TRUE"

    # Create the timing directories, cleaning them first if needed.
    if os.path.isdir(os.path.join(rundir, "timing")):
        shutil.rmtree(os.path.join(rundir, "timing"))
    os.makedirs(os.path.join(rundir, "timing", "checkpoints"))

    # This needs to be done every time the LID changes in order for log files
    # to be set up correctly. It also needs to be called in case a user changed
    # a user_nl_xxx file or an env_run.xml variable while the job was queued.
    if skip_pnl:
        case.create_namelists(component='cpl')
    else:
        logger.info("Generating namelists for {}".format(caseroot))
        case.create_namelists()

    logger.info("-------------------------------------------------------------------------")
    logger.info(" - Prestage required restarts into {}".format(rundir))
    logger.info(" - Case input data directory (DIN_LOC_ROOT) is {}".format(din_loc_root))
    logger.info(" - Checking for required input datasets in DIN_LOC_ROOT")
    logger.info("-------------------------------------------------------------------------")
###############################################################################
def _run_model_impl(case, lid, skip_pnl=False, da_cycle=0):
###############################################################################
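    """
    Run the model executable via the mpirun command, retrying when the model
    log matches NODE_FAIL_REGEX (node failure, consuming spare nodes) or
    MPIRUN_RETRY_REGEX (generic retryable failure), then run the post-run
    checks. Returns the lid of the final attempt, which differs from the
    input lid if the run was retried.
    """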
    _pre_run_check(case, lid, skip_pnl=skip_pnl, da_cycle=da_cycle)

    model = case.get_value("MODEL")

    # Set OMP_NUM_THREADS
    os.environ["OMP_NUM_THREADS"] = str(case.thread_count)

    # Run the model
    logger.info("{} MODEL EXECUTION BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")))
    cmd = case.get_mpirun_cmd(allow_unresolved_envvars=False)
    logger.info("run command is {}".format(cmd))

    rundir = case.get_value("RUNDIR")
    loop = True

    # MPIRUN_RETRY_REGEX allows the mpi command to be reattempted if the
    # failure described by that regular expression is matched in the model log.
    # case.spare_nodes is overloaded and may also represent the number of
    # retries to attempt if ALLOCATE_SPARE_NODES is False.
    retry_run_re = case.get_value("MPIRUN_RETRY_REGEX")
    node_fail_re = case.get_value("NODE_FAIL_REGEX")
    retry_count = 0
    if retry_run_re:
        retry_run_regex = re.compile(re.escape(retry_run_re))
        retry_count = case.get_value("MPIRUN_RETRY_COUNT")
    if node_fail_re:
        node_fail_regex = re.compile(re.escape(node_fail_re))

    while loop:
        loop = False

        save_prerun_provenance(case)
        run_func = lambda: run_cmd(cmd, from_dir=rundir)[0]
        stat = run_and_log_case_status(run_func, "model execution", caseroot=case.get_value("CASEROOT"))
        model_logfile = os.path.join(rundir, model + ".log." + lid)

        # Determine if the failure was due to a failed node; if so, try to restart.
        if retry_run_re or node_fail_re:
            if os.path.exists(model_logfile):
                num_node_fails = 0
                num_retry_fails = 0
                # Read the log once and close the handle promptly.
                with open(model_logfile, 'r') as fd:
                    model_log_text = fd.read()
                if node_fail_re:
                    num_node_fails = len(node_fail_regex.findall(model_log_text))
                if retry_run_re:
                    num_retry_fails = len(retry_run_regex.findall(model_log_text))
                logger.debug("RETRY: num_retry_fails {} spare_nodes {} retry_count {}".
                             format(num_retry_fails, case.spare_nodes, retry_count))
                if num_node_fails > 0 and case.spare_nodes >= num_node_fails:
                    # We failed due to node failure!
                    logger.warning("Detected model run failed due to node failure, restarting")
                    case.spare_nodes -= num_node_fails
                    loop = True
                    case.set_value("CONTINUE_RUN",
                                   case.get_value("RESUBMIT_SETS_CONTINUE_RUN"))
                elif num_retry_fails > 0 and retry_count >= num_retry_fails:
                    logger.warning("Detected model run failed, restarting")
                    retry_count -= 1
                    loop = True

                if loop:
                    # Archive the last consistent set of restart files and restore them
                    if case.get_value("DOUT_S"):
                        case.case_st_archive(resubmit=False)
                        case.restore_from_archive()

                    lid = new_lid()
                    case.create_namelists()

        if stat != 0 and not loop:
            # We failed and we're not restarting
            expect(False, "RUN FAIL: Command '{}' failed\nSee log file for details: {}".format(cmd, model_logfile))

    logger.info("{} MODEL EXECUTION HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")))

    _post_run_check(case, lid)

    return lid
###############################################################################
def _run_model(case, lid, skip_pnl=False, da_cycle=0):
###############################################################################
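    """
    Thin wrapper around _run_model_impl that records the "case.run" phase
    in CaseStatus via run_and_log_case_status.
    """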
    functor = lambda: _run_model_impl(case, lid, skip_pnl=skip_pnl, da_cycle=da_cycle)
    return run_and_log_case_status(functor, "case.run", caseroot=case.get_value("CASEROOT"))
###############################################################################
def _post_run_check(case, lid):
###############################################################################
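    """
    Verify that the run completed: the model log must exist and be non-empty,
    and every coupler instance log must contain 'SUCCESSFUL TERMINATION'.
    """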
    rundir = case.get_value("RUNDIR")
    model = case.get_value("MODEL")

    cpl_ninst = 1
    if case.get_value("MULTI_DRIVER"):
        cpl_ninst = case.get_value("NINST_MAX")
    cpl_logs = []
    if cpl_ninst > 1:
        for inst in range(cpl_ninst):
            cpl_logs.append(os.path.join(rundir, "cpl_{:04d}.log.{}".format(inst + 1, lid)))
    else:
        cpl_logs = [os.path.join(rundir, "cpl.log." + lid)]
    cpl_logfile = cpl_logs[0]

    # Find the last model.log and cpl.log
    model_logfile = os.path.join(rundir, model + ".log." + lid)

    if not os.path.isfile(model_logfile):
        expect(False, "Model did not complete, no {} log file".format(model_logfile))
    elif os.stat(model_logfile).st_size == 0:
        expect(False, "Run FAILED")
    else:
        count_ok = 0
        for cpl_logfile in cpl_logs:
            if not os.path.isfile(cpl_logfile):
                break
            with open(cpl_logfile, 'r') as fd:
                if 'SUCCESSFUL TERMINATION' in fd.read():
                    count_ok += 1
        if count_ok != cpl_ninst:
            expect(False, "Model did not complete - see {}\n".format(cpl_logfile))
###############################################################################
def _save_logs(case, lid):
###############################################################################
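    """Gzip all log files for this lid found in RUNDIR."""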
    rundir = case.get_value("RUNDIR")
    logfiles = glob.glob(os.path.join(rundir, "*.log.{}".format(lid)))
    for logfile in logfiles:
        if os.path.isfile(logfile):
            gzip_existing_file(logfile)
###############################################################################
def _resubmit_check(case):
###############################################################################
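    """
    Resubmit the case when resubmissions remain and short-term archiving is
    not handling the resubmit; on Mira, the st_archive script is submitted
    through the Cooley login node instead.
    """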
    # Check to see if we need to do resubmission from this particular job.
    # Note that Mira requires special logic.
    dout_s = case.get_value("DOUT_S")
    logger.warning("dout_s {}".format(dout_s))
    mach = case.get_value("MACH")
    logger.warning("mach {}".format(mach))
    resubmit_num = case.get_value("RESUBMIT")
    logger.warning("resubmit_num {}".format(resubmit_num))

    # If dout_s is True then short-term archiving handles the resubmit.
    # If dout_s is True and the machine is Mira, submit the st_archive script.
    resubmit = False
    if not dout_s and resubmit_num > 0:
        resubmit = True
    elif dout_s and mach == 'mira':
        caseroot = case.get_value("CASEROOT")
        cimeroot = case.get_value("CIMEROOT")
        cmd = "ssh cooleylogin1 'cd {case}; CIMEROOT={root} ./case.submit {case} --job case.st_archive'".format(case=caseroot, root=cimeroot)
        run_cmd(cmd, verbose=True)

    if resubmit:
        job = case.get_primary_job()
        case.submit(job=job, resubmit=True)

    logger.debug("resubmit after check is {}".format(resubmit))
###############################################################################
def _do_external(script_name, caseroot, rundir, lid, prefix):
###############################################################################
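    """
    Run an external pre- or post-run script, recording start and completion
    in CaseStatus and capturing its output in RUNDIR.
    """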
    expect(os.path.isfile(script_name), "External script {} not found".format(script_name))
    filename = "{}.external.log.{}".format(prefix, lid)
    outfile = os.path.join(rundir, filename)
    append_status("Starting script {}".format(script_name), "CaseStatus")
    run_sub_or_cmd(script_name, [caseroot], os.path.basename(script_name).split('.', 1)[0],
                   [caseroot], logfile=outfile)
    append_status("Completed script {}".format(script_name), "CaseStatus")
###############################################################################
def _do_data_assimilation(da_script, caseroot, cycle, lid, rundir):
###############################################################################
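    """Run the data assimilation script for one cycle, logging to RUNDIR."""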
    expect(os.path.isfile(da_script), "Data Assimilation script {} not found".format(da_script))
    filename = "da.log.{}".format(lid)
    outfile = os.path.join(rundir, filename)
    run_sub_or_cmd(da_script, [caseroot, cycle], os.path.basename(da_script), [caseroot, cycle],
                   logfile=outfile)
###############################################################################
def case_run(self, skip_pnl=False, set_continue_run=False, submit_resubmits=False):
###############################################################################
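    """
    Set up the run, run the model once per data assimilation cycle, and do
    the postrun steps: timing, log archiving, provenance, optional pre- and
    post-run scripts, and an optional resubmit check.

    A minimal usage sketch, assuming this function is bound as a method of
    CIME.case.Case ("/path/to/caseroot" is a hypothetical case directory)::

        with Case("/path/to/caseroot") as case:
            case.case_run(skip_pnl=False)
    """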
    # Set up the run, run the model, and do the postrun steps.
    prerun_script = self.get_value("PRERUN_SCRIPT")
    postrun_script = self.get_value("POSTRUN_SCRIPT")

    data_assimilation_cycles = self.get_value("DATA_ASSIMILATION_CYCLES")
    data_assimilation_script = self.get_value("DATA_ASSIMILATION_SCRIPT")
    data_assimilation = (data_assimilation_cycles > 0 and
                         len(data_assimilation_script) > 0 and
                         os.path.isfile(data_assimilation_script))

    # Set up the LID
    lid = new_lid()

    if prerun_script:
        self.flush()
        _do_external(prerun_script, self.get_value("CASEROOT"), self.get_value("RUNDIR"),
                     lid, prefix="prerun")
        self.read_xml()

    for cycle in range(data_assimilation_cycles):
        # After the first DA cycle, runs are restart runs
        if cycle > 0:
            lid = new_lid()
            self.set_value("CONTINUE_RUN",
                           self.get_value("RESUBMIT_SETS_CONTINUE_RUN"))

        lid = _run_model(self, lid, skip_pnl, da_cycle=cycle)

        if self.get_value("CHECK_TIMING") or self.get_value("SAVE_TIMING"):
            get_timing(self, lid)  # Run the getTiming script

        if data_assimilation:
            self.flush()
            _do_data_assimilation(data_assimilation_script, self.get_value("CASEROOT"), cycle, lid,
                                  self.get_value("RUNDIR"))
            self.read_xml()

        _save_logs(self, lid)
        save_postrun_provenance(self)

    if postrun_script:
        self.flush()
        _do_external(postrun_script, self.get_value("CASEROOT"), self.get_value("RUNDIR"),
                     lid, prefix="postrun")
        self.read_xml()

    _save_logs(self, lid)

    if set_continue_run:
        self.set_value("CONTINUE_RUN",
                       self.get_value("RESUBMIT_SETS_CONTINUE_RUN"))

    external_workflow = self.get_value("EXTERNAL_WORKFLOW")
    if not external_workflow:
        logger.warning("check for resubmit")
        logger.debug("submit_resubmits is {}".format(submit_resubmits))
        if submit_resubmits:
            _resubmit_check(self)

    return True