#!/usr/bin/env python3
"""
case.submit - Submit a cesm workflow to the queueing system or run it
if there is no queueing system. A cesm workflow may include multiple
jobs.
submit, check_case and check_DA_settings are members of class Case in file case.py
"""
import configparser
from CIME.XML.standard_module_setup import *
from CIME.utils import expect, CIMEError, get_time_in_seconds
from CIME.status import run_and_log_case_status
from CIME.locked_files import (
unlock_file,
lock_file,
check_lockedfile,
check_lockedfiles,
)
from CIME.test_status import *
logger = logging.getLogger(__name__)
def _build_prereq_str(case, prev_job_ids):
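    """Join the ids of previously submitted jobs with the batch system's
    dependency separator to build a dependency string for the next job,
    e.g. {"case.run": 123, "case.st_archive": 124} with separator ":"
    yields "123:124"."""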
    delimiter = case.get_value("depend_separator")
    return delimiter.join(str(job_id) for job_id in prev_job_ids.values())
def _submit(
case,
job=None,
no_batch=False,
prereq=None,
allow_fail=False,
resubmit=False,
resubmit_immediate=False,
skip_pnl=False,
mail_user=None,
mail_type=None,
batch_args=None,
workflow=True,
chksum=False,
dryrun=False,
):
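    """Submit the case's workflow (internal helper behind Case.submit).

    Verifies that restart files are staged when CONTINUE_RUN is true,
    regenerates batch scripts if env_batch.xml has changed, decrements
    RESUBMIT on a resubmission, then calls case.submit_jobs and records the
    resulting job ids. Returns a comma-separated string of "jobname:jobid"
    pairs.
    """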
if job is None:
job = case.get_first_job()
caseroot = case.get_value("CASEROOT")
    # Check whether the case includes a mediator (coupler) component
    has_mediator = "CPL" in case.get_values("COMP_CLASSES")
    # Check that the CONTINUE_RUN value makes sense; skip the check when
    # submitting with a prereq, since the prerequisite job may not yet have
    # produced the restart files this check looks for
    if case.get_value("CONTINUE_RUN") and has_mediator and not prereq:
rundir = case.get_value("RUNDIR")
expect(
os.path.isdir(rundir),
"CONTINUE_RUN is true but RUNDIR {} does not exist".format(rundir),
)
# only checks for the first instance in a multidriver case
if case.get_value("COMP_INTERFACE") == "nuopc":
rpointer = "rpointer.cpl"
else:
rpointer = "rpointer.drv"
# Variable MULTI_DRIVER is always true for nuopc so we need to also check NINST > 1
if case.get_value("MULTI_DRIVER") and case.get_value("NINST") > 1:
rpointer = rpointer + "_0001"
expect(
os.path.exists(os.path.join(rundir, rpointer)),
"CONTINUE_RUN is true but this case does not appear to have restart files staged in {} {}".format(
rundir, rpointer
),
)
        # Finally, open the rpointer file and check that its first line names
        # a restart file that exists and belongs to this case
casename = case.get_value("CASE")
with open(os.path.join(rundir, rpointer), "r") as fd:
ncfile = fd.readline().strip()
expect(
ncfile.startswith(casename)
and os.path.exists(os.path.join(rundir, ncfile)),
"File {ncfile} not present or does not match case {casename}".format(
ncfile=os.path.join(rundir, ncfile), casename=casename
),
)
# if case.submit is called with the no_batch flag then we assume that this
# flag will stay in effect for the duration of the RESUBMITs
env_batch = case.get_env("batch")
external_workflow = case.get_value("EXTERNAL_WORKFLOW")
    if env_batch.get_batch_system_type() == "none" or (
        resubmit and external_workflow
    ):
no_batch = True
if no_batch:
batch_system = "none"
else:
batch_system = env_batch.get_batch_system_type()
if batch_system != case.get_value("BATCH_SYSTEM"):
unlock_file(os.path.basename(env_batch.filename), caseroot)
case.set_value("BATCH_SYSTEM", batch_system)
env_batch_has_changed = False
if not external_workflow:
try:
check_lockedfile(case, os.path.basename(env_batch.filename))
        except CIMEError:
env_batch_has_changed = True
if batch_system != "none" and env_batch_has_changed and not external_workflow:
# May need to regen batch files if user made batch setting changes (e.g. walltime, queue, etc)
logger.warning(
"""
env_batch.xml appears to have changed, regenerating batch scripts
manual edits to these files will be lost!
"""
)
env_batch.make_all_batch_files(case)
case.flush()
lock_file(os.path.basename(env_batch.filename), caseroot)
if resubmit:
# This is a resubmission, do not reinitialize test values
if job == "case.test":
case.set_value("IS_FIRST_RUN", False)
resub = case.get_value("RESUBMIT")
logger.info("Submitting job '{}', resubmit={:d}".format(job, resub))
case.set_value("RESUBMIT", resub - 1)
if case.get_value("RESUBMIT_SETS_CONTINUE_RUN"):
case.set_value("CONTINUE_RUN", True)
else:
if job == "case.test":
case.set_value("IS_FIRST_RUN", True)
if no_batch:
batch_system = "none"
else:
batch_system = env_batch.get_batch_system_type()
case.set_value("BATCH_SYSTEM", batch_system)
env_batch_has_changed = False
try:
check_lockedfile(case, os.path.basename(env_batch.filename))
except CIMEError:
env_batch_has_changed = True
if env_batch.get_batch_system_type() != "none" and env_batch_has_changed:
# May need to regen batch files if user made batch setting changes (e.g. walltime, queue, etc)
logger.warning(
"""
env_batch.xml appears to have changed, regenerating batch scripts
manual edits to these files will be lost!
"""
)
env_batch.make_all_batch_files(case)
unlock_file(os.path.basename(env_batch.filename), caseroot)
lock_file(os.path.basename(env_batch.filename), caseroot)
case.check_case(skip_pnl=skip_pnl, chksum=chksum)
if job == case.get_primary_job():
case.check_DA_settings()
# Load Modules
case.load_env()
case.flush()
logger.warning("submit_jobs {}".format(job))
job_ids = case.submit_jobs(
no_batch=no_batch,
job=job,
prereq=prereq,
skip_pnl=skip_pnl,
resubmit_immediate=resubmit_immediate,
allow_fail=allow_fail,
mail_user=mail_user,
mail_type=mail_type,
batch_args=batch_args,
workflow=workflow,
dry_run=dryrun,
)
xml_jobids = []
if dryrun:
        # loop variable renamed so it does not shadow the 'job' argument
        for dry_job in job_ids:
            xml_jobids.append("{}:{}".format(dry_job[0], dry_job[1]))
else:
for jobname, jobid in job_ids.items():
logger.info("Submitted job {} with id {}".format(jobname, jobid))
if jobid:
xml_jobids.append("{}:{}".format(jobname, jobid))
xml_jobid_text = ", ".join(xml_jobids)
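    # xml_jobid_text is e.g. "case.run:12345, case.st_archive:12346"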
if xml_jobid_text and not dryrun:
case.set_value("JOB_IDS", xml_jobid_text)
return xml_jobid_text
def submit(
self,
job=None,
no_batch=False,
prereq=None,
allow_fail=False,
resubmit=False,
resubmit_immediate=False,
skip_pnl=False,
mail_user=None,
mail_type=None,
batch_args=None,
workflow=True,
chksum=False,
dryrun=False,
):
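    """Submit the case to the batch queueing system, or run it directly if
    there is no batch system; wraps _submit with status logging and test
    bookkeeping.

    On a resubmit, any options saved in the hidden .submit_options file at
    original submit time (skip_pnl, mail_user, mail_type, batch_args) are
    reapplied unless overridden here.
    """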
if resubmit_immediate and self.get_value("MACH") in ["mira", "cetus"]:
logger.warning(
"resubmit_immediate does not work on Mira/Cetus, submitting normally"
)
resubmit_immediate = False
caseroot = self.get_value("CASEROOT")
if self.get_value("TEST"):
casebaseid = self.get_value("CASEBASEID")
if os.path.exists(os.path.join(caseroot, "env_test.xml")):
self.set_initial_test_values()
# This should take care of the race condition where the submitted job
# begins immediately and tries to set RUN phase. We proactively assume
# a passed SUBMIT phase. If this state is already PASS, don't set it again
# because then we'll lose RUN phase info if it's there. This info is important
# for system_tests_common to know if it needs to reinitialize the test or not.
with TestStatus(test_dir=caseroot, test_name=casebaseid) as ts:
phase_status = ts.get_status(SUBMIT_PHASE)
if phase_status != TEST_PASS_STATUS:
ts.set_status(SUBMIT_PHASE, TEST_PASS_STATUS)
# If this is a resubmit check the hidden file .submit_options for
# any submit options used on the original submit and use them again
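    # The file uses standard INI format, e.g.:
    #   [SubmitOptions]
    #   skip_pnl = True
    #   mail_user = someone@example.com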
submit_options = os.path.join(caseroot, ".submit_options")
if resubmit and os.path.exists(submit_options):
config = configparser.RawConfigParser()
config.read(submit_options)
if not skip_pnl and config.has_option("SubmitOptions", "skip_pnl"):
skip_pnl = config.getboolean("SubmitOptions", "skip_pnl")
if mail_user is None and config.has_option("SubmitOptions", "mail_user"):
mail_user = config.get("SubmitOptions", "mail_user")
if mail_type is None and config.has_option("SubmitOptions", "mail_type"):
mail_type = str(config.get("SubmitOptions", "mail_type")).split(",")
if batch_args is None and config.has_option("SubmitOptions", "batch_args"):
batch_args = config.get("SubmitOptions", "batch_args")
is_batch = self.get_value("BATCH_SYSTEM") is not None
try:
functor = lambda: _submit(
self,
job=job,
no_batch=no_batch,
prereq=prereq,
allow_fail=allow_fail,
resubmit=resubmit,
resubmit_immediate=resubmit_immediate,
skip_pnl=skip_pnl,
mail_user=mail_user,
mail_type=mail_type,
batch_args=batch_args,
workflow=workflow,
chksum=chksum,
dryrun=dryrun,
)
run_and_log_case_status(
functor,
"case.submit",
caseroot=caseroot,
custom_success_msg_functor=lambda x: x.split(":")[-1],
is_batch=is_batch,
gitinterface=self._gitinterface,
)
except BaseException: # Want to catch KeyboardInterrupt too
# If something failed in the batch system, make sure to mark
# the test as failed if we are running a test.
if self.get_value("TEST"):
with TestStatus(test_dir=caseroot, test_name=casebaseid) as ts:
ts.set_status(SUBMIT_PHASE, TEST_FAIL_STATUS)
raise
def check_case(self, skip_pnl=False, chksum=False):
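    """Perform pre-submission checks: locked files, namelist generation
    (unless skip_pnl is set), input data availability, run-length consistency
    for nuopc cases, and BUILD_COMPLETE."""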
check_lockedfiles(self)
if not skip_pnl:
self.create_namelists() # Must be called before check_all_input_data
logger.info("Checking that inputdata is available as part of case submission")
self.check_all_input_data(chksum=chksum)
if self.get_value("COMP_WAV") == "ww":
# the ww3 buildnml has dependencies on inputdata so we must run it again
self.create_namelists(component="WAV")
if self.get_value("COMP_INTERFACE") == "nuopc":
        #
        # Check that the run length is a multiple of the longest component
        # coupling interval. The longest interval corresponds to the smallest
        # NCPL value. Models using the nuopc interface fail at initialization
        # unless NCPL follows these rules; other models only fail later, so for
        # them this check is skipped and short tests can run without adjusting
        # NCPL.
        #
        maxncpl = 10000  # tracks the smallest NCPL seen (longest coupling interval)
        minncpl = 0  # tracks the largest NCPL seen (shortest timestep)
        maxcomp = None
for comp in self.get_values("COMP_CLASSES"):
if comp == "CPL":
continue
compname = self.get_value("COMP_{}".format(comp))
# ignore stub components in this test.
if compname == "s{}".format(comp.lower()):
ncpl = None
else:
ncpl = self.get_value("{}_NCPL".format(comp))
if ncpl and maxncpl > ncpl:
maxncpl = ncpl
maxcomp = comp
if ncpl and minncpl < ncpl:
minncpl = ncpl
ncpl_base_period = self.get_value("NCPL_BASE_PERIOD")
if ncpl_base_period == "hour":
coupling_secs = 3600 / maxncpl
timestep = 3600 / minncpl
elif ncpl_base_period == "day":
coupling_secs = 86400 / maxncpl
timestep = 86400 / minncpl
elif ncpl_base_period == "year":
coupling_secs = 31536000 / maxncpl
timestep = 31536000 / minncpl
elif ncpl_base_period == "decade":
coupling_secs = 315360000 / maxncpl
timestep = 315360000 / minncpl
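        # Example: ATM_NCPL=48 with NCPL_BASE_PERIOD="day" gives a coupling
        # interval of 86400 / 48 = 1800 seconds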
stop_option = self.get_value("STOP_OPTION")
stop_n = self.get_value("STOP_N")
if stop_option == "nsteps":
stop_option = "seconds"
stop_n = stop_n * timestep
runtime = get_time_in_seconds(stop_n, stop_option)
        expect(
            runtime >= coupling_secs and runtime % coupling_secs == 0,
            "Runtime ({0} s) must be a multiple of the longest coupling interval {1}_NCPL ({2} s). Adjust runtime or {1}_NCPL".format(
                runtime, maxcomp, coupling_secs
            ),
        )
    expect(
        self.get_value("BUILD_COMPLETE"),
        "BUILD_COMPLETE is not true; please rebuild the model by calling case.build",
    )
logger.info("Check case OK")
def check_DA_settings(self):
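    """Log the data assimilation setup when DATA_ASSIMILATION_SCRIPT points to
    an existing script and DATA_ASSIMILATION_CYCLES is positive."""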
script = self.get_value("DATA_ASSIMILATION_SCRIPT")
cycles = self.get_value("DATA_ASSIMILATION_CYCLES")
    if script and os.path.isfile(script) and cycles > 0:
logger.info(
"Data Assimilation enabled using script {} with {:d} cycles".format(
script, cycles
)
)