"""
Base class for CIME system tests
"""
from CIME.XML.standard_module_setup import *
from CIME.XML.env_run import EnvRun
from CIME.utils import append_testlog, get_model, safe_copy, get_timestamp, CIMEError
from CIME.test_status import *
from CIME.hist_utils import copy_histfiles, compare_test, generate_teststatus, \
compare_baseline, get_ts_synopsis, generate_baseline
from CIME.provenance import save_test_time, get_test_success
from CIME.locked_files import LOCKED_DIR, lock_file, is_locked
import CIME.build as build
import glob, gzip, time, traceback, six, shutil
logger = logging.getLogger(__name__)
class SystemTestsCommon(object):
def __init__(self, case, expected=None):
"""
initialize a CIME system test object, if the locked env_run.orig.xml
does not exist copy the current env_run.xml file. If it does exist restore values
changed in a previous run of the test.
"""
self._case = case
caseroot = case.get_value("CASEROOT")
self._caseroot = caseroot
self._orig_caseroot = caseroot
self._runstatus = None
self._casebaseid = self._case.get_value("CASEBASEID")
self._test_status = TestStatus(test_dir=caseroot, test_name=self._casebaseid)
self._init_environment(caseroot)
self._init_locked_files(caseroot, expected)
self._skip_pnl = False
self._cpllog = "med" if self._case.get_value("COMP_INTERFACE")=="nuopc" else "cpl"
self._old_build = False
self._ninja = False
self._dry_run = False
def _init_environment(self, caseroot):
"""
Do initializations of environment variables that are needed in __init__
"""
# Needed for sh scripts
os.environ["CASEROOT"] = caseroot
def _init_locked_files(self, caseroot, expected):
"""
If the locked env_run.orig.xml does not exist, copy the current
env_run.xml file. If it does exist, restore values changed in a previous
run of the test.
"""
if is_locked("env_run.orig.xml"):
self.compare_env_run(expected=expected)
elif os.path.isfile(os.path.join(caseroot, "env_run.xml")):
lock_file("env_run.xml", caseroot=caseroot, newname="env_run.orig.xml")
def _resetup_case(self, phase, reset=False):
"""
Re-setup this case. This is necessary if user is re-running an already-run
phase.
"""
# We never want to re-setup if we're doing the resubmitted run
phase_status = self._test_status.get_status(phase)
if reset or (self._case.get_value("IS_FIRST_RUN") and phase_status != TEST_PEND_STATUS):
logging.warning("Resetting case due to detected re-run of phase {}".format(phase))
self._case.set_initial_test_values()
self._case.case_setup(reset=True, test_mode=True)
    def build(self, sharedlib_only=False, model_only=False, old_build=False, ninja=False, dry_run=False):
        """
        Do NOT override this method; it is the framework that controls the
        build phase. build_phase is the extension point that subclasses
        should use.
        """
success = True
self._old_build = old_build
self._ninja = ninja
self._dry_run = dry_run
for phase_name, phase_bool in [(SHAREDLIB_BUILD_PHASE, not model_only),
(MODEL_BUILD_PHASE, not sharedlib_only)]:
if phase_bool:
self._resetup_case(phase_name)
with self._test_status:
self._test_status.set_status(phase_name, TEST_PEND_STATUS)
start_time = time.time()
try:
self.build_phase(sharedlib_only=(phase_name==SHAREDLIB_BUILD_PHASE),
model_only=(phase_name==MODEL_BUILD_PHASE))
except BaseException as e: # We want KeyboardInterrupts to generate FAIL status
success = False
if isinstance(e, CIMEError):
# Don't want to print stacktrace for a build failure since that
# is not a CIME/infrastructure problem.
excmsg = str(e)
else:
excmsg = "Exception during build:\n{}\n{}".format(str(e), traceback.format_exc())
append_testlog(excmsg, self._orig_caseroot)
raise
finally:
time_taken = time.time() - start_time
with self._test_status:
self._test_status.set_status(phase_name, TEST_PASS_STATUS if success else TEST_FAIL_STATUS, comments=("time={:d}".format(int(time_taken))))
return success
    def build_phase(self, sharedlib_only=False, model_only=False):
        """
        This is the default build phase implementation; it just does an individual build.
        This is the extension point for subclasses that need to define a custom build
        phase.
        PLEASE THROW AN EXCEPTION ON FAIL
        """
self.build_indv(sharedlib_only=sharedlib_only, model_only=model_only)
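    # A minimal sketch (hypothetical subclass, for illustration only) of the
    # intended extension pattern: subclasses override build_phase, never
    # build itself.
    #
    # class _EXAMPLE_BUILD(SystemTestsCommon):
    #     def build_phase(self, sharedlib_only=False, model_only=False):
    #         # custom pre-build configuration would go here
    #         self.build_indv(sharedlib_only=sharedlib_only, model_only=model_only)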
    def build_indv(self, sharedlib_only=False, model_only=False):
"""
Perform an individual build
"""
        model = self._case.get_value("MODEL")
        build.case_build(self._caseroot, case=self._case,
                         sharedlib_only=sharedlib_only, model_only=model_only,
                         save_build_provenance=(model != "cesm"),
                         use_old=self._old_build, ninja=self._ninja, dry_run=self._dry_run)
    def clean_build(self, comps=None):
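        """
        Clean the build for the given component classes (by default, all of
        this case's COMP_CLASSES).
        """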
if comps is None:
comps = [x.lower() for x in self._case.get_values("COMP_CLASSES")]
build.clean(self._case, cleanlist=comps)
    def run(self, skip_pnl=False):
        """
        Do NOT override this method; it is the framework that controls the
        run phase. run_phase is the extension point that subclasses should use.
        """
success = True
start_time = time.time()
self._skip_pnl = skip_pnl
try:
self._resetup_case(RUN_PHASE)
do_baseline_ops = True
with self._test_status:
self._test_status.set_status(RUN_PHASE, TEST_PEND_STATUS)
# We do not want to do multiple repetitions of baseline operations for
# multi-submit tests. We just want to do them upon the final submission.
# Other submissions will need to mark those phases as PEND to ensure wait_for_tests
# waits for them.
if self._case.get_value("BATCH_SYSTEM") != "none":
do_baseline_ops = self._case.get_value("RESUBMIT") == 0
self.run_phase()
if self._case.get_value("GENERATE_BASELINE"):
if do_baseline_ops:
self._phase_modifying_call(GENERATE_PHASE, self._generate_baseline)
else:
with self._test_status:
self._test_status.set_status(GENERATE_PHASE, TEST_PEND_STATUS)
if self._case.get_value("COMPARE_BASELINE"):
if do_baseline_ops:
self._phase_modifying_call(BASELINE_PHASE, self._compare_baseline)
self._phase_modifying_call(MEMCOMP_PHASE, self._compare_memory)
self._phase_modifying_call(THROUGHPUT_PHASE, self._compare_throughput)
else:
with self._test_status:
self._test_status.set_status(BASELINE_PHASE, TEST_PEND_STATUS)
self._test_status.set_status(MEMCOMP_PHASE, TEST_PEND_STATUS)
self._test_status.set_status(THROUGHPUT_PHASE, TEST_PEND_STATUS)
self._phase_modifying_call(MEMLEAK_PHASE, self._check_for_memleak)
self._phase_modifying_call(STARCHIVE_PHASE, self._st_archive_case_test)
except BaseException as e: # We want KeyboardInterrupts to generate FAIL status
success = False
if isinstance(e, CIMEError):
# Don't want to print stacktrace for a model failure since that
# is not a CIME/infrastructure problem.
excmsg = str(e)
else:
excmsg = "Exception during run:\n{}\n{}".format(str(e), traceback.format_exc())
append_testlog(excmsg, self._orig_caseroot)
raise
finally:
# Writing the run status should be the very last thing due to wait_for_tests
time_taken = time.time() - start_time
status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
with self._test_status:
self._test_status.set_status(RUN_PHASE, status, comments=("time={:d}".format(int(time_taken))))
if get_model() == "e3sm":
                # If the run phase worked, remember the time it took in order to improve later walltime estimates
baseline_root = self._case.get_value("BASELINE_ROOT")
if success:
save_test_time(baseline_root, self._casebaseid, time_taken)
# If overall things did not pass, offer the user some insight into what might have broken things
overall_status = self._test_status.get_overall_test_status(ignore_namelists=True)
if overall_status != TEST_PASS_STATUS:
srcroot = self._case.get_value("SRCROOT")
worked_before, last_pass, last_fail_transition = \
get_test_success(baseline_root, srcroot, self._casebaseid)
if worked_before:
if last_pass is not None:
# commits between last_pass and now broke things
stat, out, err = run_cmd("git rev-list --first-parent {}..{}".format(last_pass, "HEAD"), from_dir=srcroot)
if stat == 0:
append_testlog("NEW FAIL: Potentially broken merges:\n{}".format(out), self._orig_caseroot)
else:
logger.warning("Unable to list potentially broken merges: {}\n{}".format(out, err))
else:
if last_pass is not None and last_fail_transition is not None:
# commits between last_pass and last_fail_transition broke things
stat, out, err = run_cmd("git rev-list --first-parent {}..{}".format(last_pass, last_fail_transition), from_dir=srcroot)
if stat == 0:
append_testlog("OLD FAIL: Potentially broken merges:\n{}".format(out), self._orig_caseroot)
else:
logger.warning("Unable to list potentially broken merges: {}\n{}".format(out, err))
if get_model() == "cesm" and self._case.get_value("GENERATE_BASELINE"):
baseline_dir = os.path.join(self._case.get_value("BASELINE_ROOT"), self._case.get_value("BASEGEN_CASE"))
generate_teststatus(self._caseroot, baseline_dir)
        # We return success if the run phase worked; memleak and diff results
        # are NOT reflected in this return value.
return success
    def run_phase(self):
        """
        This is the default run phase implementation; it just does an individual run.
        This is the extension point for subclasses that need to define a custom run phase.
        PLEASE THROW AN EXCEPTION ON FAIL
        """
self.run_indv()
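    # A minimal sketch (hypothetical subclass, for illustration only) of a
    # custom run phase built from run_indv and _component_compare_test;
    # restart-style tests follow this same pattern.
    #
    # class _EXAMPLE_RUN(SystemTestsCommon):
    #     def run_phase(self):
    #         self.run_indv(suffix="base")
    #         self.run_indv(suffix="rest")
    #         self._component_compare_test("base", "rest")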
def _get_caseroot(self):
"""
Returns the current CASEROOT value
"""
return self._caseroot
def _set_active_case(self, case):
"""
Use for tests that have multiple cases
"""
self._case = case
self._case.load_env(reset=True)
self._caseroot = case.get_value("CASEROOT")
    def run_indv(self, suffix="base", st_archive=False):
"""
Perform an individual run. Raises an EXCEPTION on fail.
"""
stop_n = self._case.get_value("STOP_N")
stop_option = self._case.get_value("STOP_OPTION")
run_type = self._case.get_value("RUN_TYPE")
rundir = self._case.get_value("RUNDIR")
is_batch = self._case.get_value("BATCH_SYSTEM") != "none"
# remove any cprnc output leftover from previous runs
for compout in glob.iglob(os.path.join(rundir,"*.cprnc.out")):
os.remove(compout)
infostr = "doing an {:d} {} {} test".format(stop_n, stop_option, run_type)
rest_option = self._case.get_value("REST_OPTION")
if rest_option == "none" or rest_option == "never":
infostr += ", no restarts written"
else:
rest_n = self._case.get_value("REST_N")
infostr += ", with restarts every {:d} {}".format(rest_n, rest_option)
logger.info(infostr)
self._case.case_run(skip_pnl=self._skip_pnl, submit_resubmits=is_batch)
        expect(self._coupler_log_indicates_run_complete(),
               "Coupler did not indicate run passed")
if suffix is not None:
self._component_compare_copy(suffix)
if st_archive:
self._case.case_st_archive(resubmit=True)
def _coupler_log_indicates_run_complete(self):
newestcpllogfiles = self._get_latest_cpl_logs()
logger.debug("Latest Coupler log file(s) {}" .format(newestcpllogfiles))
# Exception is raised if the file is not compressed
allgood = len(newestcpllogfiles)
for cpllog in newestcpllogfiles:
try:
if six.b("SUCCESSFUL TERMINATION") in gzip.open(cpllog, 'rb').read():
allgood = allgood - 1
except Exception as e: # Probably want to be more specific here
                logger.info("{} is not compressed, assuming run failed: {}".format(cpllog, e))
return allgood==0
def _component_compare_copy(self, suffix):
comments = copy_histfiles(self._case, suffix)
append_testlog(comments, self._orig_caseroot)
def _component_compare_test(self, suffix1, suffix2,
success_change=False,
ignore_fieldlist_diffs=False):
"""
Return value is not generally checked, but is provided in case a custom
run case needs indirection based on success.
If success_change is True, success requires some files to be different.
If ignore_fieldlist_diffs is True, then: If the two cases differ only in their
field lists (i.e., all shared fields are bit-for-bit, but one case has some
diagnostic fields that are missing from the other case), treat the two cases
as identical.
"""
success, comments = self._do_compare_test(suffix1, suffix2,
ignore_fieldlist_diffs=ignore_fieldlist_diffs)
if success_change:
success = not success
append_testlog(comments, self._orig_caseroot)
status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
with self._test_status:
self._test_status.set_status("{}_{}_{}".format(COMPARE_PHASE, suffix1, suffix2), status)
return success
def _do_compare_test(self, suffix1, suffix2, ignore_fieldlist_diffs=False):
"""
Wraps the call to compare_test to facilitate replacement in unit
tests
"""
return compare_test(self._case, suffix1, suffix2,
ignore_fieldlist_diffs=ignore_fieldlist_diffs)
def _st_archive_case_test(self):
result = self._case.test_env_archive()
with self._test_status:
if result:
self._test_status.set_status(STARCHIVE_PHASE, TEST_PASS_STATUS)
else:
self._test_status.set_status(STARCHIVE_PHASE, TEST_FAIL_STATUS)
def _get_mem_usage(self, cpllog):
"""
Examine memory usage as recorded in the cpl log file and look for unexpected
increases.
"""
memlist = []
meminfo = re.compile(r".*model date =\s+(\w+).*memory =\s+(\d+\.?\d+).*highwater")
if cpllog is not None and os.path.isfile(cpllog):
            fopen = gzip.open if cpllog.endswith(".gz") else open
with fopen(cpllog, "rb") as f:
for line in f:
m = meminfo.match(line.decode('utf-8'))
if m:
memlist.append((float(m.group(1)), float(m.group(2))))
# Remove the last mem record, it's sometimes artificially high
if len(memlist) > 0:
memlist.pop()
return memlist
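    # Illustrative example (the log formatting is assumed, not authoritative):
    # a cpl log line such as
    #   "memory_write: model date = 00010102 0 memory = 1234.56 MB (highwater)"
    # would match the meminfo regex above and append (10102.0, 1234.56) to
    # memlist.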
def _get_throughput(self, cpllog):
"""
Examine memory usage as recorded in the cpl log file and look for unexpected
increases.
"""
if cpllog is not None and os.path.isfile(cpllog):
with gzip.open(cpllog, "rb") as f:
cpltext = f.read().decode('utf-8')
m = re.search(r"# simulated years / cmp-day =\s+(\d+\.\d+)\s",cpltext)
if m:
return float(m.group(1))
return None
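    # Illustrative example (the log formatting is assumed, not authoritative):
    # a cpl log line such as
    #   "# simulated years / cmp-day = 19.82 ..."
    # would cause _get_throughput to return 19.82.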
def _phase_modifying_call(self, phase, function):
"""
Ensures that unexpected exceptions from phases will result in a FAIL result
in the TestStatus file for that phase.
"""
try:
function()
except Exception as e: # Do NOT want to catch KeyboardInterrupt
            excmsg = "Exception during {}:\n{}\n{}".format(phase, str(e), traceback.format_exc())
logger.warning(excmsg)
append_testlog(excmsg, self._orig_caseroot)
with self._test_status:
self._test_status.set_status(phase, TEST_FAIL_STATUS, comments="exception")
def _check_for_memleak(self):
"""
Examine memory usage as recorded in the cpl log file and look for unexpected
increases.
"""
with self._test_status:
latestcpllogs = self._get_latest_cpl_logs()
for cpllog in latestcpllogs:
memlist = self._get_mem_usage(cpllog)
if len(memlist)<3:
                    self._test_status.set_status(MEMLEAK_PHASE, TEST_PASS_STATUS, comments="insufficient data for memleak test")
else:
finaldate = int(memlist[-1][0])
originaldate = int(memlist[0][0])
finalmem = float(memlist[-1][1])
originalmem = float(memlist[0][1])
memdiff = -1
if originalmem > 0:
memdiff = (finalmem - originalmem)/originalmem
tolerance = self._case.get_value("TEST_MEMLEAK_TOLERANCE")
if tolerance is None:
tolerance = 0.1
expect(tolerance > 0.0, "Bad value for memleak tolerance in test")
if memdiff < 0:
                        self._test_status.set_status(MEMLEAK_PHASE, TEST_PASS_STATUS, comments="insufficient data for memleak test")
elif memdiff < tolerance:
self._test_status.set_status(MEMLEAK_PHASE, TEST_PASS_STATUS)
else:
comment = "memleak detected, memory went from {:f} to {:f} in {:d} days".format(originalmem, finalmem, finaldate-originaldate)
append_testlog(comment, self._orig_caseroot)
self._test_status.set_status(MEMLEAK_PHASE, TEST_FAIL_STATUS, comments=comment)
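    # Worked example of the tolerance check above (illustrative numbers): if
    # memory grows from 1000.0 MB on the first recorded date to 1080.0 MB on
    # the last, memdiff = (1080.0 - 1000.0) / 1000.0 = 0.08, which is below
    # the default tolerance of 0.1, so MEMLEAK_PHASE is set to PASS.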
    def compare_env_run(self, expected=None):
"""
Compare env_run file to original and warn about differences
"""
components = self._case.get_values("COMP_CLASSES")
f1obj = self._case.get_env("run")
f2obj = EnvRun(self._caseroot, os.path.join(LOCKED_DIR, "env_run.orig.xml"), components=components)
diffs = f1obj.compare_xml(f2obj)
for key in diffs.keys():
if expected is not None and key in expected:
logging.warning(" Resetting {} for test".format(key))
f1obj.set_value(key, f2obj.get_value(key, resolved=False))
else:
print("WARNING: Found difference in test {}: case: {} original value {}".format(key, diffs[key][0], diffs[key][1]))
return False
return True
def _get_latest_cpl_logs(self):
"""
find and return the latest cpl log file in the run directory
"""
coupler_log_path = self._case.get_value("RUNDIR")
cpllogs = glob.glob(os.path.join(coupler_log_path, '{}*.log.*'.format(self._cpllog)))
lastcpllogs = []
if cpllogs:
lastcpllogs.append(max(cpllogs, key=os.path.getctime))
basename = os.path.basename(lastcpllogs[0])
suffix = basename.split('.',1)[1]
for log in cpllogs:
if log in lastcpllogs:
continue
if log.endswith(suffix):
lastcpllogs.append(log)
return lastcpllogs
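    # Illustrative example (LID-style timestamps are assumed): if RUNDIR holds
    #   cpl.log.201103-142300.gz, cpl_0001.log.201103-142300.gz,
    #   cpl.log.201101-090000.gz
    # the newest file is found first, its suffix "log.201103-142300.gz" is
    # extracted, and every log sharing that suffix (e.g. from multi-instance
    # runs) is returned along with it.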
def _compare_memory(self):
with self._test_status:
# compare memory usage to baseline
baseline_name = self._case.get_value("BASECMP_CASE")
basecmp_dir = os.path.join(self._case.get_value("BASELINE_ROOT"), baseline_name)
newestcpllogfiles = self._get_latest_cpl_logs()
if len(newestcpllogfiles) > 0:
memlist = self._get_mem_usage(newestcpllogfiles[0])
for cpllog in newestcpllogfiles:
m = re.search(r"/({}.*.log).*.gz".format(self._cpllog),cpllog)
if m is not None:
baselog = os.path.join(basecmp_dir, m.group(1))+".gz"
                if not os.path.isfile(baselog):
# for backward compatibility
baselog = os.path.join(basecmp_dir, self._cpllog+".log")
if os.path.isfile(baselog) and len(memlist) > 3:
blmem = self._get_mem_usage(baselog)
blmem = 0 if blmem == [] else blmem[-1][1]
curmem = memlist[-1][1]
diff = 0.0 if blmem == 0 else (curmem-blmem)/blmem
if diff < 0.1 and self._test_status.get_status(MEMCOMP_PHASE) is None:
self._test_status.set_status(MEMCOMP_PHASE, TEST_PASS_STATUS)
elif self._test_status.get_status(MEMCOMP_PHASE) != TEST_FAIL_STATUS:
comment = "Error: Memory usage increase > 10% from baseline"
self._test_status.set_status(MEMCOMP_PHASE, TEST_FAIL_STATUS, comments=comment)
append_testlog(comment, self._orig_caseroot)
def _compare_throughput(self):
with self._test_status:
            # compare throughput to baseline
baseline_name = self._case.get_value("BASECMP_CASE")
basecmp_dir = os.path.join(self._case.get_value("BASELINE_ROOT"), baseline_name)
newestcpllogfiles = self._get_latest_cpl_logs()
for cpllog in newestcpllogfiles:
m = re.search(r"/({}.*.log).*.gz".format(self._cpllog), cpllog)
if m is not None:
baselog = os.path.join(basecmp_dir, m.group(1))+".gz"
                if not os.path.isfile(baselog):
# for backward compatibility
baselog = os.path.join(basecmp_dir, self._cpllog)
if os.path.isfile(baselog):
# compare throughput to baseline
current = self._get_throughput(cpllog)
baseline = self._get_throughput(baselog)
                    # comparing ypd, so bigger is better
if baseline is not None and current is not None:
diff = (baseline - current)/baseline
tolerance = self._case.get_value("TEST_TPUT_TOLERANCE")
if tolerance is None:
tolerance = 0.1
expect(tolerance > 0.0, "Bad value for throughput tolerance in test")
if diff < tolerance and self._test_status.get_status(THROUGHPUT_PHASE) is None:
self._test_status.set_status(THROUGHPUT_PHASE, TEST_PASS_STATUS)
elif self._test_status.get_status(THROUGHPUT_PHASE) != TEST_FAIL_STATUS:
comment = "Error: Computation time increase > {:d} pct from baseline".format(int(tolerance*100))
self._test_status.set_status(THROUGHPUT_PHASE, TEST_FAIL_STATUS, comments=comment)
append_testlog(comment, self._orig_caseroot)
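    # Worked example of the comparison above (illustrative numbers): with a
    # baseline throughput of 20.0 simulated years/day and a current run at
    # 17.0, diff = (20.0 - 17.0) / 20.0 = 0.15, which exceeds the default
    # tolerance of 0.1, so THROUGHPUT_PHASE is set to FAIL.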
def _compare_baseline(self):
"""
compare the current test output to a baseline result
"""
with self._test_status:
# compare baseline
success, comments = compare_baseline(self._case)
append_testlog(comments, self._orig_caseroot)
status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
baseline_name = self._case.get_value("BASECMP_CASE")
ts_comments = os.path.dirname(baseline_name) + ": " + get_ts_synopsis(comments)
self._test_status.set_status(BASELINE_PHASE, status, comments=ts_comments)
def _generate_baseline(self):
"""
generate a new baseline case based on the current test
"""
with self._test_status:
# generate baseline
success, comments = generate_baseline(self._case)
append_testlog(comments, self._orig_caseroot)
status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
baseline_name = self._case.get_value("BASEGEN_CASE")
self._test_status.set_status(GENERATE_PHASE, status, comments=os.path.dirname(baseline_name))
basegen_dir = os.path.join(self._case.get_value("BASELINE_ROOT"), self._case.get_value("BASEGEN_CASE"))
# copy latest cpl log to baseline
# drop the date so that the name is generic
newestcpllogfiles = self._get_latest_cpl_logs()
for cpllog in newestcpllogfiles:
m = re.search(r"/({}.*.log).*.gz".format(self._cpllog),cpllog)
if m is not None:
baselog = os.path.join(basegen_dir, m.group(1))+".gz"
                safe_copy(cpllog, baselog)
class FakeTest(SystemTestsCommon):
    """
    Inheritors of the FakeTest class are intended to test the CIME
    infrastructure itself. All members of the FakeTest class must have names
    beginning with "TEST"; this is so that find_system_test in utils.py will
    work with these classes.
    """
def _set_script(self, script, requires_exe=False):
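        """
        Record the fake model script that build_phase will install in place of
        the model executable; if requires_exe is True, a real executable is
        built first and the script wraps it (as <model>.exe.real).
        """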
self._script = script # pylint: disable=attribute-defined-outside-init
self._requires_exe = requires_exe # pylint: disable=attribute-defined-outside-init
    def build_phase(self, sharedlib_only=False, model_only=False):
if self._requires_exe:
super(FakeTest, self).build_phase(sharedlib_only=sharedlib_only, model_only=model_only)
if not sharedlib_only:
exeroot = self._case.get_value("EXEROOT")
cime_model = self._case.get_value("MODEL")
modelexe = os.path.join(exeroot, "{}.exe".format(cime_model))
real_modelexe = modelexe + ".real"
if self._requires_exe:
shutil.move(modelexe, real_modelexe)
with open(modelexe, 'w') as f:
f.write("#!/bin/bash\n")
f.write(self._script)
os.chmod(modelexe, 0o755)
if not self._requires_exe:
build.post_build(self._case, [], build_complete=True)
    def run_indv(self, suffix="base", st_archive=False):
mpilib = self._case.get_value("MPILIB")
# This flag is needed by mpt to run a script under mpiexec
if mpilib == "mpt":
os.environ["MPI_SHEPHERD"] = "true"
super(FakeTest, self).run_indv(suffix, st_archive)
class TESTRUNPASS(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
rundir = self._case.get_value("RUNDIR")
cimeroot = self._case.get_value("CIMEROOT")
case = self._case.get_value("CASE")
script = \
"""
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
self._set_script(script)
FakeTest.build_phase(self,
sharedlib_only=sharedlib_only, model_only=model_only)
class TESTRUNDIFF(FakeTest):
    """
    You can generate a diff with this test as follows:
    1) Run the test and generate a baseline
    2) Set the TESTRUNDIFF_ALTERNATE environment variable to TRUE
    3) Re-run the same test from step 1, but do a baseline comparison instead of a generation
    3.a) This should give you a DIFF
    (a sketch of this workflow follows below)
    """
    def build_phase(self, sharedlib_only=False, model_only=False):
rundir = self._case.get_value("RUNDIR")
cimeroot = self._case.get_value("CIMEROOT")
case = self._case.get_value("CASE")
script = \
"""
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
if [ -z "$TESTRUNDIFF_ALTERNATE" ]; then
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
else
cp {root}/scripts/tests/cpl.hi2.nc.test {rundir}/{case}.cpl.hi.0.nc
fi
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
self._set_script(script)
FakeTest.build_phase(self,
sharedlib_only=sharedlib_only, model_only=model_only)
class TESTRUNDIFFRESUBMIT(TESTRUNDIFF):
pass
class TESTTESTDIFF(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
rundir = self._case.get_value("RUNDIR")
cimeroot = self._case.get_value("CIMEROOT")
case = self._case.get_value("CASE")
script = \
"""
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
cp {root}/scripts/tests/cpl.hi2.nc.test {rundir}/{case}.cpl.hi.0.nc.rest
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
self._set_script(script)
super(TESTTESTDIFF, self).build_phase(sharedlib_only=sharedlib_only,
model_only=model_only)
    def run_phase(self):
super(TESTTESTDIFF, self).run_phase()
self._component_compare_test("base", "rest")
class TESTRUNFAIL(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
rundir = self._case.get_value("RUNDIR")
cimeroot = self._case.get_value("CIMEROOT")
case = self._case.get_value("CASE")
script = \
"""
if [ -z "$TESTRUNFAIL_PASS" ]; then
echo Insta fail
echo model failed > {rundir}/{log}.log.$LID
exit -1
else
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
fi
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
self._set_script(script)
FakeTest.build_phase(self,
sharedlib_only=sharedlib_only, model_only=model_only)
class TESTRUNFAILEXC(TESTRUNPASS):
    def run_phase(self):
raise RuntimeError("Exception from run_phase")
class TESTRUNSTARCFAIL(TESTRUNPASS):
def _st_archive_case_test(self):
raise RuntimeError("Exception from st archive")
class TESTBUILDFAIL(TESTRUNPASS):
    def build_phase(self, sharedlib_only=False, model_only=False):
if "TESTBUILDFAIL_PASS" in os.environ:
TESTRUNPASS.build_phase(self, sharedlib_only, model_only)
else:
if (not sharedlib_only):
blddir = self._case.get_value("EXEROOT")
bldlog = os.path.join(blddir, "{}.bldlog.{}".format(get_model(), get_timestamp("%y%m%d-%H%M%S")))
with open(bldlog, "w") as fd:
fd.write("BUILD FAIL: Intentional fail for testing infrastructure")
expect(False, "BUILD FAIL: Intentional fail for testing infrastructure")
class TESTBUILDFAILEXC(FakeTest):
def __init__(self, case):
FakeTest.__init__(self, case)
raise RuntimeError("Exception from init")
class TESTRUNUSERXMLCHANGE(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
exeroot = self._case.get_value("EXEROOT")
caseroot = self._case.get_value("CASEROOT")
cime_model = self._case.get_value("MODEL")
modelexe = os.path.join(exeroot, "{}.exe".format(cime_model))
new_stop_n = self._case.get_value("STOP_N") * 2
script = \
"""
cd {caseroot}
./xmlchange DOUT_S=TRUE
./xmlchange DOUT_S=TRUE --file env_test.xml
./xmlchange RESUBMIT=1
./xmlchange STOP_N={stopn}
./xmlchange CONTINUE_RUN=FALSE
./xmlchange RESUBMIT_SETS_CONTINUE_RUN=FALSE
cd -
sleep 5
{modelexe}.real "$@"
sleep 5
# Need to remove self in order to avoid infinite loop
ls $(dirname {modelexe})
mv {modelexe} {modelexe}.old
mv {modelexe}.real {modelexe}
ls $(dirname {modelexe})
sleep 5
""".format(caseroot=caseroot, modelexe=modelexe, stopn=str(new_stop_n))
self._set_script(script, requires_exe=True)
FakeTest.build_phase(self,
sharedlib_only=sharedlib_only, model_only=model_only)
    def run_phase(self):
self.run_indv(st_archive=True)
class TESTRUNSLOWPASS(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
rundir = self._case.get_value("RUNDIR")
cimeroot = self._case.get_value("CIMEROOT")
case = self._case.get_value("CASE")
script = \
"""
sleep 300
echo Slow pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
self._set_script(script)
FakeTest.build_phase(self,
sharedlib_only=sharedlib_only, model_only=model_only)
class TESTMEMLEAKFAIL(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
rundir = self._case.get_value("RUNDIR")
cimeroot = self._case.get_value("CIMEROOT")
case = self._case.get_value("CASE")
testfile = os.path.join(cimeroot,"scripts","tests","cpl.log.failmemleak.gz")
script = \
"""
echo Insta pass
gunzip -c {testfile} > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(testfile=testfile, rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
self._set_script(script)
FakeTest.build_phase(self,
sharedlib_only=sharedlib_only, model_only=model_only)
class TESTMEMLEAKPASS(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
rundir = self._case.get_value("RUNDIR")
cimeroot = self._case.get_value("CIMEROOT")
case = self._case.get_value("CASE")
testfile = os.path.join(cimeroot,"scripts","tests","cpl.log.passmemleak.gz")
script = \
"""
echo Insta pass
gunzip -c {testfile} > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(testfile=testfile, rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
self._set_script(script)
FakeTest.build_phase(self,
sharedlib_only=sharedlib_only, model_only=model_only)