"""
Base class for CIME system tests
"""
from CIME.XML.standard_module_setup import *
from CIME.XML.env_run import EnvRun
from CIME.utils import append_testlog, get_model, safe_copy
from CIME.test_status import *
from CIME.hist_utils import *
from CIME.provenance import save_test_time
from CIME.locked_files import LOCKED_DIR, lock_file, is_locked
import CIME.build as build
import glob, gzip, time, traceback, six
logger = logging.getLogger(__name__)
class SystemTestsCommon(object):

    def __init__(self, case, expected=None):
        """
        Initialize a CIME system test object.

        If the locked env_run.orig.xml does not exist, copy the current
        env_run.xml file. If it does exist, restore values changed in a
        previous run of the test.
        """
        self._case = case
        root = case.get_value("CASEROOT")
        self._caseroot = root
        self._orig_caseroot = root
        self._runstatus = None
        self._casebaseid = self._case.get_value("CASEBASEID")
        self._test_status = TestStatus(test_dir=root, test_name=self._casebaseid)

        self._init_environment(root)
        self._init_locked_files(root, expected)

        self._skip_pnl = False
        self._cpllog = 'cpl'
def _init_environment(self, caseroot):
"""
Do initializations of environment variables that are needed in __init__
"""
# Needed for sh scripts
os.environ["CASEROOT"] = caseroot
def _init_locked_files(self, caseroot, expected):
"""
If the locked env_run.orig.xml does not exist, copy the current
env_run.xml file. If it does exist, restore values changed in a previous
run of the test.
"""
if is_locked("env_run.orig.xml"):
self.compare_env_run(expected=expected)
elif os.path.isfile(os.path.join(caseroot, "env_run.xml")):
lock_file("env_run.xml", caseroot=caseroot, newname="env_run.orig.xml")
def _resetup_case(self, phase, reset=False):
"""
Re-setup this case. This is necessary if user is re-running an already-run
phase.
"""
# We never want to re-setup if we're doing the resubmitted run
phase_status = self._test_status.get_status(phase)
if reset or (self._case.get_value("IS_FIRST_RUN") and phase_status != TEST_PEND_STATUS):
logging.warning("Resetting case due to detected re-run of phase {}".format(phase))
self._case.set_initial_test_values()
self._case.case_setup(reset=True, test_mode=True)
[docs] def build(self, sharedlib_only=False, model_only=False):
"""
Do NOT override this method, this method is the framework that
controls the build phase. build_phase is the extension point
that subclasses should use.
"""
success = True
for phase_name, phase_bool in [(SHAREDLIB_BUILD_PHASE, not model_only),
(MODEL_BUILD_PHASE, not sharedlib_only)]:
if phase_bool:
self._resetup_case(phase_name)
with self._test_status:
self._test_status.set_status(phase_name, TEST_PEND_STATUS)
start_time = time.time()
try:
self.build_phase(sharedlib_only=(phase_name==SHAREDLIB_BUILD_PHASE),
model_only=(phase_name==MODEL_BUILD_PHASE))
except BaseException as e:
success = False
msg = e.__str__()
if "FAILED, cat" in msg or "BUILD FAIL" in msg:
# Don't want to print stacktrace for a build failure since that
# is not a CIME/infrastructure problem.
excmsg = msg
else:
excmsg = "Exception during build:\n{}\n{}".format(msg, traceback.format_exc())
logger.warning(excmsg)
append_testlog(excmsg)
time_taken = time.time() - start_time
with self._test_status:
self._test_status.set_status(phase_name, TEST_PASS_STATUS if success else TEST_FAIL_STATUS, comments=("time={:d}".format(int(time_taken))))
if not success:
break
return success
[docs] def build_phase(self, sharedlib_only=False, model_only=False):
"""
This is the default build phase implementation, it just does an individual build.
This is the subclass' extension point if they need to define a custom build
phase.
PLEASE THROW EXCEPTION ON FAIL
"""
self.build_indv(sharedlib_only=sharedlib_only, model_only=model_only)
[docs] def build_indv(self, sharedlib_only=False, model_only=False):
"""
Perform an individual build
"""
model = self._case.get_value('MODEL')
build.case_build(self._caseroot, case=self._case,
sharedlib_only=sharedlib_only, model_only=model_only,
save_build_provenance=not model=='cesm')
[docs] def clean_build(self, comps=None):
if comps is None:
comps = [x.lower() for x in self._case.get_values("COMP_CLASSES")]
build.clean(self._case, cleanlist=comps)
[docs] def run(self, skip_pnl=False):
"""
Do NOT override this method, this method is the framework that controls
the run phase. run_phase is the extension point that subclasses should use.
"""
success = True
start_time = time.time()
self._skip_pnl = skip_pnl
try:
self._resetup_case(RUN_PHASE)
with self._test_status:
self._test_status.set_status(RUN_PHASE, TEST_PEND_STATUS)
self.run_phase()
if self._case.get_value("GENERATE_BASELINE"):
self._generate_baseline()
if self._case.get_value("COMPARE_BASELINE"):
self._compare_baseline()
self._check_for_memleak()
self._st_archive_case_test()
except BaseException as e:
success = False
msg = e.__str__()
if "RUN FAIL" in msg:
# Don't want to print stacktrace for a model failure since that
# is not a CIME/infrastructure problem.
excmsg = msg
else:
excmsg = "Exception during run:\n{}\n{}".format(msg, traceback.format_exc())
logger.warning(excmsg)
append_testlog(excmsg)
# Writing the run status should be the very last thing due to wait_for_tests
time_taken = time.time() - start_time
status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
with self._test_status:
self._test_status.set_status(RUN_PHASE, status, comments=("time={:d}".format(int(time_taken))))
if success and get_model() == "e3sm":
save_test_time(self._case.get_value("BASELINE_ROOT"), self._casebaseid, time_taken)
# We return success if the run phase worked; memleaks, diffs will not be taken into account
# with this return value.
return success
[docs] def run_phase(self):
"""
This is the default run phase implementation, it just does an individual run.
This is the subclass' extension point if they need to define a custom run phase.
PLEASE THROW AN EXCEPTION ON FAIL
"""
self.run_indv()
def _get_caseroot(self):
"""
Returns the current CASEROOT value
"""
return self._caseroot
def _set_active_case(self, case):
"""
Use for tests that have multiple cases
"""
self._case = case
self._case.load_env(reset=True)
self._caseroot = case.get_value("CASEROOT")
[docs] def run_indv(self, suffix="base", st_archive=False):
"""
Perform an individual run. Raises an EXCEPTION on fail.
"""
stop_n = self._case.get_value("STOP_N")
stop_option = self._case.get_value("STOP_OPTION")
run_type = self._case.get_value("RUN_TYPE")
rundir = self._case.get_value("RUNDIR")
# remove any cprnc output leftover from previous runs
for compout in glob.iglob(os.path.join(rundir,"*.cprnc.out")):
os.remove(compout)
infostr = "doing an {:d} {} {} test".format(stop_n, stop_option, run_type)
rest_option = self._case.get_value("REST_OPTION")
if rest_option == "none" or rest_option == "never":
infostr += ", no restarts written"
else:
rest_n = self._case.get_value("REST_N")
infostr += ", with restarts every {:d} {}".format(rest_n, rest_option)
logger.info(infostr)
self._case.case_run(skip_pnl=self._skip_pnl, submit_resubmits=True)
if not self._coupler_log_indicates_run_complete():
expect(False, "Coupler did not indicate run passed")
if suffix is not None:
self._component_compare_copy(suffix)
if st_archive:
self._case.case_st_archive(resubmit=True)
def _coupler_log_indicates_run_complete(self):
newestcpllogfiles = self._get_latest_cpl_logs()
logger.debug("Latest Coupler log file(s) {}" .format(newestcpllogfiles))
# Exception is raised if the file is not compressed
allgood = len(newestcpllogfiles)
for cpllog in newestcpllogfiles:
try:
if six.b("SUCCESSFUL TERMINATION") in gzip.open(cpllog, 'rb').read():
allgood = allgood - 1
except BaseException as e:
msg = e.__str__()
logger.info("{} is not compressed, assuming run failed {}".format(cpllog, msg))
return allgood==0
def _component_compare_copy(self, suffix):
comments = copy(self._case, suffix)
append_testlog(comments)
def _component_compare_test(self, suffix1, suffix2, success_change=False):
"""
Return value is not generally checked, but is provided in case a custom
run case needs indirection based on success.
If success_change is True, success requires some files to be different
"""
success, comments = self._do_compare_test(suffix1, suffix2)
if success_change:
success = not success
append_testlog(comments)
status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
with self._test_status:
self._test_status.set_status("{}_{}_{}".format(COMPARE_PHASE, suffix1, suffix2), status)
return success
def _do_compare_test(self, suffix1, suffix2):
"""
Wraps the call to compare_test to facilitate replacement in unit
tests
"""
return compare_test(self._case, suffix1, suffix2)
def _st_archive_case_test(self):
result = self._case.test_env_archive()
with self._test_status:
if result:
self._test_status.set_status(STARCHIVE_PHASE, TEST_PASS_STATUS)
else:
self._test_status.set_status(STARCHIVE_PHASE, TEST_FAIL_STATUS)
def _get_mem_usage(self, cpllog):
"""
Examine memory usage as recorded in the cpl log file and look for unexpected
increases.
"""
memlist = []
meminfo = re.compile(r".*model date =\s+(\w+).*memory =\s+(\d+\.?\d+).*highwater")
if cpllog is not None and os.path.isfile(cpllog):
if '.gz' == cpllog[-3:]:
fopen = gzip.open
else:
fopen = open
with fopen(cpllog, "rb") as f:
for line in f:
m = meminfo.match(line.decode('utf-8'))
if m:
memlist.append((float(m.group(1)), float(m.group(2))))
# Remove the last mem record, it's sometimes artificially high
if len(memlist) > 0:
memlist.pop()
return memlist
def _get_throughput(self, cpllog):
"""
Examine memory usage as recorded in the cpl log file and look for unexpected
increases.
"""
if cpllog is not None and os.path.isfile(cpllog):
with gzip.open(cpllog, "rb") as f:
cpltext = f.read().decode('utf-8')
m = re.search(r"# simulated years / cmp-day =\s+(\d+\.\d+)\s",cpltext)
if m:
return float(m.group(1))
return None
def _check_for_memleak(self):
"""
Examine memory usage as recorded in the cpl log file and look for unexpected
increases.
"""
latestcpllogs = self._get_latest_cpl_logs()
for cpllog in latestcpllogs:
memlist = self._get_mem_usage(cpllog)
with self._test_status:
if len(memlist)<3:
self._test_status.set_status(MEMLEAK_PHASE, TEST_PASS_STATUS, comments="insuffiencient data for memleak test")
else:
finaldate = int(memlist[-1][0])
originaldate = int(memlist[0][0])
finalmem = float(memlist[-1][1])
originalmem = float(memlist[0][1])
memdiff = -1
if originalmem > 0:
memdiff = (finalmem - originalmem)/originalmem
tolerance = self._case.get_value("TEST_MEMLEAK_TOLERANCE")
if tolerance is None:
tolerance = 0.1
expect(tolerance > 0.0, "Bad value for memleak tolerance in test")
if memdiff < 0:
self._test_status.set_status(MEMLEAK_PHASE, TEST_PASS_STATUS, comments="insuffiencient data for memleak test")
elif memdiff < tolerance:
self._test_status.set_status(MEMLEAK_PHASE, TEST_PASS_STATUS)
else:
comment = "memleak detected, memory went from {:f} to {:f} in {:d} days".format(originalmem, finalmem, finaldate-originaldate)
append_testlog(comment)
self._test_status.set_status(MEMLEAK_PHASE, TEST_FAIL_STATUS, comments=comment)
[docs] def compare_env_run(self, expected=None):
"""
Compare env_run file to original and warn about differences
"""
components = self._case.get_values("COMP_CLASSES")
f1obj = self._case.get_env("run")
f2obj = EnvRun(self._caseroot, os.path.join(LOCKED_DIR, "env_run.orig.xml"), components=components)
diffs = f1obj.compare_xml(f2obj)
for key in diffs.keys():
if expected is not None and key in expected:
logging.warning(" Resetting {} for test".format(key))
f1obj.set_value(key, f2obj.get_value(key, resolved=False))
else:
print("WARNING: Found difference in test {}: case: {} original value {}".format(key, diffs[key][0], diffs[key][1]))
return False
return True
def _get_latest_cpl_logs(self):
"""
find and return the latest cpl log file in the run directory
"""
coupler_log_path = self._case.get_value("RUNDIR")
cpllogs = glob.glob(os.path.join(coupler_log_path, 'cpl*.log.*'))
lastcpllogs = []
if cpllogs:
lastcpllogs.append(max(cpllogs, key=os.path.getctime))
basename = os.path.basename(lastcpllogs[0])
suffix = basename.split('.',1)[1]
for log in cpllogs:
if log in lastcpllogs:
continue
if log.endswith(suffix):
lastcpllogs.append(log)
return lastcpllogs
def _compare_baseline(self):
"""
compare the current test output to a baseline result
"""
with self._test_status:
# compare baseline
success, comments = compare_baseline(self._case)
append_testlog(comments)
status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
baseline_name = self._case.get_value("BASECMP_CASE")
ts_comments = (os.path.dirname(baseline_name) + ": " + comments) if "\n" not in comments else os.path.dirname(baseline_name)
self._test_status.set_status(BASELINE_PHASE, status, comments=ts_comments)
basecmp_dir = os.path.join(self._case.get_value("BASELINE_ROOT"), baseline_name)
# compare memory usage to baseline
newestcpllogfiles = self._get_latest_cpl_logs()
if len(newestcpllogfiles) > 0:
memlist = self._get_mem_usage(newestcpllogfiles[0])
for cpllog in newestcpllogfiles:
m = re.search(r"/(cpl.*.log).*.gz",cpllog)
if m is not None:
baselog = os.path.join(basecmp_dir, m.group(1))+".gz"
if baselog is None or not os.path.isfile(baselog):
# for backward compatibility
baselog = os.path.join(basecmp_dir, "cpl.log")
if os.path.isfile(baselog) and len(memlist) > 3:
blmem = self._get_mem_usage(baselog)
blmem = 0 if blmem == [] else blmem[-1][1]
curmem = memlist[-1][1]
diff = (curmem-blmem)/blmem
if diff < 0.1 and self._test_status.get_status(MEMCOMP_PHASE) is None:
self._test_status.set_status(MEMCOMP_PHASE, TEST_PASS_STATUS)
elif self._test_status.get_status(MEMCOMP_PHASE) != TEST_FAIL_STATUS:
comment = "Error: Memory usage increase > 10% from baseline"
self._test_status.set_status(MEMCOMP_PHASE, TEST_FAIL_STATUS, comments=comment)
append_testlog(comment)
# compare throughput to baseline
current = self._get_throughput(cpllog)
baseline = self._get_throughput(baselog)
#comparing ypd so bigger is better
if baseline is not None and current is not None:
diff = (baseline - current)/baseline
tolerance = self._case.get_value("TEST_TPUT_TOLERANCE")
if tolerance is None:
tolerance = 0.25
expect(tolerance > 0.0, "Bad value for throughput tolerance in test")
if diff < tolerance and self._test_status.get_status(THROUGHPUT_PHASE) is None:
self._test_status.set_status(THROUGHPUT_PHASE, TEST_PASS_STATUS)
elif self._test_status.get_status(THROUGHPUT_PHASE) != TEST_FAIL_STATUS:
comment = "Error: Computation time increase > {:d} pct from baseline".format(int(tolerance*100))
self._test_status.set_status(THROUGHPUT_PHASE, TEST_FAIL_STATUS, comments=comment)
append_testlog(comment)
def _generate_baseline(self):
"""
generate a new baseline case based on the current test
"""
with self._test_status:
# generate baseline
success, comments = generate_baseline(self._case)
append_testlog(comments)
status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
baseline_name = self._case.get_value("BASEGEN_CASE")
self._test_status.set_status("{}".format(GENERATE_PHASE), status, comments=os.path.dirname(baseline_name))
basegen_dir = os.path.join(self._case.get_value("BASELINE_ROOT"), self._case.get_value("BASEGEN_CASE"))
# copy latest cpl log to baseline
# drop the date so that the name is generic
newestcpllogfiles = self._get_latest_cpl_logs()
for cpllog in newestcpllogfiles:
m = re.search(r"/(cpl.*.log).*.gz",cpllog)
if m is not None:
baselog = os.path.join(basegen_dir, m.group(1))+".gz"
safe_copy(cpllog,
os.path.join(basegen_dir,baselog))
class FakeTest(SystemTestsCommon):
    """
    Inheritors of the FakeTest class are intended to test the code.

    All members of the FakeTest class must have names beginning with "TEST"
    so that find_system_test in utils.py will work with these classes.
    """

    def _set_script(self, script):
        # Stash the fake model script; build_phase writes it out as the
        # model executable.
        self._script = script # pylint: disable=attribute-defined-outside-init

    def build_phase(self, sharedlib_only=False, model_only=False):
        if not sharedlib_only:
            exeroot = self._case.get_value("EXEROOT")
            cime_model = self._case.get_value("MODEL")
            modelexe = os.path.join(exeroot, "{}.exe".format(cime_model))

            # Replace the model executable with the stashed shell script
            with open(modelexe, 'w') as fd:
                fd.write("#!/bin/bash\n")
                fd.write(self._script)

            os.chmod(modelexe, 0o755)

            build.post_build(self._case, [], build_complete=True)

    def run_indv(self, suffix='base', st_archive=False):
        mpilib = self._case.get_value("MPILIB")
        # This flag is needed by mpt to run a script under mpiexec
        if mpilib == "mpt":
            os.environ["MPI_SHEPHERD"] = "true"

        super(FakeTest, self).run_indv(suffix, st_archive)
class TESTRUNPASS(FakeTest):
    """Fake test whose model run always passes."""

    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = \
"""
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
        self._set_script(script)
        super(TESTRUNPASS, self).build_phase(sharedlib_only=sharedlib_only,
                                             model_only=model_only)
class TESTRUNDIFF(FakeTest):
    """
    You can generate a diff with this test as follows:
    1) Run the test and generate a baseline
    2) set TESTRUNDIFF_ALTERNATE environment variable to TRUE
    3) Re-run the same test from step 1 but do a baseline comparison instead
       of generation
    3.a) This should give you a DIFF
    """

    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = \
"""
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
if [ -z "$TESTRUNDIFF_ALTERNATE" ]; then
  cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
else
  cp {root}/scripts/tests/cpl.hi2.nc.test {rundir}/{case}.cpl.hi.0.nc
fi
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
        self._set_script(script)
        super(TESTRUNDIFF, self).build_phase(sharedlib_only=sharedlib_only,
                                             model_only=model_only)
class TESTTESTDIFF(FakeTest):
    """Fake test that produces differing 'base' and 'rest' hist files."""

    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = \
"""
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
cp {root}/scripts/tests/cpl.hi2.nc.test {rundir}/{case}.cpl.hi.0.nc.rest
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
        self._set_script(script)
        super(TESTTESTDIFF, self).build_phase(sharedlib_only=sharedlib_only,
                                              model_only=model_only)

    def run_phase(self):
        super(TESTTESTDIFF, self).run_phase()
        # The copied .rest file differs from the base, so this records a DIFF
        self._component_compare_test("base", "rest")
class TESTRUNFAIL(FakeTest):
    """Fake test whose model run fails unless TESTRUNFAIL_PASS is set."""

    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = \
"""
if [ -z "$TESTRUNFAIL_PASS" ]; then
  echo Insta fail
  echo model failed > {rundir}/{log}.log.$LID
  exit -1
else
  echo Insta pass
  echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
  cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
fi
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
        self._set_script(script)
        super(TESTRUNFAIL, self).build_phase(sharedlib_only=sharedlib_only,
                                             model_only=model_only)
class TESTRUNFAILEXC(TESTRUNPASS):
    """Fake test whose run phase raises an exception."""

    def run_phase(self):
        raise RuntimeError("Exception from run_phase")
class TESTBUILDFAIL(TESTRUNPASS):
    """Fake test whose build fails unless TESTBUILDFAIL_PASS is set."""

    def build_phase(self, sharedlib_only=False, model_only=False):
        if "TESTBUILDFAIL_PASS" in os.environ:
            TESTRUNPASS.build_phase(self, sharedlib_only, model_only)
        elif not sharedlib_only:
            expect(False, "BUILD FAIL: Intentional fail for testing infrastructure")
class TESTBUILDFAILEXC(FakeTest):
    """Fake test that raises an exception during initialization."""

    def __init__(self, case):
        super(TESTBUILDFAILEXC, self).__init__(case)
        raise RuntimeError("Exception from init")
class TESTRUNSLOWPASS(FakeTest):
    """Fake test that passes after sleeping for five minutes."""

    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = \
"""
sleep 300
echo Slow pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
        self._set_script(script)
        super(TESTRUNSLOWPASS, self).build_phase(sharedlib_only=sharedlib_only,
                                                 model_only=model_only)
class TESTMEMLEAKFAIL(FakeTest):
    """Fake test whose canned cpl log triggers the memleak check."""

    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        testfile = os.path.join(cimeroot, "scripts", "tests", "cpl.log.failmemleak.gz")
        script = \
"""
echo Insta pass
gunzip -c {testfile} > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(testfile=testfile, rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
        self._set_script(script)
        super(TESTMEMLEAKFAIL, self).build_phase(sharedlib_only=sharedlib_only,
                                                 model_only=model_only)
class TESTMEMLEAKPASS(FakeTest):
    """Fake test whose canned cpl log passes the memleak check."""

    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        testfile = os.path.join(cimeroot, "scripts", "tests", "cpl.log.passmemleak.gz")
        script = \
"""
echo Insta pass
gunzip -c {testfile} > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(testfile=testfile, rundir=rundir, log=self._cpllog, root=cimeroot, case=case)
        self._set_script(script)
        super(TESTMEMLEAKPASS, self).build_phase(sharedlib_only=sharedlib_only,
                                                 model_only=model_only)