Source code for CIME.SystemTests.system_tests_common

"""
Base class for CIME system tests
"""

from CIME.XML.standard_module_setup import *
from CIME.XML.env_run import EnvRun
from CIME.XML.env_test import EnvTest
from CIME.status import append_testlog
from CIME.utils import (
    get_model,
    safe_copy,
    get_timestamp,
    CIMEError,
    expect,
    get_current_commit,
    SharedArea,
    is_comp_standalone,
)
from CIME.test_status import *
from CIME.hist_utils import (
    copy_histfiles,
    compare_test,
    generate_teststatus,
    compare_baseline,
    get_ts_synopsis,
    generate_baseline,
)
from CIME.config import Config
from CIME.provenance import save_test_time, get_test_success
from CIME.locked_files import LOCKED_DIR, lock_file, is_locked
from CIME.baselines.performance import (
    get_latest_cpl_logs,
    perf_get_memory_list,
    perf_compare_memory_baseline,
    perf_compare_throughput_baseline,
    perf_write_baseline,
    load_coupler_customization,
)
import CIME.build as build

import glob, gzip, time, traceback, os, math
from contextlib import ExitStack

logger = logging.getLogger(__name__)

# Name of directory under the run directory in which init-generated files are placed
INIT_GENERATED_FILES_DIRNAME = "init_generated_files"


def fix_single_exe_case(case):
    """Fixes cases created with --single-exe.

    When tests are created using --single-exe, the test_scheduler will set
    `BUILD_COMPLETE` to True, but some tests require calls to `case.case_setup`
    which can reset `BUILD_COMPLETE` to False. This function will check if a
    case was created with `--single-exe` and ensure `BUILD_COMPLETE` is True.

    Returns:
        True if the case required modification, otherwise False.
    """
    if is_single_exe_case(case):
        with ExitStack() as stack:
            # Enter the context only if the case is still read-only; entering the
            # context multiple times can cause side effects for later calls to
            # `set_value` when it's assumed the case is writeable.
            if case._read_only_mode:
                stack.enter_context(case)

            case.set_value("BUILD_COMPLETE", True)

        return True

    return False

def is_single_exe_case(case):
    """Determines if the case was created with the --single-exe option.

    If `CASEROOT` is not part of `EXEROOT` and the `TEST` variable is True,
    then it's safe to assume the case was created with `./create_test` and
    the `--single-exe` option.

    Returns:
        True if the case was created with `--single-exe`, otherwise False.
    """
    caseroot = case.get_value("CASEROOT")

    exeroot = case.get_value("EXEROOT")

    test = case.get_value("TEST")

    return caseroot not in exeroot and test

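# Editorial note: a usage sketch (hypothetical, for illustration only). Test
# infrastructure code that re-runs `case.case_setup` would typically restore
# the build flag like so:
#
#     if fix_single_exe_case(case):
#         logger.info("Restored BUILD_COMPLETE for a --single-exe case")
#
# The helper is a no-op for cases that were not created with --single-exe.
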
class SystemTestsCommon(object):
    def __init__(
        self, case, expected=None, **kwargs
    ):  # pylint: disable=unused-argument
        """
        Initialize a CIME system test object. If the locked env_run.orig.xml
        does not exist, copy the current env_run.xml file. If it does exist,
        restore values changed in a previous run of the test.
        """
        self._case = case
        caseroot = case.get_value("CASEROOT")
        self._caseroot = caseroot
        self._orig_caseroot = caseroot
        self._runstatus = None
        self._casebaseid = self._case.get_value("CASEBASEID")
        self._test_status = TestStatus(test_dir=caseroot, test_name=self._casebaseid)
        self._init_environment(caseroot)
        self._init_locked_files(caseroot, expected)
        self._skip_pnl = False
        self._cpllog = (
            "med" if self._case.get_value("COMP_INTERFACE") == "nuopc" else "cpl"
        )
        self._ninja = False
        self._dry_run = False
        self._user_separate_builds = False
        self._expected_num_cmp = None
        self._rest_n = None

    def _set_restart_interval(self):
        stop_n = self._case.get_value("STOP_N")
        stop_option = self._case.get_value("STOP_OPTION")
        self._case.set_value("REST_OPTION", stop_option)
        # We need to make sure the run is long enough and to set REST_N to a
        # value that makes sense for all components
        maxncpl = 10000
        minncpl = 0
        for comp in self._case.get_values("COMP_CLASSES"):
            if comp == "CPL":
                continue
            compname = self._case.get_value("COMP_{}".format(comp))

            # ignore stub components in this test.
            if compname == "s{}".format(comp.lower()):
                ncpl = None
            else:
                ncpl = self._case.get_value("{}_NCPL".format(comp))

            if ncpl and maxncpl > ncpl:
                maxncpl = ncpl
            if ncpl and minncpl < ncpl:
                minncpl = ncpl

        ncpl_base_period = self._case.get_value("NCPL_BASE_PERIOD")
        if ncpl_base_period == "hour":
            coupling_secs = 3600 / maxncpl
            timestep = 3600 / minncpl
        elif ncpl_base_period == "day":
            coupling_secs = 86400 / maxncpl
            timestep = 86400 / minncpl
        elif ncpl_base_period == "year":
            coupling_secs = 31536000 / maxncpl
            timestep = 31536000 / minncpl
        elif ncpl_base_period == "decade":
            coupling_secs = 315360000 / maxncpl
            timestep = 315360000 / minncpl

        # Convert stop_n to units of coupling intervals
        factor = 1
        if stop_option == "nsteps":
            factor = timestep
        elif stop_option == "nminutes":
            factor = 60
        elif stop_option == "nhours":
            factor = 3600
        elif stop_option == "ndays":
            factor = 86400
        elif stop_option == "nyears":
            factor = 315360000
        else:
            expect(False, f"stop_option {stop_option} not available for this test")

        stop_n = int(stop_n * factor // coupling_secs)
        rest_n = math.ceil((stop_n // 2 + 1) * coupling_secs / factor)

        expect(stop_n > 0, "Bad STOP_N: {:d}".format(stop_n))
        expect(stop_n > 2, "ERROR: stop_n value {:d} too short".format(stop_n))

        logger.info(
            "doing an {0} {1} initial test with restart file at {2} {1}".format(
                str(stop_n), stop_option, str(rest_n)
            )
        )
        self._case.set_value("REST_N", rest_n)

        return rest_n

    def _init_environment(self, caseroot):
        """
        Do initializations of environment variables that are needed in __init__
        """
        # Needed for sh scripts
        os.environ["CASEROOT"] = caseroot

    def _init_locked_files(self, caseroot, expected):
        """
        If the locked env_run.orig.xml does not exist, copy the current
        env_run.xml file. If it does exist, restore values changed in a
        previous run of the test.
        """
        if is_locked("env_run.orig.xml", caseroot):
            self.compare_env_run(expected=expected)
        elif os.path.isfile(os.path.join(caseroot, "env_run.xml")):
            lock_file("env_run.xml", caseroot, newname="env_run.orig.xml")

    def _resetup_case(self, phase, reset=False):
        """
        Re-setup this case. This is necessary if the user is re-running an
        already-run phase.
        """
        # We never want to re-setup if we're doing the resubmitted run
        phase_status = self._test_status.get_status(phase)
        phase_comment = self._test_status.get_comment(phase)
        rerunning = (
            phase_status != TEST_PEND_STATUS or phase_comment == TEST_RERUN_COMMENT
        )
        if reset or (self._case.get_value("IS_FIRST_RUN") and rerunning):
            logging.warning(
                "Resetting case due to detected re-run of phase {}".format(phase)
            )

            self._case.set_initial_test_values()

            self._case.case_setup(reset=True, test_mode=True)

            fix_single_exe_case(self._case)

    def build(
        self,
        sharedlib_only=False,
        model_only=False,
        ninja=False,
        dry_run=False,
        separate_builds=False,
        skip_submit=False,
    ):
        """
        Do NOT override this method; it is the framework that controls the
        build phase. build_phase is the extension point that subclasses
        should use.
        """
        success = True
        self._ninja = ninja
        self._dry_run = dry_run
        self._user_separate_builds = separate_builds
        was_run_pend = self._test_status.current_is(RUN_PHASE, TEST_PEND_STATUS)

        for phase_name, phase_bool in [
            (SHAREDLIB_BUILD_PHASE, not model_only),
            (MODEL_BUILD_PHASE, not sharedlib_only),
        ]:
            if phase_bool:
                self._resetup_case(phase_name)
                with self._test_status:
                    self._test_status.set_status(phase_name, TEST_PEND_STATUS)

                start_time = time.time()
                try:
                    self.build_phase(
                        sharedlib_only=(phase_name == SHAREDLIB_BUILD_PHASE),
                        model_only=(phase_name == MODEL_BUILD_PHASE),
                    )
                except BaseException as e:
                    # We want KeyboardInterrupts to generate FAIL status
                    success = False
                    if isinstance(e, CIMEError):
                        # Don't want to print a stacktrace for a build failure
                        # since that is not a CIME/infrastructure problem.
                        excmsg = str(e)
                    else:
                        excmsg = "Exception during build:\n{}\n{}".format(
                            str(e), traceback.format_exc()
                        )

                    append_testlog(excmsg, self._orig_caseroot)
                    raise

                finally:
                    time_taken = time.time() - start_time
                    with self._test_status:
                        self._test_status.set_status(
                            phase_name,
                            TEST_PASS_STATUS if success else TEST_FAIL_STATUS,
                            comments=("time={:d}".format(int(time_taken))),
                        )

        # Building model while job is queued and awaiting run
        if (
            skip_submit
            and was_run_pend
            and self._test_status.current_is(SUBMIT_PHASE, TEST_PEND_STATUS)
        ):
            with self._test_status:
                self._test_status.set_status(SUBMIT_PHASE, TEST_PASS_STATUS)

        return success

    def build_phase(self, sharedlib_only=False, model_only=False):
        """
        This is the default build phase implementation; it just does an
        individual build. This is the extension point for subclasses that
        need to define a custom build phase.

        PLEASE THROW EXCEPTION ON FAIL
        """
        self.build_indv(sharedlib_only=sharedlib_only, model_only=model_only)

    def build_indv(self, sharedlib_only=False, model_only=False):
        """
        Perform an individual build
        """
        model = self._case.get_value("MODEL")
        build.case_build(
            self._caseroot,
            case=self._case,
            sharedlib_only=sharedlib_only,
            model_only=model_only,
            save_build_provenance=not model == "cesm",
            ninja=self._ninja,
            dry_run=self._dry_run,
            separate_builds=self._user_separate_builds,
        )

        logger.info("build_indv complete")

    def clean_build(self, comps=None):
        if comps is None:
            comps = [x.lower() for x in self._case.get_values("COMP_CLASSES")]
        build.clean(self._case, cleanlist=comps)

    def run(self, skip_pnl=False):
        """
        Do NOT override this method; it is the framework that controls the
        run phase. run_phase is the extension point that subclasses should
        use.
        """
        success = True
        start_time = time.time()
        self._skip_pnl = skip_pnl
        try:
            self._resetup_case(RUN_PHASE)
            do_baseline_ops = True
            with self._test_status:
                self._test_status.set_status(RUN_PHASE, TEST_PEND_STATUS)

            # We do not want to do multiple repetitions of baseline operations for
            # multi-submit tests. We just want to do them upon the final submission.
            # Other submissions will need to mark those phases as PEND to ensure
            # wait_for_tests waits for them.
            if self._case.get_value("BATCH_SYSTEM") != "none":
                do_baseline_ops = self._case.get_value("RESUBMIT") == 0

            self.run_phase()

            if self._case.get_value("GENERATE_BASELINE"):
                if do_baseline_ops:
                    self._phase_modifying_call(GENERATE_PHASE, self._generate_baseline)
                else:
                    with self._test_status:
                        self._test_status.set_status(GENERATE_PHASE, TEST_PEND_STATUS)

            if self._case.get_value("COMPARE_BASELINE"):
                if do_baseline_ops:
                    self._phase_modifying_call(BASELINE_PHASE, self._compare_baseline)

                    comp_standalone, _ = is_comp_standalone(self._case)

                    if not comp_standalone:
                        self._phase_modifying_call(MEMCOMP_PHASE, self._compare_memory)

                        self._phase_modifying_call(
                            THROUGHPUT_PHASE, self._compare_throughput
                        )
                else:
                    with self._test_status:
                        self._test_status.set_status(BASELINE_PHASE, TEST_PEND_STATUS)
                        self._test_status.set_status(MEMCOMP_PHASE, TEST_PEND_STATUS)
                        self._test_status.set_status(THROUGHPUT_PHASE, TEST_PEND_STATUS)

            self._phase_modifying_call(MEMLEAK_PHASE, self._check_for_memleak)

            self._phase_modifying_call(STARCHIVE_PHASE, self._st_archive_case_test)

        except BaseException as e:
            # We want KeyboardInterrupts to generate FAIL status
            success = False
            if isinstance(e, CIMEError):
                # Don't want to print a stacktrace for a model failure since that
                # is not a CIME/infrastructure problem.
                excmsg = str(e)
            else:
                excmsg = "Exception during run:\n{}\n{}".format(
                    str(e), traceback.format_exc()
                )

            append_testlog(excmsg, self._orig_caseroot)
            raise

        finally:
            # Writing the run status should be the very last thing due to wait_for_tests
            time_taken = time.time() - start_time
            status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
            with self._test_status:
                self._test_status.set_status(
                    RUN_PHASE, status, comments=("time={:d}".format(int(time_taken)))
                )

            config = Config.instance()

            if config.verbose_run_phase:
                # If the run phase worked, remember the time it took in order to
                # improve later walltime estimates
                baseline_root = self._case.get_value("BASELINE_ROOT")

                if success:
                    srcroot = self._case.get_value("SRCROOT")
                    save_test_time(
                        baseline_root,
                        self._casebaseid,
                        time_taken,
                        get_current_commit(repo=srcroot),
                    )

                # If overall things did not pass, offer the user some insight into
                # what might have broken things
                overall_status = self._test_status.get_overall_test_status(
                    ignore_namelists=True
                )[0]
                if overall_status != TEST_PASS_STATUS:
                    srcroot = self._case.get_value("SRCROOT")
                    worked_before, last_pass, last_fail_transition = get_test_success(
                        baseline_root, srcroot, self._casebaseid
                    )

                    if worked_before:
                        if last_pass is not None:
                            # commits between last_pass and now broke things
                            stat, out, err = run_cmd(
                                "git rev-list --first-parent {}..{}".format(
                                    last_pass, "HEAD"
                                ),
                                from_dir=srcroot,
                            )
                            if stat == 0:
                                append_testlog(
                                    "NEW FAIL: Potentially broken merges:\n{}".format(
                                        out
                                    ),
                                    self._orig_caseroot,
                                )
                            else:
                                logger.warning(
                                    "Unable to list potentially broken merges: {}\n{}".format(
                                        out, err
                                    )
                                )
                    else:
                        if last_pass is not None and last_fail_transition is not None:
                            # commits between last_pass and last_fail_transition broke things
                            stat, out, err = run_cmd(
                                "git rev-list --first-parent {}..{}".format(
                                    last_pass, last_fail_transition
                                ),
                                from_dir=srcroot,
                            )
                            if stat == 0:
                                append_testlog(
                                    "OLD FAIL: Potentially broken merges:\n{}".format(
                                        out
                                    ),
                                    self._orig_caseroot,
                                )
                            else:
                                logger.warning(
                                    "Unable to list potentially broken merges: {}\n{}".format(
                                        out, err
                                    )
                                )

            if config.baseline_store_teststatus and self._case.get_value(
                "GENERATE_BASELINE"
            ):
                baseline_dir = os.path.join(
                    self._case.get_value("BASELINE_ROOT"),
                    self._case.get_value("BASEGEN_CASE"),
                )
                generate_teststatus(self._caseroot, baseline_dir)

        # We return success if the run phase worked; memleaks and diffs will NOT
        # be taken into account in this return value.
        return success

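    # Editorial note: run() funnels each optional check (baseline generate/compare,
    # memory, throughput, memleak, st_archive) through _phase_modifying_call, so an
    # unexpected exception in any one check records a FAIL for that phase without
    # aborting the remaining checks.
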
    def run_phase(self):
        """
        This is the default run phase implementation; it just does an
        individual run. This is the extension point for subclasses that
        need to define a custom run phase.

        PLEASE THROW AN EXCEPTION ON FAIL
        """
        self.run_indv()

    def _get_caseroot(self):
        """
        Returns the current CASEROOT value
        """
        return self._caseroot

    def _set_active_case(self, case):
        """
        Use for tests that have multiple cases
        """
        self._case = case
        self._case.load_env(reset=True)
        self._caseroot = case.get_value("CASEROOT")

    def run_indv(
        self,
        suffix="base",
        st_archive=False,
        submit_resubmits=None,
        keep_init_generated_files=False,
    ):
        """
        Perform an individual run. Raises an EXCEPTION on fail.

        keep_init_generated_files: If False (the default), we remove the
        init_generated_files subdirectory of the run directory before running
        the case. This is usually what we want for tests, but some specific
        tests may want to leave this directory in place, so they can set this
        variable to True to do so.
        """
        stop_n = self._case.get_value("STOP_N")
        stop_option = self._case.get_value("STOP_OPTION")
        run_type = self._case.get_value("RUN_TYPE")
        rundir = self._case.get_value("RUNDIR")

        try:
            self._case.check_all_input_data()
        except CIMEError:
            caseroot = self._case.get_value("CASEROOT")

            raise CIMEError(
                "Could not find all inputdata on any server, try "
                "manually running `./check_input_data --download "
                f"--verbose` from {caseroot!r}."
            ) from None

        if submit_resubmits is None:
            do_resub = self._case.get_value("BATCH_SYSTEM") != "none"
        else:
            do_resub = submit_resubmits

        # remove any cprnc output leftover from previous runs
        for compout in glob.iglob(os.path.join(rundir, "*.cprnc.out")):
            os.remove(compout)

        if not keep_init_generated_files:
            # remove all files in the init_generated_files directory if it exists
            init_generated_files_dir = os.path.join(
                rundir, INIT_GENERATED_FILES_DIRNAME
            )
            if os.path.isdir(init_generated_files_dir):
                for init_file in glob.iglob(
                    os.path.join(init_generated_files_dir, "*")
                ):
                    os.remove(init_file)

        infostr = "doing an {:d} {} {} test".format(stop_n, stop_option, run_type)

        rest_option = self._case.get_value("REST_OPTION")
        if rest_option == "none" or rest_option == "never":
            infostr += ", no restarts written"
        else:
            rest_n = self._case.get_value("REST_N")
            infostr += ", with restarts every {:d} {}".format(rest_n, rest_option)

        logger.info(infostr)

        self._case.case_run(skip_pnl=self._skip_pnl, submit_resubmits=do_resub)

        if not self._coupler_log_indicates_run_complete():
            expect(False, "Coupler did not indicate run passed")

        if suffix is not None:
            self._component_compare_copy(suffix)

        if st_archive:
            self._case.case_st_archive(resubmit=True)

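    # Editorial note: run_indv is the building block that restart-style tests call
    # more than once with different suffixes (for example "base" and "rest"); the
    # history files copied for each suffix are later diffed by
    # _component_compare_test below.
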
    def _coupler_log_indicates_run_complete(self):
        newestcpllogfiles = get_latest_cpl_logs(self._case)
        logger.debug("Latest Coupler log file(s) {}".format(newestcpllogfiles))
        # An exception is raised if the file is not compressed
        allgood = len(newestcpllogfiles)
        for cpllog in newestcpllogfiles:
            try:
                if b"SUCCESSFUL TERMINATION" in gzip.open(cpllog, "rb").read():
                    allgood = allgood - 1
            except Exception as e:
                # Probably want to be more specific here
                msg = e.__str__()

                logger.info(
                    "{} is not compressed, assuming run failed {}".format(cpllog, msg)
                )

        return allgood == 0

    def _component_compare_copy(self, suffix):
        # Only match .nc files
        comments, num_copied = copy_histfiles(self._case, suffix, match_suffix="nc")
        self._expected_num_cmp = num_copied

        append_testlog(comments, self._orig_caseroot)

    def _log_cprnc_output_tail(self, filename_pattern, prepend=None):
        rundir = self._case.get_value("RUNDIR")

        glob_pattern = "{}/{}".format(rundir, filename_pattern)

        cprnc_logs = glob.glob(glob_pattern)

        for output in cprnc_logs:
            with open(output) as fin:
                cprnc_log_tail = fin.readlines()[-20:]

                cprnc_log_tail.insert(0, "tail -n20 {}\n\n".format(output))

                if prepend is not None:
                    cprnc_log_tail.insert(0, "{}\n\n".format(prepend))

                append_testlog("".join(cprnc_log_tail), self._orig_caseroot)

    def _component_compare_test(
        self, suffix1, suffix2, success_change=False, ignore_fieldlist_diffs=False
    ):
        """
        Return value is not generally checked, but is provided in case a custom
        run case needs indirection based on success.
        If success_change is True, success requires some files to be different.
        If ignore_fieldlist_diffs is True, then: If the two cases differ only in
        their field lists (i.e., all shared fields are bit-for-bit, but one case
        has some diagnostic fields that are missing from the other case), treat
        the two cases as identical.
        """
        success, comments, num_compared = self._do_compare_test(
            suffix1, suffix2, ignore_fieldlist_diffs=ignore_fieldlist_diffs
        )
        if success_change:
            success = not success

        if (
            self._expected_num_cmp is not None
            and num_compared is not None
            and self._expected_num_cmp != num_compared
        ):
            comments = comments.replace("PASS", "")
            comments += """\nWARNING Expected to compare {} hist files, but only compared {}.
It's possible that the hist_file_extension entry in config_archive.xml is not correct for some of your components.
""".format(
                self._expected_num_cmp, num_compared
            )

        append_testlog(comments, self._orig_caseroot)

        pattern = "*.nc.{}.cprnc.out".format(suffix1)

        message = "compared suffixes suffix1 {!r} suffix2 {!r}".format(suffix1, suffix2)

        self._log_cprnc_output_tail(pattern, message)

        status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
        with self._test_status:
            self._test_status.set_status(
                "{}_{}_{}".format(COMPARE_PHASE, suffix1, suffix2), status
            )

        return success

    def _do_compare_test(self, suffix1, suffix2, ignore_fieldlist_diffs=False):
        """
        Wraps the call to compare_test to facilitate replacement in unit tests
        """
        return compare_test(
            self._case, suffix1, suffix2, ignore_fieldlist_diffs=ignore_fieldlist_diffs
        )

    def _st_archive_case_test(self):
        result = self._case.test_env_archive()
        with self._test_status:
            if result:
                self._test_status.set_status(STARCHIVE_PHASE, TEST_PASS_STATUS)
            else:
                self._test_status.set_status(STARCHIVE_PHASE, TEST_FAIL_STATUS)

    def _phase_modifying_call(self, phase, function):
        """
        Ensures that unexpected exceptions from phases will result in a FAIL
        result in the TestStatus file for that phase.
        """
        try:
            function()
        except Exception as e:
            # Do NOT want to catch KeyboardInterrupt
            msg = e.__str__()

            excmsg = "Exception during {}:\n{}\n{}".format(
                phase, msg, traceback.format_exc()
            )

            logger.warning(excmsg)
            append_testlog(excmsg, self._orig_caseroot)

            with self._test_status:
                self._test_status.set_status(
                    phase, TEST_FAIL_STATUS, comments="exception"
                )

    def _check_for_memleak(self):
        """
        Examine memory usage as recorded in the cpl log file and look for
        unexpected increases.
        """
        config = load_coupler_customization(self._case)

        # default to 0.1
        tolerance = self._case.get_value("TEST_MEMLEAK_TOLERANCE") or 0.1

        expect(tolerance > 0.0, "Bad value for memleak tolerance in test")

        with self._test_status:
            try:
                memleak, comment = config.perf_check_for_memory_leak(
                    self._case, tolerance
                )
            except AttributeError:
                memleak, comment = perf_check_for_memory_leak(self._case, tolerance)

            if memleak:
                append_testlog(comment, self._orig_caseroot)

                status = TEST_FAIL_STATUS
            else:
                status = TEST_PASS_STATUS

            self._test_status.set_status(MEMLEAK_PHASE, status, comments=comment)

    def compare_env_run(self, expected=None):
        """
        Compare env_run file to original and warn about differences
        """
        components = self._case.get_values("COMP_CLASSES")
        f1obj = self._case.get_env("run")
        f2obj = EnvRun(
            self._caseroot,
            os.path.join(LOCKED_DIR, "env_run.orig.xml"),
            components=components,
        )
        diffs = f1obj.compare_xml(f2obj)
        for key in diffs.keys():
            if expected is not None and key in expected:
                logging.warning(" Resetting {} for test".format(key))
                f1obj.set_value(key, f2obj.get_value(key, resolved=False))
            else:
                print(
                    "WARNING: Found difference in test {}: case: {} original value {}".format(
                        key, diffs[key][0], diffs[key][1]
                    )
                )
                return False
        return True

    def _compare_memory(self):
        """
        Compares current test memory usage to baseline.
        """
        with self._test_status:
            try:
                below_tolerance, comment = perf_compare_memory_baseline(self._case)
            except Exception as e:
                logger.info("Failed to compare memory usage baseline: {!s}".format(e))

                self._test_status.set_status(
                    MEMCOMP_PHASE, TEST_FAIL_STATUS, comments=str(e)
                )
            else:
                if below_tolerance is not None:
                    append_testlog(comment, self._orig_caseroot)

                    if (
                        below_tolerance
                        and self._test_status.get_status(MEMCOMP_PHASE) is None
                    ):
                        self._test_status.set_status(MEMCOMP_PHASE, TEST_PASS_STATUS)
                    elif (
                        self._test_status.get_status(MEMCOMP_PHASE) != TEST_FAIL_STATUS
                    ):
                        self._test_status.set_status(
                            MEMCOMP_PHASE, TEST_FAIL_STATUS, comments=comment
                        )

    def _compare_throughput(self):
        """
        Compares current test throughput to baseline.
        """
        with self._test_status:
            try:
                below_tolerance, comment = perf_compare_throughput_baseline(self._case)
            except Exception as e:
                logger.info("Failed to compare throughput baseline: {!s}".format(e))

                self._test_status.set_status(
                    THROUGHPUT_PHASE, TEST_FAIL_STATUS, comments=str(e)
                )
            else:
                if below_tolerance is not None:
                    append_testlog(comment, self._orig_caseroot)

                    if (
                        below_tolerance
                        and self._test_status.get_status(THROUGHPUT_PHASE) is None
                    ):
                        self._test_status.set_status(THROUGHPUT_PHASE, TEST_PASS_STATUS)
                    elif (
                        self._test_status.get_status(THROUGHPUT_PHASE)
                        != TEST_FAIL_STATUS
                    ):
                        self._test_status.set_status(
                            THROUGHPUT_PHASE, TEST_FAIL_STATUS, comments=comment
                        )

    def _compare_baseline(self):
        """
        compare the current test output to a baseline result
        """
        with self._test_status:
            # compare baseline
            success, comments = compare_baseline(self._case)
            append_testlog(comments, self._orig_caseroot)

            pattern = "*.nc.cprnc.out"

            self._log_cprnc_output_tail(pattern)

            status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
            baseline_name = self._case.get_value("BASECMP_CASE")
            ts_comments = (
                os.path.dirname(baseline_name) + ": " + get_ts_synopsis(comments)
            )
            self._test_status.set_status(BASELINE_PHASE, status, comments=ts_comments)

    def _generate_baseline(self):
        """
        generate a new baseline case based on the current test
        """
        with self._test_status:
            # generate baseline
            success, comments = generate_baseline(self._case)
            append_testlog(comments, self._orig_caseroot)
            status = TEST_PASS_STATUS if success else TEST_FAIL_STATUS
            baseline_name = self._case.get_value("BASEGEN_CASE")
            self._test_status.set_status(
                GENERATE_PHASE, status, comments=os.path.dirname(baseline_name)
            )
            basegen_dir = os.path.join(
                self._case.get_value("BASELINE_ROOT"),
                self._case.get_value("BASEGEN_CASE"),
            )
            # copy latest cpl log to baseline
            # drop the date so that the name is generic
            newestcpllogfiles = get_latest_cpl_logs(self._case)
            with SharedArea():
                # TODO ever actually more than one cpl log?
                for cpllog in newestcpllogfiles:
                    m = re.search(r"/({}.*.log).*.gz".format(self._cpllog), cpllog)
                    if m is not None:
                        baselog = os.path.join(basegen_dir, m.group(1)) + ".gz"
                        safe_copy(
                            cpllog,
                            os.path.join(basegen_dir, baselog),
                            preserve_meta=False,
                        )

                    perf_write_baseline(self._case, basegen_dir, cpllog)

def perf_check_for_memory_leak(case, tolerance):
    leak = False

    comment = ""

    latestcpllogs = get_latest_cpl_logs(case)

    for cpllog in latestcpllogs:
        try:
            memlist = perf_get_memory_list(case, cpllog)
        except RuntimeError:
            return False, "insufficient data for memleak test"

        # last day - second day, skip first day, can be too low while initializing
        elapsed_days = int(memlist[-1][0]) - int(memlist[1][0])

        finalmem, originalmem = float(memlist[-1][1]), float(memlist[1][1])

        memdiff = -1 if originalmem <= 0 else (finalmem - originalmem) / originalmem

        if memdiff < 0:
            leak = False
            comment = "data for memleak test is insufficient"
        elif memdiff < tolerance:
            leak = False
            comment = ""
        else:
            leak = True
            comment = (
                "memleak detected, memory went from {:f} to {:f} in {:d} days".format(
                    originalmem, finalmem, elapsed_days
                )
            )

    return leak, comment

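# Editorial example (sketch): with TEST_MEMLEAK_TOLERANCE=0.1, a run whose
# highwater memory grows from 1000.0 on the second sampled day to 1150.0 on the
# last day gives memdiff = (1150 - 1000) / 1000 = 0.15 > 0.1, so the MEMLEAK
# phase is marked FAIL; growth of 5% would stay within tolerance and pass.
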
class FakeTest(SystemTestsCommon):
    """
    Inheritors of the FakeTest class are intended to test the code.

    All members of the FakeTest class must have names beginning with "TEST";
    this is so that find_system_test in utils.py will work with these classes.
    """

    def __init__(self, case, expected=None, **kwargs):
        super(FakeTest, self).__init__(case, expected=expected, **kwargs)

        self._script = None
        self._requires_exe = False

        self._case._non_local = True

        self._original_exe = self._case.get_value("run_exe")

    def _set_script(self, script, requires_exe=False):
        self._script = script
        self._requires_exe = requires_exe

    def _resetup_case(self, phase, reset=False):
        run_exe = self._case.get_value("run_exe")

        super(FakeTest, self)._resetup_case(phase, reset=reset)

        self._case.set_value("run_exe", run_exe)

    def build_phase(self, sharedlib_only=False, model_only=False):
        if self._requires_exe:
            super(FakeTest, self).build_phase(
                sharedlib_only=sharedlib_only, model_only=model_only
            )

        if not sharedlib_only:
            exeroot = self._case.get_value("EXEROOT")
            modelexe = os.path.join(exeroot, "fake.exe")
            self._case.set_value("run_exe", modelexe)

            with open(modelexe, "w") as f:
                f.write("#!/bin/bash\n")
                f.write(self._script)

            os.chmod(modelexe, 0o755)

            if not self._requires_exe:
                build.post_build(self._case, [], build_complete=True)
            else:
                expect(
                    os.path.exists(modelexe),
                    "Could not find expected file {}".format(modelexe),
                )

            logger.info(
                "FakeTest build_phase complete {} {}".format(
                    modelexe, self._requires_exe
                )
            )

    def run_indv(
        self,
        suffix="base",
        st_archive=False,
        submit_resubmits=None,
        keep_init_generated_files=False,
    ):
        mpilib = self._case.get_value("MPILIB")
        # This flag is needed by mpt to run a script under mpiexec
        if mpilib == "mpt":
            os.environ["MPI_SHEPHERD"] = "true"

        super(FakeTest, self).run_indv(
            suffix, st_archive=st_archive, submit_resubmits=submit_resubmits
        )

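# Editorial note: FakeTest subclasses stand in for the model by writing a small
# bash script to $EXEROOT/fake.exe. A minimal (hypothetical) subclass would look
# roughly like:
#
#     class TESTEXAMPLE(FakeTest):
#         def build_phase(self, sharedlib_only=False, model_only=False):
#             rundir = self._case.get_value("RUNDIR")
#             script = "echo SUCCESSFUL TERMINATION > {}/cpl.log.$LID\n".format(rundir)
#             self._set_script(script)
#             super().build_phase(sharedlib_only=sharedlib_only, model_only=model_only)
#
# The real fake tests follow below.
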
class TESTRUNPASS(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = """
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(
            rundir=rundir, log=self._cpllog, root=cimeroot, case=case
        )
        self._set_script(script)
        FakeTest.build_phase(self, sharedlib_only=sharedlib_only, model_only=model_only)

class TESTRUNDIFF(FakeTest):
    """
    You can generate a diff with this test as follows:
    1) Run the test and generate a baseline
    2) Set the TESTRUNDIFF_ALTERNATE environment variable to TRUE
    3) Re-run the same test from step 1 but do a baseline comparison instead of generation
      3.a) This should give you a DIFF
    """

    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = """
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
if [ -z "$TESTRUNDIFF_ALTERNATE" ]; then
  cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
else
  cp {root}/scripts/tests/cpl.hi2.nc.test {rundir}/{case}.cpl.hi.0.nc
fi
""".format(
            rundir=rundir, log=self._cpllog, root=cimeroot, case=case
        )
        self._set_script(script)
        FakeTest.build_phase(self, sharedlib_only=sharedlib_only, model_only=model_only)

class TESTRUNDIFFRESUBMIT(TESTRUNDIFF):
    pass

class TESTTESTDIFF(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = """
echo Insta pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
cp {root}/scripts/tests/cpl.hi2.nc.test {rundir}/{case}.cpl.hi.0.nc.rest
""".format(
            rundir=rundir, log=self._cpllog, root=cimeroot, case=case
        )
        self._set_script(script)
        super(TESTTESTDIFF, self).build_phase(
            sharedlib_only=sharedlib_only, model_only=model_only
        )

    def run_phase(self):
        super(TESTTESTDIFF, self).run_phase()
        self._component_compare_test("base", "rest")

class TESTRUNFAIL(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = """
if [ -z "$TESTRUNFAIL_PASS" ]; then
  echo Insta fail
  echo model failed > {rundir}/{log}.log.$LID
  exit -1
else
  echo Insta pass
  echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
  cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
fi
""".format(
            rundir=rundir, log=self._cpllog, root=cimeroot, case=case
        )
        self._set_script(script)
        FakeTest.build_phase(self, sharedlib_only=sharedlib_only, model_only=model_only)

class TESTRUNFAILRESET(TESTRUNFAIL):
    """This fake test can fail for two reasons:

    1. As in the TESTRUNFAIL test: if the environment variable TESTRUNFAIL_PASS
       is *not* set

    2. Even if that environment variable *is* set, it will fail if STOP_N
       differs from the original value

    The purpose of (2) is to ensure that the test's values get properly reset
    if the test is rerun after an initial failure.
    """

    def run_indv(
        self,
        suffix="base",
        st_archive=False,
        submit_resubmits=None,
        keep_init_generated_files=False,
    ):
        # Make sure STOP_N matches the original value for the case. This tests
        # that STOP_N has been reset properly if we are rerunning the test after
        # a failure.
        env_test = EnvTest(self._get_caseroot())
        stop_n = self._case.get_value("STOP_N")
        stop_n_test = int(env_test.get_test_parameter("STOP_N"))
        expect(
            stop_n == stop_n_test,
            "Expect STOP_N to match original ({} != {})".format(stop_n, stop_n_test),
        )

        # Now modify STOP_N so that an error will be generated if it isn't reset
        # properly upon a rerun
        self._case.set_value("STOP_N", stop_n + 1)

        super(TESTRUNFAILRESET, self).run_indv(
            suffix=suffix, st_archive=st_archive, submit_resubmits=submit_resubmits
        )

class TESTRUNFAILEXC(TESTRUNPASS):
    def run_phase(self):
        raise RuntimeError("Exception from run_phase")

class TESTRUNSTARCFAIL(TESTRUNPASS):
    def _st_archive_case_test(self):
        raise RuntimeError("Exception from st archive")

class TESTBUILDFAIL(TESTRUNPASS):
    def build_phase(self, sharedlib_only=False, model_only=False):
        if "TESTBUILDFAIL_PASS" in os.environ:
            TESTRUNPASS.build_phase(self, sharedlib_only, model_only)
        else:
            if not sharedlib_only:
                blddir = self._case.get_value("EXEROOT")
                bldlog = os.path.join(
                    blddir,
                    "{}.bldlog.{}".format(get_model(), get_timestamp("%y%m%d-%H%M%S")),
                )
                with open(bldlog, "w") as fd:
                    fd.write("BUILD FAIL: Intentional fail for testing infrastructure")

                expect(False, "BUILD FAIL: Intentional fail for testing infrastructure")

class TESTBUILDFAILEXC(FakeTest):
    def __init__(self, case, **kwargs):
        FakeTest.__init__(self, case, **kwargs)
        raise RuntimeError("Exception from init")

class TESTRUNUSERXMLCHANGE(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
        caseroot = self._case.get_value("CASEROOT")
        modelexe = self._case.get_value("run_exe")

        new_stop_n = self._case.get_value("STOP_N") * 2

        script = """
cd {caseroot}
./xmlchange --file env_test.xml STOP_N={stopn}
./xmlchange RESUBMIT=1,STOP_N={stopn},CONTINUE_RUN=FALSE,RESUBMIT_SETS_CONTINUE_RUN=FALSE
cd -
{originalexe} "$@"
cd {caseroot}
./xmlchange run_exe={modelexe}
sleep 5
""".format(
            originalexe=self._original_exe,
            caseroot=caseroot,
            modelexe=modelexe,
            stopn=str(new_stop_n),
        )
        self._set_script(script, requires_exe=True)
        FakeTest.build_phase(self, sharedlib_only=sharedlib_only, model_only=model_only)

    def run_phase(self):
        self.run_indv(submit_resubmits=True)

class TESTRUNSLOWPASS(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        script = """
sleep 300
echo Slow pass
echo SUCCESSFUL TERMINATION > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(
            rundir=rundir, log=self._cpllog, root=cimeroot, case=case
        )
        self._set_script(script)
        FakeTest.build_phase(self, sharedlib_only=sharedlib_only, model_only=model_only)

class TESTMEMLEAKFAIL(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        testfile = os.path.join(cimeroot, "scripts", "tests", "cpl.log.failmemleak.gz")
        script = """
echo Insta pass
gunzip -c {testfile} > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(
            testfile=testfile, rundir=rundir, log=self._cpllog, root=cimeroot, case=case
        )
        self._set_script(script)
        FakeTest.build_phase(self, sharedlib_only=sharedlib_only, model_only=model_only)

class TESTMEMLEAKPASS(FakeTest):
    def build_phase(self, sharedlib_only=False, model_only=False):
        rundir = self._case.get_value("RUNDIR")
        cimeroot = self._case.get_value("CIMEROOT")
        case = self._case.get_value("CASE")
        testfile = os.path.join(cimeroot, "scripts", "tests", "cpl.log.passmemleak.gz")
        script = """
echo Insta pass
gunzip -c {testfile} > {rundir}/{log}.log.$LID
cp {root}/scripts/tests/cpl.hi1.nc.test {rundir}/{case}.cpl.hi.0.nc
""".format(
            testfile=testfile, rundir=rundir, log=self._cpllog, root=cimeroot, case=case
        )
        self._set_script(script)
        FakeTest.build_phase(self, sharedlib_only=sharedlib_only, model_only=model_only)