Source code for CIME.get_tests

import CIME.utils
from CIME.utils import expect, convert_to_seconds, parse_test_name, get_cime_root
from CIME.XML.machines import Machines
import sys, os

# Expect that, if a model wants to use python-based test lists, they will have a file
# $model/cime_config/tests.py , containing a test dictionary called _TESTS. Currently,
# only E3SM is using this feature.

sys.path.insert(0, os.path.join(get_cime_root(), "../cime_config"))
_ALL_TESTS = {}
try:
    from tests import _TESTS  # pylint: disable=import-error

    _ALL_TESTS.update(_TESTS)
except ImportError:
    pass

# Here are the tests belonging to cime suites. Format for individual tests is
# <test>.<grid>.<compset>[.<testmod>]
#
# suite_name : {
#     "inherit" : (suite1, suite2, ...), # Optional. Suites to inherit tests from. Default is None. Tuple, list, or str.
#     "time"    : "HH:MM:SS",            # Optional. Recommended upper-limit on test time.
#     "share"   : True|False,            # Optional. If True, all tests in this suite share a build. Default is False.
#     "perf"    : True|False,            # Optional. If True, all tests in this suite will do performance tracking. Default is False.
#     "tests"   : (test1, test2, ...)    # Optional. The list of tests for this suite. See above for format. Tuple, list, or str. This is the ONLY inheritable attribute.
# }

_CIME_TESTS = {
    "cime_tiny": {
        "time": "0:10:00",
        "tests": (
            "ERS.f19_g16_rx1.A",
            "NCK.f19_g16_rx1.A",
        ),
    },
    "cime_test_only_pass": {
        "time": "0:10:00",
        "tests": (
            "TESTRUNPASS_P1.f19_g16_rx1.A",
            "TESTRUNPASS_P1.ne30_g16_rx1.A",
            "TESTRUNPASS_P1.f45_g37_rx1.A",
        ),
    },
    "cime_test_only_slow_pass": {
        "time": "0:10:00",
        "tests": (
            "TESTRUNSLOWPASS_P1.f19_g16_rx1.A",
            "TESTRUNSLOWPASS_P1.ne30_g16_rx1.A",
            "TESTRUNSLOWPASS_P1.f45_g37_rx1.A",
        ),
    },
    "cime_test_only": {
        "time": "0:10:00",
        "tests": (
            "TESTBUILDFAIL_P1.f19_g16_rx1.A",
            "TESTBUILDFAILEXC_P1.f19_g16_rx1.A",
            "TESTRUNFAIL_P1.f19_g16_rx1.A",
            "TESTRUNSTARCFAIL_P1.f19_g16_rx1.A",
            "TESTRUNFAILEXC_P1.f19_g16_rx1.A",
            "TESTRUNPASS_P1.f19_g16_rx1.A",
            "TESTTESTDIFF_P1.f19_g16_rx1.A",
            "TESTMEMLEAKFAIL_P1.f09_g16.X",
            "TESTMEMLEAKPASS_P1.f09_g16.X",
        ),
    },
    "cime_test_all": {
        "inherit": "cime_test_only",
        "time": "0:10:00",
        "tests": "TESTRUNDIFF_P1.f19_g16_rx1.A",
    },
    "cime_test_share": {
        "time": "0:10:00",
        "share": True,
        "tests": (
            "SMS_P2.f19_g16_rx1.A",
            "SMS_P4.f19_g16_rx1.A",
            "SMS_P8.f19_g16_rx1.A",
            "SMS_P16.f19_g16_rx1.A",
        ),
    },
    "cime_test_share2": {
        "time": "0:10:00",
        "share": True,
        "tests": (
            "SMS_P2.f19_g16_rx1.X",
            "SMS_P4.f19_g16_rx1.X",
            "SMS_P8.f19_g16_rx1.X",
            "SMS_P16.f19_g16_rx1.X",
        ),
    },
    "cime_test_perf": {
        "time": "0:10:00",
        "perf": True,
        "tests": (
            "SMS_P2.T42_T42.S",
            "SMS_P4.T42_T42.S",
            "SMS_P8.T42_T42.S",
            "SMS_P16.T42_T42.S",
        ),
    },
    "cime_test_timing": {
        "time": "0:10:00",
        "tests": ("SMS_P1.T42_T42.S",),
    },
    "cime_test_repeat": {
        "tests": (
            "TESTRUNPASS_P1.f19_g16_rx1.A",
            "TESTRUNPASS_P2.ne30_g16_rx1.A",
            "TESTRUNPASS_P4.f45_g37_rx1.A",
        )
    },
    "cime_test_time": {
        "time": "0:13:00",
        "tests": ("TESTRUNPASS_P69.f19_g16_rx1.A.testmod",),
    },
    "cime_test_multi_inherit": {
        "inherit": ("cime_test_repeat", "cime_test_only_pass", "cime_test_all")
    },
    "cime_developer": {
        "time": "0:15:00",
        "tests": (
            "NCK_Ld3.f45_g37_rx1.A",
            "ERI_Ln9.f09_g16.X",
            "ERIO_Ln11.f09_g16.X",
            "SEQ_Ln9.f19_g16_rx1.A",
            "ERS.ne30_g16_rx1.A.drv-y100k",
            "IRT_N2_Vmct_Ln9.f19_g16_rx1.A",
            "ERR_Ln9.f45_g37_rx1.A",
            "ERP_Ln9.f45_g37_rx1.A",
            "SMS_D_Ln9_Mmpi-serial.f19_g16_rx1.A",
            "PET_Ln9_P4.f19_f19.A",
            "PEM_Ln9_P4.f19_f19.A",
            "SMS_Ln3.T42_T42.S",
            "PRE.f19_f19.ADESP",
            "PRE.f19_f19.ADESP_TEST",
            "MCC_P1.f19_g16_rx1.A",
            "LDSTA.f45_g37_rx1.A",
        ),
    },
}

_ALL_TESTS.update(_CIME_TESTS)

###############################################################################
def _get_key_data(raw_dict, key, the_type):
    ###############################################################################
    if key not in raw_dict:
        if the_type is tuple:
            return ()
        elif the_type is str:
            return None
        elif the_type is bool:
            return False
        else:
            expect(False, "Unsupported type {}".format(the_type))
    else:
        val = raw_dict[key]
        if the_type is tuple and isinstance(val, str):
            val = (val,)

        expect(
            isinstance(val, the_type),
            "Wrong type for {}, {} is a {} but expected {}".format(
                key, val, type(val), the_type
            ),
        )

        return val


###############################################################################
[docs] def get_test_data(suite): ############################################################################### """ For a given suite, returns (inherit, time, share, perf, tests) """ raw_dict = _ALL_TESTS[suite] for key in raw_dict.keys(): expect( key in ["inherit", "time", "share", "perf", "tests"], "Unexpected test key '{}'".format(key), ) return ( _get_key_data(raw_dict, "inherit", tuple), _get_key_data(raw_dict, "time", str), _get_key_data(raw_dict, "share", bool), _get_key_data(raw_dict, "perf", bool), _get_key_data(raw_dict, "tests", tuple), )
###############################################################################
[docs] def get_test_suites(): ############################################################################### return list(_ALL_TESTS.keys())
###############################################################################
[docs] def get_test_suite( suite, machine=None, compiler=None, skip_inherit=False, skip_tests=None ): ############################################################################### """ Return a list of FULL test names for a suite. """ expect(suite in get_test_suites(), "Unknown test suite: '{}'".format(suite)) machobj = Machines(machine=machine) machine = machobj.get_machine_name() if compiler is None: compiler = machobj.get_default_compiler() expect( machobj.is_valid_compiler(compiler), "Compiler {} not valid for machine {}".format(compiler, machine), ) inherits_from, _, _, _, tests_raw = get_test_data(suite) tests = [] for item in tests_raw: expect( isinstance(item, str), "Bad type of test {}, expected string".format(item), ) test_mods = None test_components = item.split(".") expect(len(test_components) in [3, 4], "Bad test name {}".format(item)) if len(test_components) == 4: test_name = ".".join(test_components[:-1]) test_mods = test_components[-1] else: test_name = item if not skip_tests or not test_name in skip_tests: tests.append( CIME.utils.get_full_test_name( test_name, machine=machine, compiler=compiler, testmods_string=test_mods, ) ) if not skip_inherit: for inherits in inherits_from: inherited_tests = get_test_suite(inherits, machine, compiler) for inherited_test in inherited_tests: if inherited_test not in tests: tests.append(inherited_test) return tests
###############################################################################
[docs] def suite_has_test(suite, test_full_name, skip_inherit=False): ############################################################################### _, _, _, _, machine, compiler, _ = CIME.utils.parse_test_name(test_full_name) expect(machine is not None, "{} is not a full test name".format(test_full_name)) tests = get_test_suite( suite, machine=machine, compiler=compiler, skip_inherit=skip_inherit ) return test_full_name in tests
###############################################################################
[docs] def get_build_groups(tests): ############################################################################### """ Given a list of tests, return a list of lists, with each list representing a group of tests that can share executables. >>> tests = ["SMS_P2.f19_g16_rx1.A.melvin_gnu", "SMS_P4.f19_g16_rx1.A.melvin_gnu", "SMS_P2.f19_g16_rx1.X.melvin_gnu", "SMS_P4.f19_g16_rx1.X.melvin_gnu", "TESTRUNSLOWPASS_P1.f19_g16_rx1.A.melvin_gnu", "TESTRUNSLOWPASS_P1.ne30_g16_rx1.A.melvin_gnu"] >>> get_build_groups(tests) [('SMS_P2.f19_g16_rx1.A.melvin_gnu', 'SMS_P4.f19_g16_rx1.A.melvin_gnu'), ('SMS_P2.f19_g16_rx1.X.melvin_gnu', 'SMS_P4.f19_g16_rx1.X.melvin_gnu'), ('TESTRUNSLOWPASS_P1.f19_g16_rx1.A.melvin_gnu',), ('TESTRUNSLOWPASS_P1.ne30_g16_rx1.A.melvin_gnu',)] """ build_groups = [] # list of tuples ([tests], set(suites)) # Get a list of suites that share exes suites = get_test_suites() share_suites = [] for suite in suites: share = get_test_data(suite)[2] if share: share_suites.append(suite) # Divide tests up into build groups. Assumes that build-compatibility is transitive for test in tests: matched = False my_share_suites = set() for suite in share_suites: if suite_has_test(suite, test, skip_inherit=True): my_share_suites.add(suite) # Try to match this test with an existing build group if my_share_suites: for build_group_tests, build_group_suites in build_groups: overlap = build_group_suites & my_share_suites if overlap: matched = True build_group_tests.append(test) build_group_suites.update(my_share_suites) break # Nothing matched, this test is in a build group of its own if not matched: build_groups.append(([test], my_share_suites)) return [tuple(item[0]) for item in build_groups]
###############################################################################
[docs] def is_perf_test(test): ############################################################################### """ Is the provided test in a suite with perf=True? >>> is_perf_test("SMS_P2.T42_T42.S.melvin_gnu") True >>> is_perf_test("SMS_P2.f19_g16_rx1.X.melvin_gnu") False >>> is_perf_test("PFS_P2.f19_g16_rx1.X.melvin_gnu") True """ # Get a list of performance suites if test.startswith("PFS"): return True else: suites = get_test_suites() for suite in suites: perf = get_test_data(suite)[3] if perf and suite_has_test(suite, test, skip_inherit=True): return True return False
###############################################################################
[docs] def infer_arch_from_tests(testargs): ############################################################################### """ Return a tuple (machine, [compilers]) that can be inferred from the test args >>> infer_arch_from_tests(["NCK.f19_g16_rx1.A.melvin_gnu"]) ('melvin', ['gnu']) >>> infer_arch_from_tests(["NCK.f19_g16_rx1.A"]) (None, []) >>> infer_arch_from_tests(["NCK.f19_g16_rx1.A", "NCK.f19_g16_rx1.A.melvin_gnu"]) ('melvin', ['gnu']) >>> infer_arch_from_tests(["NCK.f19_g16_rx1.A.melvin_gnu", "NCK.f19_g16_rx1.A.melvin_gnu"]) ('melvin', ['gnu']) >>> infer_arch_from_tests(["NCK.f19_g16_rx1.A.melvin_gnu9", "NCK.f19_g16_rx1.A.melvin_gnu"]) ('melvin', ['gnu9', 'gnu']) >>> infer_arch_from_tests(["NCK.f19_g16_rx1.A.melvin_gnu", "NCK.f19_g16_rx1.A.mappy_gnu"]) Traceback (most recent call last): ... CIME.utils.CIMEError: ERROR: Must have consistent machine 'melvin' != 'mappy' """ e3sm_test_suites = get_test_suites() machine = None compilers = [] for testarg in testargs: testarg = testarg.strip() if testarg.startswith("^"): testarg = testarg[1:] if testarg not in e3sm_test_suites: machine_for_this_test, compiler_for_this_test = parse_test_name(testarg)[ 4:6 ] if machine_for_this_test is not None: if machine is None: machine = machine_for_this_test else: expect( machine == machine_for_this_test, "Must have consistent machine '%s' != '%s'" % (machine, machine_for_this_test), ) if ( compiler_for_this_test is not None and compiler_for_this_test not in compilers ): compilers.append(compiler_for_this_test) return machine, compilers
###############################################################################
[docs] def get_full_test_names(testargs, machine, compiler): ############################################################################### """ Return full test names in the form: TESTCASE.GRID.COMPSET.MACHINE_COMPILER.TESTMODS Testmods are optional Testargs can be categories or test names and support the NOT symbol '^' >>> get_full_test_names(["cime_tiny"], "melvin", "gnu") ['ERS.f19_g16_rx1.A.melvin_gnu', 'NCK.f19_g16_rx1.A.melvin_gnu'] >>> get_full_test_names(["cime_tiny", "PEA_P1_M.f45_g37_rx1.A"], "melvin", "gnu") ['ERS.f19_g16_rx1.A.melvin_gnu', 'NCK.f19_g16_rx1.A.melvin_gnu', 'PEA_P1_M.f45_g37_rx1.A.melvin_gnu'] >>> get_full_test_names(['ERS.f19_g16_rx1.A', 'NCK.f19_g16_rx1.A', 'PEA_P1_M.f45_g37_rx1.A'], "melvin", "gnu") ['ERS.f19_g16_rx1.A.melvin_gnu', 'NCK.f19_g16_rx1.A.melvin_gnu', 'PEA_P1_M.f45_g37_rx1.A.melvin_gnu'] >>> get_full_test_names(["cime_tiny", "^NCK.f19_g16_rx1.A"], "melvin", "gnu") ['ERS.f19_g16_rx1.A.melvin_gnu'] >>> get_full_test_names(["cime_test_multi_inherit"], "melvin", "gnu") ['TESTBUILDFAILEXC_P1.f19_g16_rx1.A.melvin_gnu', 'TESTBUILDFAIL_P1.f19_g16_rx1.A.melvin_gnu', 'TESTMEMLEAKFAIL_P1.f09_g16.X.melvin_gnu', 'TESTMEMLEAKPASS_P1.f09_g16.X.melvin_gnu', 'TESTRUNDIFF_P1.f19_g16_rx1.A.melvin_gnu', 'TESTRUNFAILEXC_P1.f19_g16_rx1.A.melvin_gnu', 'TESTRUNFAIL_P1.f19_g16_rx1.A.melvin_gnu', 'TESTRUNPASS_P1.f19_g16_rx1.A.melvin_gnu', 'TESTRUNPASS_P1.f45_g37_rx1.A.melvin_gnu', 'TESTRUNPASS_P1.ne30_g16_rx1.A.melvin_gnu', 'TESTRUNPASS_P2.ne30_g16_rx1.A.melvin_gnu', 'TESTRUNPASS_P4.f45_g37_rx1.A.melvin_gnu', 'TESTRUNSTARCFAIL_P1.f19_g16_rx1.A.melvin_gnu', 'TESTTESTDIFF_P1.f19_g16_rx1.A.melvin_gnu'] """ expect(machine is not None, "Must define a machine") expect(compiler is not None, "Must define a compiler") e3sm_test_suites = get_test_suites() tests_to_run = set() negations = set() for testarg in testargs: # remove any whitespace in name testarg = testarg.strip() if testarg.startswith("^"): negations.add(testarg[1:]) elif testarg in e3sm_test_suites: tests_to_run.update(get_test_suite(testarg, machine, compiler)) else: try: tests_to_run.add( CIME.utils.get_full_test_name( testarg, machine=machine, compiler=compiler ) ) except Exception: if "." not in testarg: expect(False, "Unrecognized test suite '{}'".format(testarg)) else: raise for negation in negations: if negation in e3sm_test_suites: tests_to_run -= set(get_test_suite(negation, machine, compiler)) else: fullname = CIME.utils.get_full_test_name( negation, machine=machine, compiler=compiler ) if fullname in tests_to_run: tests_to_run.remove(fullname) return list(sorted(tests_to_run))
############################################################################### ###############################################################################
[docs] def key_test_time(test_full_name): ############################################################################### result = get_recommended_test_time(test_full_name) return 99999999 if result is None else convert_to_seconds(result)