"""
case_run is a member of Class Case
'"""
from CIME.XML.standard_module_setup import *
from CIME.utils import gzip_existing_file, new_lid, run_and_log_case_status
from CIME.utils import run_sub_or_cmd, append_status, safe_copy, model_log, CIMEError
from CIME.utils import get_model
from CIME.get_timing import get_timing
from CIME.provenance import save_prerun_provenance, save_postrun_provenance
import shutil, time, sys, os, glob
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
###############################################################################
def _pre_run_check(case, lid, skip_pnl=False, da_cycle=0):
###############################################################################
# Pre run initialization code..
if da_cycle > 0:
case.create_namelists(component='cpl')
return
caseroot = case.get_value("CASEROOT")
din_loc_root = case.get_value("DIN_LOC_ROOT")
rundir = case.get_value("RUNDIR")
if case.get_value("TESTCASE") == "PFS":
env_mach_pes = os.path.join(caseroot,"env_mach_pes.xml")
safe_copy(env_mach_pes,"{}.{}".format(env_mach_pes, lid))
# check for locked files, may impact BUILD_COMPLETE
skip = None
if case.get_value("EXTERNAL_WORKFLOW"):
skip = "env_batch"
case.check_lockedfiles(skip=skip)
logger.debug("check_lockedfiles OK")
build_complete = case.get_value("BUILD_COMPLETE")
# check that build is done
expect(build_complete,
"BUILD_COMPLETE is not true\nPlease rebuild the model interactively")
logger.debug("build complete is {} ".format(build_complete))
# load the module environment...
case.load_env(reset=True)
# create the timing directories, optionally cleaning them if needed.
if os.path.isdir(os.path.join(rundir, "timing")):
shutil.rmtree(os.path.join(rundir, "timing"))
os.makedirs(os.path.join(rundir, "timing", "checkpoints"))
# This needs to be done everytime the LID changes in order for log files to be set up correctly
# The following also needs to be called in case a user changes a user_nl_xxx file OR an env_run.xml
# variable while the job is in the queue
model_log("e3sm", logger, "{} NAMELIST CREATION BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")))
if skip_pnl:
case.create_namelists(component='cpl')
else:
logger.info("Generating namelists for {}".format(caseroot))
case.create_namelists()
model_log("e3sm", logger, "{} NAMELIST CREATION HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")))
logger.info("-------------------------------------------------------------------------")
logger.info(" - Prestage required restarts into {}".format(rundir))
logger.info(" - Case input data directory (DIN_LOC_ROOT) is {} ".format(din_loc_root))
logger.info(" - Checking for required input datasets in DIN_LOC_ROOT")
logger.info("-------------------------------------------------------------------------")
###############################################################################
def _run_model_impl(case, lid, skip_pnl=False, da_cycle=0):
###############################################################################
    """Launch the model once, relaunching on recognised failures.

    Runs the pre-run checks, then executes the MPI run command from RUNDIR.
    If MPIRUN_RETRY_REGEX or NODE_FAIL_REGEX is configured, the model log of
    each attempt is scanned for those patterns to decide whether to relaunch:
      * node failures are retried while ``case.spare_nodes`` covers them;
      * other matches are retried while MPIRUN_RETRY_COUNT allows.
    A new LID (and fresh namelists) is generated for every relaunch.

    Returns the LID of the last attempt. Fails via ``expect`` when the run
    command failed and no relaunch is attempted, and via ``_post_run_check``
    when the logs do not show a completed run.
    """
    model_log("e3sm", logger, "{} PRE_RUN_CHECK BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")))
    _pre_run_check(case, lid, skip_pnl=skip_pnl, da_cycle=da_cycle)
    model_log("e3sm", logger, "{} PRE_RUN_CHECK HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")))

    model = case.get_value("MODEL")

    # Set OMP_NUM_THREADS
    os.environ["OMP_NUM_THREADS"] = str(case.thread_count)

    # Run the model
    cmd = case.get_mpirun_cmd(allow_unresolved_envvars=False)
    logger.info("run command is {} ".format(cmd))

    rundir = case.get_value("RUNDIR")
    loop = True

    # MPIRUN_RETRY_REGEX allows the mpi command to be reattempted if the
    # failure described by that regular expression is matched in the model log.
    # case.spare_nodes is overloaded and may also represent the number of
    # retries to attempt if ALLOCATE_SPARE_NODES is False.
    retry_run_re = case.get_value("MPIRUN_RETRY_REGEX")
    node_fail_re = case.get_value("NODE_FAIL_REGEX")
    retry_count = 0
    # NOTE(review): re.escape makes both settings literal-substring matches even
    # though they are named (and described above) as regular expressions. Kept
    # as-is because existing machine configurations may rely on literal matching.
    if retry_run_re:
        retry_run_regex = re.compile(re.escape(retry_run_re))
        retry_count = case.get_value("MPIRUN_RETRY_COUNT")
    if node_fail_re:
        node_fail_regex = re.compile(re.escape(node_fail_re))

    while loop:
        loop = False

        model_log("e3sm", logger, "{} SAVE_PRERUN_PROVENANCE BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")))
        save_prerun_provenance(case)
        model_log("e3sm", logger, "{} SAVE_PRERUN_PROVENANCE HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")))

        model_log("e3sm", logger, "{} MODEL EXECUTION BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")))
        run_func = lambda: run_cmd_no_fail(cmd, from_dir=rundir)
        case.flush()
        try:
            run_and_log_case_status(run_func, "model execution", caseroot=case.get_value("CASEROOT"))
            cmd_success = True
        except CIMEError:
            cmd_success = False

        # The run will potentially take a very long time. We need to
        # allow the user to xmlchange things in their case.
        #
        # WARNING: All case variables are reloaded after this call to get the
        # new values of any variables that may have been changed by
        # the user during model execution. Thus, any local variables
        # set from case variables before this point may be
        # inconsistent with their latest values in the xml files, so
        # should generally be reloaded (via case.get_value(XXX)) if they are still needed.
        case.read_xml()

        model_log("e3sm", logger, "{} MODEL EXECUTION HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")))

        model_logfile = os.path.join(rundir, model + ".log." + lid)

        # Determine if failure was due to a failed node; if so, try to restart.
        # NOTE(review): this inspection also runs when cmd_success is True, so a
        # successful run whose log matches a pattern is relaunched — confirm
        # that is intended (e.g. launchers that exit 0 on node failure).
        if (retry_run_re or node_fail_re) and os.path.exists(model_logfile):
            # Read the log once and close it (it was previously opened, and
            # never closed, once per configured pattern).
            with open(model_logfile, 'r') as fd:
                log_text = fd.read()
            num_node_fails = len(node_fail_regex.findall(log_text)) if node_fail_re else 0
            num_retry_fails = len(retry_run_regex.findall(log_text)) if retry_run_re else 0

            logger.debug("RETRY: num_retry_fails {} spare_nodes {} retry_count {}".
                         format(num_retry_fails, case.spare_nodes, retry_count))

            if num_node_fails > 0 and case.spare_nodes >= num_node_fails:
                # We failed due to node failure!
                logger.warning("Detected model run failed due to node failure, restarting")
                case.spare_nodes -= num_node_fails
                loop = True
                case.set_value("CONTINUE_RUN",
                               case.get_value("RESUBMIT_SETS_CONTINUE_RUN"))
            elif num_retry_fails > 0 and retry_count >= num_retry_fails:
                logger.warning("Detected model run failed, restarting")
                retry_count -= 1
                loop = True

            if loop:
                # Archive the last consistent set of restart files and restore them
                if case.get_value("DOUT_S"):
                    case.case_st_archive(resubmit=False)
                    case.restore_from_archive()

                lid = new_lid()
                case.create_namelists()

        if not cmd_success and not loop:
            # We failed and we're not restarting
            expect(False, "RUN FAIL: Command '{}' failed\nSee log file for details: {}".format(cmd, model_logfile))

    model_log("e3sm", logger, "{} POST_RUN_CHECK BEGINS HERE".format(time.strftime("%Y-%m-%d %H:%M:%S")))
    _post_run_check(case, lid)
    model_log("e3sm", logger, "{} POST_RUN_CHECK HAS FINISHED".format(time.strftime("%Y-%m-%d %H:%M:%S")))

    return lid
###############################################################################
def _run_model(case, lid, skip_pnl=False, da_cycle=0):
###############################################################################
    """Run ``_run_model_impl`` under case-status logging as "case.run".

    Returns whatever ``run_and_log_case_status`` returns for the wrapped call
    (the LID produced by ``_run_model_impl``).
    """
    def _do_run():
        return _run_model_impl(case, lid, skip_pnl=skip_pnl, da_cycle=da_cycle)

    caseroot = case.get_value("CASEROOT")
    return run_and_log_case_status(_do_run, "case.run", caseroot=caseroot)
###############################################################################
def _post_run_check(case, lid):
###############################################################################
    """Verify that the run identified by ``lid`` completed.

    Requires ``RUNDIR/<model>.log.<lid>`` to exist and be non-empty. It then
    requires every coupler/mediator log to exist and contain the completion
    marker. With MULTI_DRIVER and more than one instance there is one log per
    instance, ``<prefix>_NNNN.log.<lid>``.

    When no CPL component class is present (standalone run), the model's own
    log prefix is used under nuopc and the marker is 'HAS ENDED'. Otherwise
    the marker is 'SUCCESSFUL TERMINATION'.

    Fails via ``expect``, naming the first log that is missing or incomplete.
    """
    rundir = case.get_value("RUNDIR")
    driver = case.get_value("COMP_INTERFACE")
    # Log names are keyed on get_model(); the MODEL xml value that used to be
    # read here was immediately overwritten, so that dead read is dropped.
    model = get_model()

    fv3_standalone = "CPL" not in case.get_values("COMP_CLASSES")

    if driver == 'nuopc':
        file_prefix = model if fv3_standalone else 'med'
    else:
        file_prefix = 'cpl'
    success_marker = 'HAS ENDED' if fv3_standalone else 'SUCCESSFUL TERMINATION'

    cpl_ninst = 1
    if case.get_value("MULTI_DRIVER"):
        cpl_ninst = case.get_value("NINST_MAX")

    if cpl_ninst > 1:
        cpl_logs = [os.path.join(rundir, file_prefix + "_%04d.log." % (inst + 1) + lid)
                    for inst in range(cpl_ninst)]
    else:
        cpl_logs = [os.path.join(rundir, file_prefix + ".log." + lid)]

    # find the last model.log and cpl.log
    model_logfile = os.path.join(rundir, model + ".log." + lid)
    if not os.path.isfile(model_logfile):
        expect(False, "Model did not complete, no {} log file ".format(model_logfile))
    elif os.stat(model_logfile).st_size == 0:
        expect(False, "Run FAILED")
    else:
        for cpl_logfile in cpl_logs:
            completed = False
            if os.path.isfile(cpl_logfile):
                with open(cpl_logfile, 'r') as fd:
                    completed = success_marker in fd.read()
            if not completed:
                # Name the log that actually failed. Previously the message
                # named whichever log happened to be examined last.
                expect(False, "Model did not complete - see {} \n ".format(cpl_logfile))
###############################################################################
def _save_logs(case, lid):
###############################################################################
rundir = case.get_value("RUNDIR")
logfiles = glob.glob(os.path.join(rundir, "*.log.{}".format(lid)))
for logfile in logfiles:
if os.path.isfile(logfile):
gzip_existing_file(logfile)
######################################################################################
def _resubmit_check(case):
###############################################################################
    """
    Check whether this job should resubmit the case and, if so, submit it.

    When short-term archiving (DOUT_S) is off and RESUBMIT > 0, the case's
    primary job is resubmitted from here. When DOUT_S is on, short-term
    archiving handles the resubmit. Mira requires special logic: there, the
    st_archive job is submitted explicitly over ssh to cooleylogin1.
    """
    # Diagnostic values: logged at DEBUG so they do not show up as warnings
    # on every run.
    dout_s = case.get_value("DOUT_S")
    logger.debug("dout_s {} ".format(dout_s))
    mach = case.get_value("MACH")
    logger.debug("mach {} ".format(mach))
    resubmit_num = case.get_value("RESUBMIT")
    logger.debug("resubmit_num {}".format(resubmit_num))

    # If dout_s is True then short-term archiving handles the resubmit.
    # If dout_s is True and the machine is mira, submit the st_archive script.
    resubmit = False
    if not dout_s and resubmit_num > 0:
        resubmit = True
    elif dout_s and mach == 'mira':
        caseroot = case.get_value("CASEROOT")
        cimeroot = case.get_value("CIMEROOT")
        cmd = "ssh cooleylogin1 'cd {case}; CIMEROOT={root} ./case.submit {case} --job case.st_archive'".format(case=caseroot, root=cimeroot)
        run_cmd(cmd, verbose=True)

    if resubmit:
        job = case.get_primary_job()
        case.submit(job=job, resubmit=True)

    logger.debug("resubmit after check is {}".format(resubmit))
###############################################################################
def _do_external(script_name, caseroot, rundir, lid, prefix):
###############################################################################
    """Run a user-supplied external script (e.g. PRERUN_/POSTRUN_SCRIPT).

    Fails via ``expect`` if ``script_name`` is not an existing file. Output is
    sent to ``RUNDIR/<prefix>.external.log.<lid>``. Start and completion are
    recorded with ``append_status`` in "CaseStatus".
    NOTE(review): "CaseStatus" is passed as a bare file name, presumably
    resolved relative to the current directory — confirm against append_status.
    """
    expect(os.path.isfile(script_name), "External script {} not found".format(script_name))
    outfile = os.path.join(rundir, "{}.external.log.{}".format(prefix, lid))

    append_status("Starting script {}".format(script_name), "CaseStatus")
    # The third argument is the script's base name up to its first '.',
    # presumably the sub name run_sub_or_cmd looks for — confirm.
    # TODO: for a sub, should the case object be passed instead of caseroot?
    sub_name = os.path.basename(script_name).partition('.')[0]
    run_sub_or_cmd(script_name, [caseroot], sub_name, [caseroot], logfile=outfile)
    append_status("Completed script {}".format(script_name), "CaseStatus")
###############################################################################
def _do_data_assimilation(da_script, caseroot, cycle, lid, rundir):
###############################################################################
    """Run the data-assimilation script for one DA cycle.

    Fails via ``expect`` if ``da_script`` is not an existing file. The script
    receives ``[caseroot, cycle]``, and its output goes to
    ``RUNDIR/da.log.<lid>``.
    """
    expect(os.path.isfile(da_script), "Data Assimilation script {} not found".format(da_script))
    da_log = os.path.join(rundir, "da.log.{}".format(lid))
    # NOTE(review): unlike _do_external, the full base name (extension included)
    # is passed as the sub name — confirm this is intended.
    # TODO: for a sub, should the case object be passed instead of caseroot?
    run_sub_or_cmd(da_script, [caseroot, cycle],
                   os.path.basename(da_script), [caseroot, cycle],
                   logfile=da_log)
###############################################################################
def case_run(self, skip_pnl=False, set_continue_run=False, submit_resubmits=False):
###############################################################################
    """Run the case: optional pre-run script, model run(s), post-run steps.

    The model is run once per DATA_ASSIMILATION_CYCLES. Cycles after the
    first get a new LID and CONTINUE_RUN set from RESUBMIT_SETS_CONTINUE_RUN.
    After each cycle, timing is collected if CHECK_TIMING or SAVE_TIMING is
    set, and the DA script runs if configured. The cycle's logs are then
    gzipped and post-run provenance is saved.

    Parameters
    ----------
    skip_pnl : bool
        Forwarded to the model run; when True only the coupler ('cpl')
        namelist is regenerated for the first cycle.
    set_continue_run : bool
        If True, set CONTINUE_RUN from RESUBMIT_SETS_CONTINUE_RUN at the end.
    submit_resubmits : bool
        If True and EXTERNAL_WORKFLOW is not set, check for (and perform) a
        resubmission.

    Returns True once all steps have run; failures are reported by the
    called helpers (via ``expect``).
    """
    def _mark(event):
        # Timestamped progress marker in the model log.
        model_log("e3sm", logger, "{} {}".format(time.strftime("%Y-%m-%d %H:%M:%S"), event))

    _mark("CASE.RUN BEGINS HERE")
    # Set up the run, run the model, do the postrun steps

    # set up the LID
    lid = new_lid()

    prerun_script = self.get_value("PRERUN_SCRIPT")
    if prerun_script:
        _mark("PRERUN_SCRIPT BEGINS HERE")
        self.flush()
        _do_external(prerun_script, self.get_value("CASEROOT"), self.get_value("RUNDIR"),
                     lid, prefix="prerun")
        self.read_xml()
        _mark("PRERUN_SCRIPT HAS FINISHED")

    # We might need to tweak these if we want to allow the user to change them
    data_assimilation_cycles = self.get_value("DATA_ASSIMILATION_CYCLES")
    data_assimilation_script = self.get_value("DATA_ASSIMILATION_SCRIPT")
    # Truthiness instead of len(): an unset (None) script simply disables DA
    # rather than raising TypeError.
    data_assimilation = (data_assimilation_cycles > 0 and
                         bool(data_assimilation_script) and
                         os.path.isfile(data_assimilation_script))

    for cycle in range(data_assimilation_cycles):
        # After the first DA cycle, runs are restart runs
        if cycle > 0:
            lid = new_lid()
            self.set_value("CONTINUE_RUN",
                           self.get_value("RESUBMIT_SETS_CONTINUE_RUN"))

        # WARNING: All case variables are reloaded during run_model to get
        # new values of any variables that may have been changed by
        # the user during model execution. Thus, any local variables
        # set from case variables before this point may be
        # inconsistent with their latest values in the xml files, so
        # should generally be reloaded (via case.get_value(XXX)) if they are still needed.
        _mark("RUN_MODEL BEGINS HERE")
        lid = _run_model(self, lid, skip_pnl, da_cycle=cycle)
        _mark("RUN_MODEL HAS FINISHED")

        if self.get_value("CHECK_TIMING") or self.get_value("SAVE_TIMING"):
            _mark("GET_TIMING BEGINS HERE")
            get_timing(self, lid)     # Run the getTiming script
            _mark("GET_TIMING HAS FINISHED")

        if data_assimilation:
            _mark("DO_DATA_ASSIMILATION BEGINS HERE")
            self.flush()
            _do_data_assimilation(data_assimilation_script, self.get_value("CASEROOT"), cycle, lid,
                                  self.get_value("RUNDIR"))
            self.read_xml()
            _mark("DO_DATA_ASSIMILATION HAS FINISHED")

        _save_logs(self, lid)       # Copy log files back to caseroot

        _mark("SAVE_POSTRUN_PROVENANCE BEGINS HERE")
        save_postrun_provenance(self)
        _mark("SAVE_POSTRUN_PROVENANCE HAS FINISHED")

    postrun_script = self.get_value("POSTRUN_SCRIPT")
    if postrun_script:
        _mark("POSTRUN_SCRIPT BEGINS HERE")
        self.flush()
        _do_external(postrun_script, self.get_value("CASEROOT"), self.get_value("RUNDIR"),
                     lid, prefix="postrun")
        self.read_xml()
        _save_logs(self, lid)
        _mark("POSTRUN_SCRIPT HAS FINISHED")

    if set_continue_run:
        self.set_value("CONTINUE_RUN",
                       self.get_value("RESUBMIT_SETS_CONTINUE_RUN"))

    external_workflow = self.get_value("EXTERNAL_WORKFLOW")
    if not external_workflow:
        # Progress trace, not a warning condition.
        logger.debug("check for resubmit")

        logger.debug("submit_resubmits is {}".format(submit_resubmits))
        if submit_resubmits:
            _resubmit_check(self)

    _mark("CASE.RUN HAS FINISHED")
    return True