"""
A library for scheduling/running through the phases of a set
of system tests. Supports phase-level parallelism (can make progres
on multiple system tests at once).
TestScheduler will handle the TestStatus for the 1-time setup
phases. All other phases need to handle their own status because
they can be run outside the context of TestScheduler.
"""
import traceback, stat, threading, time, glob
from collections import OrderedDict
from CIME.XML.standard_module_setup import *
import CIME.compare_namelists
import CIME.utils
import six
from get_tests import get_recommended_test_time
from CIME.utils import append_status, append_testlog, TESTS_FAILED_ERR_CODE, parse_test_name, get_full_test_name, get_model, convert_to_seconds
from CIME.test_status import *
from CIME.XML.machines import Machines
from CIME.XML.generic_xml import GenericXML
from CIME.XML.env_test import EnvTest
from CIME.XML.files import Files
from CIME.XML.component import Component
from CIME.XML.tests import Tests
from CIME.case import Case
from CIME.wait_for_tests import wait_for_tests
from CIME.provenance import get_recommended_test_time_based_on_past
from CIME.locked_files import lock_file
import CIME.test_utils
logger = logging.getLogger(__name__)

# Phases managed by TestScheduler, listed in execution order. Order matters:
# the scheduler always advances a test to the next entry of this list.
TEST_START = "INIT" # Special pseudo-phase just for test_scheduler bookkeeping
PHASES = [
    TEST_START,
    CREATE_NEWCASE_PHASE,
    XML_PHASE,
    SETUP_PHASE,
    SHAREDLIB_BUILD_PHASE,
    MODEL_BUILD_PHASE,
    RUN_PHASE,
]
###############################################################################
def _translate_test_names_for_new_pecount(test_names, force_procs, force_threads):
###############################################################################
    """
    Return a new list of test names whose PE-count case option is rewritten.

    For each test, the first case option starting with "P" (either
    "P<procs>" or "P<procs>x<threads>") gets its procs and/or threads replaced
    by force_procs / force_threads when those are not None. A test with no "P"
    option gets one appended. That option uses "M" for the proc count when
    force_procs is None.

    force_procs is never reassigned here. A per-test local is used so the "M"
    fallback chosen for one test cannot leak into the names of later tests.
    """
    new_test_names = []
    for test_name in test_names:
        testcase, caseopts, grid, compset, machine, compiler, testmod = parse_test_name(test_name)

        rewrote_caseopt = False
        if caseopts is not None:
            for idx, caseopt in enumerate(caseopts):
                if caseopt.startswith("P"):
                    pe_spec = caseopt[1:]
                    if "x" in pe_spec:
                        old_procs, old_thrds = pe_spec.split("x")
                    else:
                        old_procs, old_thrds = pe_spec, None

                    new_procs = force_procs if force_procs is not None else old_procs
                    new_thrds = force_threads if force_threads is not None else old_thrds
                    newcaseopt = ("P{}".format(new_procs)) if new_thrds is None else ("P{}x{}".format(new_procs, new_thrds))
                    caseopts[idx] = newcaseopt
                    rewrote_caseopt = True
                    break

        if not rewrote_caseopt:
            # Local, not a reassignment of force_procs. The old code did
            # `force_procs = "M"`, which changed the result for every
            # following test in the list.
            procs = "M" if force_procs is None else force_procs
            newcaseopt = ("P{}".format(procs)) if force_threads is None else ("P{}x{}".format(procs, force_threads))
            if caseopts is None:
                caseopts = [newcaseopt]
            else:
                caseopts.append(newcaseopt)

        new_test_name = get_full_test_name(testcase, caseopts=caseopts, grid=grid, compset=compset, machine=machine, compiler=compiler, testmod=testmod)
        new_test_names.append(new_test_name)

    return new_test_names
_TIME_CACHE = {}
###############################################################################
def _get_time_est(test, baseline_root, as_int=False, use_cache=False, raw=False):
###############################################################################
    """
    Return the recommended walltime for test.

    The estimate based on past runs (get_recommended_test_time_based_on_past)
    is preferred. Otherwise get_recommended_test_time(test) is used. With
    as_int=True the result is converted to seconds, and a missing estimate
    becomes 9999999999.

    When use_cache is True, results are memoized in _TIME_CACHE. The key covers
    every argument that affects the result (test, baseline_root, as_int, raw).
    A value computed with one set of options is therefore never returned for a
    call with different options.
    """
    cache_key = (test, baseline_root, as_int, raw)
    if use_cache and cache_key in _TIME_CACHE:
        return _TIME_CACHE[cache_key]

    recommended_time = get_recommended_test_time_based_on_past(baseline_root, test, raw=raw)
    if recommended_time is None:
        recommended_time = get_recommended_test_time(test)

    if as_int:
        if recommended_time is None:
            recommended_time = 9999999999
        else:
            recommended_time = convert_to_seconds(recommended_time)

    if use_cache:
        _TIME_CACHE[cache_key] = recommended_time

    return recommended_time
###############################################################################
def _order_tests_by_runtime(tests, baseline_root):
###############################################################################
    """
    Sort tests in place so that the longest-running tests (by the integer-second
    estimate from _get_time_est, memoized and raw) come first.
    """
    def _runtime_key(test_name):
        return _get_time_est(test_name, baseline_root, as_int=True, use_cache=True, raw=True)

    tests.sort(key=_runtime_key, reverse=True)
###############################################################################
[docs]class TestScheduler(object):
###############################################################################
###########################################################################
def __init__(self, test_names, test_data=None,
             no_run=False, no_build=False, no_setup=False, no_batch=None,
             test_root=None, test_id=None,
             machine_name=None, compiler=None,
             baseline_root=None, baseline_cmp_name=None, baseline_gen_name=None,
             clean=False, namelists_only=False,
             project=None, parallel_jobs=None,
             walltime=None, proc_pool=None,
             use_existing=False, save_timing=False, queue=None,
             allow_baseline_overwrite=False, output_root=None,
             force_procs=None, force_threads=None, mpilib=None,
             input_dir=None, pesfile=None, mail_user=None, mail_type=None):
    ###########################################################################
    """
    Resolve all settings for a test run and initialize per-test bookkeeping.

    test_names: full test names to schedule. If force_procs/force_threads is
        given, the names are rewritten to carry the forced PE layout. For the
        e3sm model the list is sorted in place by estimated runtime.
        NOTE(review): when no rewrite happens, this sorts the caller's list.
    test_data: optional {test_name -> {data_name -> data}}. Its "options" entry
        is read for wallclock, memleak_tolerance and tput_tolerance.
    no_setup / no_build / no_run / namelists_only: remove phases from the
        schedule. no_setup and namelists_only imply no_build, and no_build
        implies no_run.
    no_batch: run without a batch system. This is also forced on when the
        machine has no batch system. Requesting a queue without batch is an
        error.
    parallel_jobs: maximum number of concurrent phase threads. Defaults to
        min(len(test_names), MAX_MPITASKS_PER_NODE).
    proc_pool: local processor budget. Defaults to int(MAX_TASKS_PER_NODE * 1.25).
    use_existing: resume each test from its existing TestStatus file instead
        of requiring a fresh test directory.
    baseline_cmp_name / baseline_gen_name: enable baseline comparison /
        generation under baseline_root. The comparison directory must exist.
        Existing generation directories are rejected unless
        allow_baseline_overwrite is set.

    Configuration problems are reported through expect().
    """
    self._cime_root = CIME.utils.get_cime_root()
    self._cime_model = get_model()
    self._save_timing = save_timing
    self._queue = queue
    self._test_data = {} if test_data is None else test_data # Format: {test_name -> {data_name -> data}}
    self._mpilib = mpilib # allow override of default mpilib
    self._completed_tests = 0
    self._input_dir = input_dir
    self._pesfile = pesfile
    self._allow_baseline_overwrite = allow_baseline_overwrite
    self._mail_user = mail_user
    self._mail_type = mail_type
    self._machobj = Machines(machine=machine_name)
    # Processors charged to each MODEL_BUILD phase (may be lowered in _xml_phase)
    self._model_build_cost = 4

    # If user is forcing procs or threads, re-write test names to reflect this.
    if force_procs or force_threads:
        test_names = _translate_test_names_for_new_pecount(test_names, force_procs, force_threads)

    # Skipping an earlier stage forces skipping all later ones
    self._no_setup = no_setup
    self._no_build = no_build or no_setup or namelists_only
    self._no_run = no_run or self._no_build
    self._output_root = output_root
    # Figure out what project to use
    if project is None:
        self._project = CIME.utils.get_project()
        if self._project is None:
            self._project = self._machobj.get_value("PROJECT")
    else:
        self._project = project

    # We will not use batch system if user asked for no_batch or if current
    # machine is not a batch machine
    self._no_batch = no_batch or not self._machobj.has_batch_system()
    expect(not (self._no_batch and self._queue is not None),
           "Does not make sense to request a queue without batch system")

    # Determine and resolve test_root
    if test_root is not None:
        self._test_root = test_root
    elif self._output_root is not None:
        self._test_root = self._output_root
    else:
        self._test_root = self._machobj.get_value("CIME_OUTPUT_ROOT")

    if self._project is not None:
        self._test_root = self._test_root.replace("$PROJECT", self._project)

    self._test_root = os.path.abspath(self._test_root)
    self._test_id = test_id if test_id is not None else CIME.utils.get_timestamp()

    self._compiler = self._machobj.get_default_compiler() if compiler is None else compiler

    self._clean = clean
    self._namelists_only = namelists_only

    self._walltime = walltime

    if parallel_jobs is None:
        self._parallel_jobs = min(len(test_names),
                                  self._machobj.get_value("MAX_MPITASKS_PER_NODE"))
    else:
        self._parallel_jobs = parallel_jobs

    self._baseline_cmp_name = baseline_cmp_name # Implies comparison should be done if not None
    self._baseline_gen_name = baseline_gen_name # Implies generation should be done if not None

    # Compute baseline_root
    self._baseline_root = baseline_root if baseline_root is not None \
        else self._machobj.get_value("BASELINE_ROOT")

    if self._project is not None:
        self._baseline_root = self._baseline_root.replace("$PROJECT", self._project)

    self._baseline_root = os.path.abspath(self._baseline_root)

    if baseline_cmp_name or baseline_gen_name:
        if self._baseline_cmp_name:
            full_baseline_dir = os.path.join(self._baseline_root, self._baseline_cmp_name)
            expect(os.path.isdir(full_baseline_dir),
                   "Missing baseline comparison directory {}".format(full_baseline_dir))

        # the following is to assure that the existing generate directory is not overwritten
        if self._baseline_gen_name:
            full_baseline_dir = os.path.join(self._baseline_root, self._baseline_gen_name)
            existing_baselines = []
            for test_name in test_names:
                test_baseline = os.path.join(full_baseline_dir, test_name)
                if os.path.isdir(test_baseline):
                    existing_baselines.append(test_baseline)

            expect(allow_baseline_overwrite or len(existing_baselines) == 0,
                   "Baseline directories already exists {}\n" \
                   "Use -o to avoid this error".format(existing_baselines))

    # Longest-running tests first (sorts test_names in place)
    if self._cime_model == "e3sm":
        _order_tests_by_runtime(test_names, self._baseline_root)

    # This is the only data that multiple threads will simultaneously access
    # Each test has its own value and setting/retrieving items from a dict
    # is atomic, so this should be fine to use without mutex.
    # name -> (phase, status)
    self._tests = OrderedDict()
    for test_name in test_names:
        self._tests[test_name] = (TEST_START, TEST_PASS_STATUS)

    # Oversubscribe by 1/4
    if proc_pool is None:
        pes = int(self._machobj.get_value("MAX_TASKS_PER_NODE"))
        self._proc_pool = int(pes * 1.25)
    else:
        self._proc_pool = int(proc_pool)

    self._procs_avail = self._proc_pool

    # Setup phases: start from the full ordered list, then drop skipped phases
    self._phases = list(PHASES)
    if self._no_setup:
        self._phases.remove(SETUP_PHASE)
    if self._no_build:
        self._phases.remove(SHAREDLIB_BUILD_PHASE)
        self._phases.remove(MODEL_BUILD_PHASE)
    if self._no_run:
        self._phases.remove(RUN_PHASE)

    if use_existing:
        # Replay each test's recorded core phases into self._tests, stopping at
        # the first PEND/FAIL phase so scheduling resumes from there.
        for test in self._tests:
            with TestStatus(self._get_test_dir(test)) as ts:
                for phase, status in ts:
                    if phase in CORE_PHASES:
                        if status in [TEST_PEND_STATUS, TEST_FAIL_STATUS]:
                            if status == TEST_FAIL_STATUS:
                                # Reset FAIL to PEND so the phase is re-run
                                # (original note: important for potential subsequent waits)
                                ts.set_status(phase, TEST_PEND_STATUS)

                            # We need to pick up here
                            break

                        else:
                            if phase != SUBMIT_PHASE:
                                # Somewhat subtle. Create_test considers submit/run to be the run phase,
                                # so don't try to update test status for a passed submit phase
                                # Two steps because _update_test_status only allows
                                # entering a phase as PEND and then finishing it.
                                self._update_test_status(test, phase, TEST_PEND_STATUS)
                                self._update_test_status(test, phase, status)

                                if phase == RUN_PHASE:
                                    logger.info("Test {} passed and will not be re-run".format(test))

            logger.info("Using existing test directory {}".format(self._get_test_dir(test)))
    else:
        # None of the test directories should already exist.
        for test in self._tests:
            expect(not os.path.exists(self._get_test_dir(test)),
                   "Cannot create new case in directory '{}', it already exists."
                   " Pick a different test-id".format(self._get_test_dir(test)))
            logger.info("Creating test directory {}".format(self._get_test_dir(test)))

    # By the end of this constructor, this program should never hard abort,
    # instead, errors will be placed in the TestStatus files for the various
    # tests cases
###########################################################################
[docs] def get_testnames(self):
###########################################################################
return list(self._tests.keys())
###########################################################################
def _log_output(self, test, output):
    ###########################################################################
    """
    Append output to the test log in test's case directory.

    The case directory is created on demand. Note: creating it here, before
    create_newcase has run, could cause create_newcase to fail.
    """
    case_dir = self._get_test_dir(test)
    needs_dir = not os.path.isdir(case_dir)
    if needs_dir:
        os.makedirs(case_dir)
    append_testlog(output, caseroot=case_dir)
###########################################################################
def _get_case_id(self, test):
###########################################################################
baseline_action_code = ""
if self._baseline_gen_name:
baseline_action_code += "G"
if self._baseline_cmp_name:
baseline_action_code += "C"
if len(baseline_action_code) > 0:
return "{}.{}.{}".format(test, baseline_action_code, self._test_id)
else:
return "{}.{}".format(test, self._test_id)
###########################################################################
def _get_test_dir(self, test):
###########################################################################
return os.path.join(self._test_root, self._get_case_id(test))
###########################################################################
def _get_test_data(self, test):
###########################################################################
# Must be atomic
return self._tests[test]
###########################################################################
def _is_broken(self, test):
    ###########################################################################
    """Return True when test's current status is neither PASS nor PEND."""
    return self._get_test_status(test) not in (TEST_PASS_STATUS, TEST_PEND_STATUS)
###########################################################################
def _work_remains(self, test):
    ###########################################################################
    """
    Return True if test is still healthy (PASS or PEND) and has not yet reached
    the final scheduled phase.
    """
    phase, status = self._get_test_data(test)
    if status not in (TEST_PASS_STATUS, TEST_PEND_STATUS):
        return False
    return phase != self._phases[-1]
###########################################################################
def _get_test_status(self, test, phase=None):
    ###########################################################################
    """
    Return test's status for phase (default: its current phase).

    A phase the test has already moved past reports PASS, because a test only
    advances after its previous phase passed. Asking about a phase that comes
    after the current one fails via expect().
    """
    curr_phase, curr_status = self._get_test_data(test)
    if phase is not None and phase != curr_phase:
        expect(self._phases.index(phase) < self._phases.index(curr_phase),
               "Tried to see the future")
        # Assume all older phases PASSed
        return TEST_PASS_STATUS
    return curr_status
###########################################################################
def _get_test_phase(self, test):
###########################################################################
return self._get_test_data(test)[0]
###########################################################################
def _update_test_status(self, test, phase, status):
    ###########################################################################
    """
    Record (phase, status) as test's new state in the in-memory table.

    Only two transitions are legal; anything else fails via expect():
      * same phase: PEND -> any non-PEND status (the phase finished);
      * next phase: previous phase PASS -> new phase PEND (the phase started).
    Phases may not be skipped.
    """
    phase_idx = self._phases.index(phase)
    old_phase, old_status = self._get_test_data(test)

    if old_phase == phase:
        expect(old_status == TEST_PEND_STATUS,
               "Only valid to transition from PEND to something else, found '{}' for phase '{}'".format(old_status, phase))
        expect(status != TEST_PEND_STATUS,
               "Cannot transition from PEND -> PEND")
    else:
        expect(old_status == TEST_PASS_STATUS,
               "Why did we move on to next phase when prior phase did not pass?")
        expect(status == TEST_PEND_STATUS,
               "New phase should be set to pending status")
        # Report both phase names; the old message printed the old phase name
        # next to the new phase's numeric index.
        expect(self._phases.index(old_phase) == phase_idx - 1,
               "Skipped phase? {} -> {}".format(old_phase, phase))

    # Must be atomic
    self._tests[test] = (phase, status)
###########################################################################
def _shell_cmd_for_phase(self, test, cmd, phase, from_dir=None):
    ###########################################################################
    """
    Run shell command cmd for phase of test and log the outcome.

    Returns (success, errput), where success is True iff cmd exited with 0.
    A failure whose output mentions "bad interpreter" is retried after a
    1-second sleep, as a workaround for buildnml scripts.
    NOTE(review): that retry has no limit.
    On success, the RUN phase is logged as "SUBMIT" because only the
    submission has succeeded at that point.
    """
    def _as_text(value):
        # Normalize command output to the native str type so it formats
        # cleanly on Python 2 and 3. The old .encode('utf-8') turned it into
        # bytes, which Python 3 rendered as a b'...' literal with escaped
        # newlines.
        if value is None:
            return ""
        if isinstance(value, str):
            return value
        if isinstance(value, bytes):        # Python 3 bytes
            return value.decode("utf-8", "replace")
        return value.encode("utf-8")        # Python 2 unicode

    while True:
        rc, output, errput = run_cmd(cmd, from_dir=from_dir)
        combined = _as_text(output) + "\n" + _as_text(errput)
        if rc != 0:
            self._log_output(test,
                             "{} FAILED for test '{}'.\nCommand: {}\nOutput: {}\n".
                             format(phase, test, cmd, combined))
            # Temporary hack to get around odd file descriptor use by
            # buildnml scripts.
            if "bad interpreter" in output:
                time.sleep(1)
                continue
            return False, errput

        # We don't want "RUN PASSED" in the TestStatus.log if the only thing that
        # succeeded was the submission.
        phase = "SUBMIT" if phase == RUN_PHASE else phase
        self._log_output(test,
                         "{} PASSED for test '{}'.\nCommand: {}\nOutput: {}\n".
                         format(phase, test, cmd, combined))
        return True, errput
###########################################################################
def _create_newcase_phase(self, test):
    ###########################################################################
    """
    Build and run the create_newcase command line for test.

    The command combines the parsed test name (grid, compset, machine,
    compiler, testmods, case options) with scheduler-wide settings (project,
    output root, input dir, pesfile, mpilib, queue, walltime).

    Returns (success, errput) from _shell_cmd_for_phase. If the testmod spec
    has no component, or its directory does not exist, it returns
    (False, error) without running anything.
    """
    test_dir = self._get_test_dir(test)

    test_case, case_opts, grid, compset,\
        machine, compiler, test_mods = CIME.utils.parse_test_name(test)

    create_newcase_cmd = "{} --case {} --res {} --compset {}"\
                         " --test".format(os.path.join(self._cime_root, "scripts", "create_newcase"),
                                          test_dir, grid, compset)
    if machine is not None:
        create_newcase_cmd += " --machine {}".format(machine)
    if compiler is not None:
        create_newcase_cmd += " --compiler {}".format(compiler)
    if self._project is not None:
        create_newcase_cmd += " --project {} ".format(self._project)
    if self._output_root is not None:
        create_newcase_cmd += " --output-root {} ".format(self._output_root)
    if self._input_dir is not None:
        create_newcase_cmd += " --input-dir {} ".format(self._input_dir)
    if self._pesfile is not None:
        create_newcase_cmd += " --pesfile {} ".format(self._pesfile)

    if test_mods is not None:
        files = Files()
        if test_mods.find('/') != -1:
            (component, modspath) = test_mods.split('/', 1)
        else:
            error = "Missing testmod component. Testmods are specified as '${component}-${testmod}'"
            self._log_output(test, error)
            return False, error

        testmods_dir = files.get_value("TESTS_MODS_DIR", {"component": component})
        test_mod_file = os.path.join(testmods_dir, component, modspath)
        if not os.path.exists(test_mod_file):
            error = "Missing testmod file '{}'".format(test_mod_file)
            self._log_output(test, error)
            return False, error

        create_newcase_cmd += " --user-mods-dir {}".format(test_mod_file)

    mpilib = None
    ninst = 1
    ncpl = 1
    if case_opts is not None:
        for case_opt in case_opts: # pylint: disable=not-an-iterable
            if case_opt.startswith('M'):
                mpilib = case_opt[1:]
                create_newcase_cmd += " --mpilib {}".format(mpilib)
                logger.debug (" MPILIB set to {}".format(mpilib))
            elif case_opt.startswith('N'):
                expect(ncpl == 1,"Cannot combine _C and _N options")
                ninst = case_opt[1:]
                create_newcase_cmd += " --ninst {}".format(ninst)
                logger.debug (" NINST set to {}".format(ninst))
            elif case_opt == 'CG':
                # Gregorian-calendar option, applied in _xml_phase. It must be
                # matched before the generic 'C' (multi-driver) prefix below,
                # otherwise it is misread as "--ninst G --multi-driver".
                pass
            elif case_opt.startswith('C'):
                expect(ninst == 1,"Cannot combine _C and _N options")
                ncpl = case_opt[1:]
                create_newcase_cmd += " --ninst {} --multi-driver" .format(ncpl)
                logger.debug (" NCPL set to {}" .format(ncpl))
            elif case_opt.startswith('P'):
                pesize = case_opt[1:]
                create_newcase_cmd += " --pecount {}".format(pesize)
            elif case_opt.startswith('V'):
                driver = case_opt[1:]
                create_newcase_cmd += " --driver {}".format(driver)

    # create_test mpilib option overrides default but not explicitly set case_opt mpilib
    if mpilib is None and self._mpilib is not None:
        create_newcase_cmd += " --mpilib {}".format(self._mpilib)
        logger.debug (" MPILIB set to {}".format(self._mpilib))

    if self._queue is not None:
        create_newcase_cmd += " --queue={}".format(self._queue)
    else:
        # We need to hard code the queue for this test on cheyenne
        # otherwise it runs in share and fails intermittently
        if test_case == "NODEFAIL":
            machine = machine if machine is not None else self._machobj.get_machine_name()
            if machine == "cheyenne":
                create_newcase_cmd += " --queue=regular"

    if self._walltime is not None:
        create_newcase_cmd += " --walltime {}".format(self._walltime)
    else:
        # model specific ways of setting time
        if self._cime_model == "e3sm":
            recommended_time = _get_time_est(test, self._baseline_root)

            if recommended_time is not None:
                create_newcase_cmd += " --walltime {}".format(recommended_time)

        else:
            if test in self._test_data and "options" in self._test_data[test] and \
               "wallclock" in self._test_data[test]['options']:
                create_newcase_cmd += " --walltime {}".format(self._test_data[test]['options']['wallclock'])

    logger.debug("Calling create_newcase: " + create_newcase_cmd)
    return self._shell_cmd_for_phase(test, create_newcase_cmd, CREATE_NEWCASE_PHASE)
###########################################################################
def _xml_phase(self, test):
    ###########################################################################
    """
    Write env_test.xml for test and apply test-specific settings to its case.

    This fills env_test.xml with the test name and id, baseline generation and
    comparison settings, tolerances, the config_tests node for the test type,
    and parameters derived from the case options (D, E, CG, L*, R, IOP). It
    then locks env_run.xml and sets case values such as SHAREDLIBROOT, TEST,
    SAVE_TIMING and possibly GMAKE_J.

    Returns (True, ""). Errors propagate as exceptions or expect() failures.
    """
    # Determine the test_case and case_opts from the test name (parsed once)
    test_case, case_opts = CIME.utils.parse_test_name(test)[:2]

    # Create, fill and write an envtest object
    test_dir = self._get_test_dir(test)
    envtest = EnvTest(test_dir)

    # Determine list of component classes that this coupler/driver knows how
    # to deal with. This list follows the same order as compset longnames follow.
    files = Files()
    drv_config_file = files.get_value("CONFIG_CPL_FILE")
    drv_comp = Component(drv_config_file, "CPL")
    envtest.add_elements_by_group(files, {}, "env_test.xml")
    envtest.add_elements_by_group(drv_comp, {}, "env_test.xml")
    envtest.set_value("TESTCASE", test_case)
    envtest.set_value("TEST_TESTID", self._test_id)
    envtest.set_value("CASEBASEID", test)
    if test in self._test_data and "options" in self._test_data[test] and \
       "memleak_tolerance" in self._test_data[test]['options']:
        envtest.set_value("TEST_MEMLEAK_TOLERANCE", self._test_data[test]['options']['memleak_tolerance'])

    test_argv = "-testname {} -testroot {}".format(test, self._test_root)
    if self._baseline_gen_name:
        test_argv += " -generate {}".format(self._baseline_gen_name)
        basegen_case_fullpath = os.path.join(self._baseline_root,self._baseline_gen_name, test)
        logger.debug("basegen_case is {}".format(basegen_case_fullpath))
        envtest.set_value("BASELINE_NAME_GEN", self._baseline_gen_name)
        envtest.set_value("BASEGEN_CASE", os.path.join(self._baseline_gen_name, test))
    if self._baseline_cmp_name:
        test_argv += " -compare {}".format(self._baseline_cmp_name)
        envtest.set_value("BASELINE_NAME_CMP", self._baseline_cmp_name)
        envtest.set_value("BASECMP_CASE", os.path.join(self._baseline_cmp_name, test))

    envtest.set_value("TEST_ARGV", test_argv)
    envtest.set_value("CLEANUP", self._clean)

    envtest.set_value("BASELINE_ROOT", self._baseline_root)
    envtest.set_value("GENERATE_BASELINE", self._baseline_gen_name is not None)
    envtest.set_value("COMPARE_BASELINE", self._baseline_cmp_name is not None)
    envtest.set_value("CCSM_CPRNC", self._machobj.get_value("CCSM_CPRNC", resolved=False))
    tput_tolerance = self._machobj.get_value("TEST_TPUT_TOLERANCE", resolved=False)
    if test in self._test_data and "options" in self._test_data[test] and \
       "tput_tolerance" in self._test_data[test]['options']:
        tput_tolerance = self._test_data[test]['options']['tput_tolerance']

    envtest.set_value("TEST_TPUT_TOLERANCE", 0.25 if tput_tolerance is None else tput_tolerance)

    # Add the test instructions from config_test to env_test in the case
    config_test = Tests()
    testnode = config_test.get_test_node(test_case)
    envtest.add_test(testnode)

    # Apply the case options encoded in the test name
    if case_opts is not None:
        logger.debug("case_opts are {} ".format(case_opts))
        for opt in case_opts: # pylint: disable=not-an-iterable

            logger.debug("case_opt is {}".format(opt))
            if opt == 'D':
                envtest.set_test_parameter("DEBUG", "TRUE")
                logger.debug (" DEBUG set to TRUE")

            elif opt == 'E':
                envtest.set_test_parameter("USE_ESMF_LIB", "TRUE")
                logger.debug (" USE_ESMF_LIB set to TRUE")

            elif opt == 'CG':
                envtest.set_test_parameter("CALENDAR", "GREGORIAN")
                logger.debug (" CALENDAR set to {}".format(opt))

            elif opt.startswith('L'):
                match = re.match('L([A-Za-z])([0-9]*)', opt)
                stop_option = {"y":"nyears", "m":"nmonths", "d":"ndays", "h":"nhours",
                               "s":"nseconds", "n":"nsteps"}
                # Fail with a clear message instead of AttributeError/KeyError
                # on a malformed option such as "L" or "Lq5".
                expect(match is not None and match.group(1) in stop_option,
                       "Could not parse option '{}' ".format(opt))
                unit = match.group(1)
                envtest.set_test_parameter("STOP_OPTION",stop_option[unit])
                opti = match.group(2)
                envtest.set_test_parameter("STOP_N", opti)

                logger.debug (" STOP_OPTION set to {}".format(stop_option[unit]))
                logger.debug (" STOP_N set to {}".format(opti))

            elif opt.startswith('R'):
                # R option is for testing in PTS_MODE or Single Column Model
                #  (SCM) mode
                envtest.set_test_parameter("PTS_MODE", "TRUE")

                # For PTS_MODE, compile with mpi-serial
                envtest.set_test_parameter("MPILIB", "mpi-serial")

            elif opt.startswith('IOP'):
                # Must be checked before the generic 'I' marker below, which
                # previously swallowed it and made this branch unreachable.
                logger.warning("IOP test option not yet implemented")

            elif (opt.startswith('I') or # Marker to distinguish tests with same name - ignored
                  opt.startswith('M') or # handled in create_newcase
                  opt.startswith('P') or # handled in create_newcase
                  opt.startswith('N') or # handled in create_newcase
                  opt.startswith('C') or # handled in create_newcase
                  opt.startswith('V')):  # handled in create_newcase
                pass

            else:
                expect(False, "Could not parse option '{}' ".format(opt))

    envtest.write()
    lock_file("env_run.xml", caseroot=test_dir, newname="env_run.orig.xml")

    with Case(test_dir, read_only=False) as case:
        if self._output_root is None:
            self._output_root = case.get_value("CIME_OUTPUT_ROOT")
        # if we are running a single test we don't need sharedlibroot
        if len(self._tests) > 1 and self._cime_model != "e3sm":
            case.set_value("SHAREDLIBROOT",
                           os.path.join(self._output_root,
                                        "sharedlibroot.{}".format(self._test_id)))
        envtest.set_initial_values(case)
        case.set_value("TEST", True)
        case.set_value("SAVE_TIMING", self._save_timing)

        # Scale back build parallelism on systems with few cores
        if self._model_build_cost > self._proc_pool:
            case.set_value("GMAKE_J", self._proc_pool)
            self._model_build_cost = self._proc_pool

    return True, ""
###########################################################################
def _setup_phase(self, test):
    ###########################################################################
    """
    Run ./case.setup in test's case directory.

    If setup succeeds, ./case.cmpgen_namelists is also run. An exit code of 0
    or TESTS_FAILED_ERR_CODE (namelist baseline diffs) is accepted; any other
    code fails via expect(). Returns case.setup's (success, errput).
    """
    case_dir = self._get_test_dir(test)
    setup_result = self._shell_cmd_for_phase(test, "./case.setup", SETUP_PHASE, from_dir=case_dir)

    if not setup_result[0]:
        return setup_result

    # It's OK for this command to fail with baseline diffs but not catastrophically
    nl_stat, nl_output, _ = run_cmd("./case.cmpgen_namelists", combine_output=True, from_dir=case_dir)
    expect(nl_stat in [0, TESTS_FAILED_ERR_CODE], "Fatal error in case.cmpgen_namelists: {}".format(nl_output))
    return setup_result
###########################################################################
def _sharedlib_build_phase(self, test):
    ###########################################################################
    """Build only the shared libraries for test (./case.build --sharedlib-only)."""
    case_dir = self._get_test_dir(test)
    build_cmd = "./case.build --sharedlib-only"
    return self._shell_cmd_for_phase(test, build_cmd, SHAREDLIB_BUILD_PHASE,
                                     from_dir=case_dir)
###########################################################################
def _model_build_phase(self, test):
    ###########################################################################
    """Build only the model executable for test (./case.build --model-only)."""
    case_dir = self._get_test_dir(test)
    build_cmd = "./case.build --model-only"
    return self._shell_cmd_for_phase(test, build_cmd, MODEL_BUILD_PHASE,
                                     from_dir=case_dir)
###########################################################################
def _run_phase(self, test):
    ###########################################################################
    """
    Submit test with ./case.submit and return the submission's
    (success, errput).

    --no-batch is added when batch is disabled, and mail user/type options are
    added when they are set.
    """
    case_dir = self._get_test_dir(test)
    pieces = ["./case.submit --skip-preview-namelist"]
    if self._no_batch:
        pieces.append("--no-batch")
    if self._mail_user:
        pieces.append("--mail-user={}".format(self._mail_user))
    if self._mail_type:
        pieces.append("-M={}".format(",".join(self._mail_type)))

    return self._shell_cmd_for_phase(test, " ".join(pieces), RUN_PHASE, from_dir=case_dir)
###########################################################################
def _run_catch_exceptions(self, test, phase, run):
###########################################################################
try:
return run(test)
except (SystemExit, Exception) as e:
exc_tb = sys.exc_info()[2]
errput = "Test '{}' failed in phase '{}' with exception '{}'\n".format(test, phase, str(e))
errput += ''.join(traceback.format_tb(exc_tb))
self._log_output(test, errput)
return False, errput
###########################################################################
def _get_procs_needed(self, test, phase, threads_in_flight=None, no_batch=False):
    ###########################################################################
    """
    Return how many processors from the local pool phase of test will use.

    * RUN phase without a batch system: the case's TOTALPES (via xmlquery).
    * SHAREDLIB_BUILD phase: 1. On cesm, if another sharedlib build is already
      in flight, return self._proc_pool + 1 instead. That request can never be
      satisfied, which serializes sharedlib builds.
    * MODEL_BUILD phase: self._model_build_cost.
    * any other phase: 1.

    threads_in_flight maps test name -> (thread, procs, phase). Its default of
    None is treated as "nothing in flight" rather than crashing.
    """
    if phase == RUN_PHASE and (self._no_batch or no_batch):
        test_dir = self._get_test_dir(test)
        total_pes = int(run_cmd_no_fail("./xmlquery TOTALPES --value", from_dir=test_dir))
        return total_pes
    elif phase == SHAREDLIB_BUILD_PHASE:
        if self._cime_model == "cesm":
            # Will force serialization of sharedlib builds
            # TODO - instead of serializing, compute all library configs needed and build
            # them all in parallel
            in_flight = threads_in_flight if threads_in_flight is not None else {}
            for _, _, running_phase in in_flight.values():
                if running_phase == SHAREDLIB_BUILD_PHASE:
                    return self._proc_pool + 1

        return 1
    elif phase == MODEL_BUILD_PHASE:
        # Model builds now happen in parallel
        return self._model_build_cost
    else:
        return 1
###########################################################################
def _wait_for_something_to_finish(self, threads_in_flight):
    ###########################################################################
    """
    Block until at least one thread in threads_in_flight has finished.

    Each finished test's processors are returned to self._procs_avail and its
    entry is removed from threads_in_flight (mutated in place). The threads are
    polled every 0.2 seconds.
    """
    expect(len(threads_in_flight) <= self._parallel_jobs, "Oversubscribed?")

    while True:
        done = [(name, info[1]) for name, info in threads_in_flight.items()
                if not info[0].is_alive()]
        if done:
            break
        time.sleep(0.2)

    for name, procs in done:
        self._procs_avail += procs
        del threads_in_flight[name]
###########################################################################
def _update_test_status_file(self, test, test_phase, status):
    ###########################################################################
    """
    Set status for test_phase in test's TestStatus file.

    In general, test_scheduler should not be responsible for updating
    the TestStatus file, but there are a few cases where it has to.
    """
    with TestStatus(test_dir=self._get_test_dir(test), test_name=test) as ts:
        ts.set_status(test_phase, status)
###########################################################################
def _consumer(self, test, test_phase, phase_method):
    ###########################################################################
    """
    Thread body: run phase_method for test_phase of test and record the result.

    Status is FAIL on failure. A successful RUN phase under a batch system
    stays PEND, because only the job was submitted; any other success is PASS.
    The in-memory status is updated unless it is PEND. A progress line is
    logged, and CREATE_NEWCASE/XML results are also written to TestStatus.
    After a successful MODEL_BUILD on a batch system, the RUN phase is
    submitted immediately from this same thread.
    """
    before_time = time.time()
    success, errors = self._run_catch_exceptions(test, test_phase, phase_method)
    elapsed_time = time.time() - before_time

    if not success:
        status = TEST_FAIL_STATUS
    elif test_phase == RUN_PHASE and not self._no_batch:
        status = TEST_PEND_STATUS
    else:
        status = TEST_PASS_STATUS

    if status != TEST_PEND_STATUS:
        self._update_test_status(test, test_phase, status)

    if not self._work_remains(test):
        self._completed_tests += 1
        total = len(self._tests)
        status_str = "Finished {} for test {} in {:f} seconds ({}). [COMPLETED {:d} of {:d}]".format(test_phase, test, elapsed_time, status, self._completed_tests, total)
    else:
        status_str = "Finished {} for test {} in {:f} seconds ({})".format(test_phase, test, elapsed_time, status)

    if not success:
        # Normalize to the native str type before splitting into lines. The
        # previous str(errors.encode('utf-8')) produced "b'...'" on Python 3,
        # so splitlines() never split the error text.
        if errors is None:
            error_text = ""
        elif isinstance(errors, str):
            error_text = errors
        elif isinstance(errors, bytes):        # Python 3 bytes
            error_text = errors.decode("utf-8", "replace")
        else:                                  # Python 2 unicode
            error_text = errors.encode("utf-8")
        status_str += "\n Case dir: {}\n".format(self._get_test_dir(test))
        status_str += " Errors were:\n {}\n".format("\n ".join(error_text.splitlines()))

    logger.info(status_str)

    if test_phase in [CREATE_NEWCASE_PHASE, XML_PHASE]:
        # These are the phases for which TestScheduler is responsible for
        # updating the TestStatus file
        self._update_test_status_file(test, test_phase, status)

    if test_phase == XML_PHASE:
        append_status("Case Created using: "+" ".join(sys.argv), "README.case", caseroot=self._get_test_dir(test))

    # On batch systems, we want to immediately submit to the queue, because
    # it's very cheap to submit and will get us a better spot in line
    if (success and not self._no_run and not self._no_batch and test_phase == MODEL_BUILD_PHASE):
        logger.info("Starting {} for test {} with 1 proc on interactive node and {:d} procs on compute nodes".format(RUN_PHASE, test, self._get_procs_needed(test, RUN_PHASE, no_batch=True)))
        self._update_test_status(test, RUN_PHASE, TEST_PEND_STATUS)
        self._consumer(test, RUN_PHASE, self._run_phase)
###########################################################################
def _producer(self):
    ###########################################################################
    """
    Main scheduling loop.

    Repeatedly walks the tests in order. For each test that still has work and
    is not already running, it starts a thread (target self._consumer) that
    runs the test's next phase. A thread is only started if enough processors
    remain in self._procs_avail and fewer than self._parallel_jobs threads are
    active.

    If a phase needs more processors than are available while nothing is in
    flight, the request can never be met, so that phase is marked FAIL. When
    a full pass launches nothing, the loop blocks until some thread finishes.
    It returns once no test has work left and all threads have been joined.
    """
    threads_in_flight = {} # test-name -> (thread, procs, phase)
    while True:
        work_to_do = False
        num_threads_launched_this_iteration = 0
        for test in self._tests:
            logger.debug("test_name: " + test)

            if self._work_remains(test):
                work_to_do = True

                # If we have no workers available, immediately break out of loop so we can wait
                if len(threads_in_flight) == self._parallel_jobs:
                    break

                if test not in threads_in_flight:
                    test_phase, test_status = self._get_test_data(test)
                    expect(test_status != TEST_PEND_STATUS, test)
                    next_phase = self._phases[self._phases.index(test_phase) + 1]
                    procs_needed = self._get_procs_needed(test, next_phase, threads_in_flight)

                    if procs_needed <= self._procs_avail:
                        # Reserve processors; returned in _wait_for_something_to_finish
                        self._procs_avail -= procs_needed

                        # Necessary to print this way when multiple threads printing
                        logger.info("Starting {} for test {} with {:d} procs".format(next_phase, test, procs_needed))

                        self._update_test_status(test, next_phase, TEST_PEND_STATUS)
                        # Phase method is looked up by name, e.g. "_setup_phase"
                        new_thread = threading.Thread(target=self._consumer,
                                                      args=(test, next_phase, getattr(self, "_{}_phase".format(next_phase.lower())) ))
                        threads_in_flight[test] = (new_thread, procs_needed, next_phase)
                        new_thread.start()
                        num_threads_launched_this_iteration += 1

                        logger.debug(" Current workload:")
                        total_procs = 0
                        for the_test, the_data in six.iteritems(threads_in_flight):
                            logger.debug(" {}: {} -> {}".format(the_test, the_data[2], the_data[1]))
                            total_procs += the_data[1]

                        logger.debug(" Total procs in use: {}".format(total_procs))
                    else:
                        # Nothing running means nothing will ever free processors,
                        # so this request can never be met: fail the phase.
                        if not threads_in_flight:
                            msg = "Phase '{}' for test '{}' required more processors, {:d}, than this machine can provide, {:d}".format(next_phase, test, procs_needed, self._procs_avail)
                            logger.warning(msg)
                            self._update_test_status(test, next_phase, TEST_PEND_STATUS)
                            self._update_test_status(test, next_phase, TEST_FAIL_STATUS)
                            self._log_output(test, msg)
                            if next_phase == RUN_PHASE:
                                self._update_test_status_file(test, SUBMIT_PHASE, TEST_PASS_STATUS)
                                self._update_test_status_file(test, next_phase, TEST_FAIL_STATUS)
                            else:
                                self._update_test_status_file(test, next_phase, TEST_FAIL_STATUS)
                            # Counts as progress so the loop does not block waiting
                            num_threads_launched_this_iteration += 1

        if not work_to_do:
            break

        if num_threads_launched_this_iteration == 0:
            # No free resources, wait for something in flight to finish
            self._wait_for_something_to_finish(threads_in_flight)

    for unfinished_thread, _, _ in threads_in_flight.values():
        unfinished_thread.join()
###########################################################################
def _setup_cs_files(self):
###########################################################################
    """
    Write the convenience scripts that live in the test root directory.

    Scripts written (each rendered from a ``*.template`` file located in the
    directory returned by ``CIME.utils.get_python_libs_root()``):

      * ``cs.status.<test_id>`` -- always.
      * ``cs.submit.<test_id>`` -- only when ``self._no_run`` is set; it runs
        ``./case.setup`` / ``./case.build`` only for the steps that were
        skipped at creation time (``self._no_setup`` / ``self._no_build``),
        then ``./case.submit``.
      * ``testreporter`` -- only when ``self._cime_model == "cesm"``.

    Every written script gets user and group execute permission added.

    This is best-effort: any exception is logged as a warning and swallowed,
    so a failure here never aborts the test run.
    """
    def _read_template(libs_root, name):
        # Read a template file, making sure the handle is closed.
        with open(os.path.join(libs_root, name), "r") as fd:
            return fd.read()

    def _write_executable(path, contents):
        # Write contents to path, then add user/group execute bits.
        with open(path, "w") as fd:
            fd.write(contents)
        os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR | stat.S_IXGRP)

    try:
        python_libs_root = CIME.utils.get_python_libs_root()
        tools_dir = os.path.join(self._cime_root, "scripts", "Tools")

        # cs.status.<test_id>: always generated.
        # Substitution order matches the historical implementation.
        status_script = _read_template(python_libs_root, "cs.status.template")
        status_script = status_script.replace("<PATH>", tools_dir)
        status_script = status_script.replace("<TESTID>", self._test_id)
        status_script = status_script.replace("<TESTROOT>", self._test_root)
        if not os.path.exists(self._test_root):
            os.makedirs(self._test_root)
        _write_executable(
            os.path.join(self._test_root, "cs.status.{}".format(self._test_id)),
            status_script)

        # cs.submit.<test_id>: only useful when the run phase was skipped.
        # The template is read only when it will actually be written, so a
        # missing submit template cannot prevent testreporter generation.
        if self._no_run:
            setup_cmd = "./case.setup" if self._no_setup else ":"
            build_cmd = "./case.build" if self._no_build else ":"
            test_cmd = "./case.submit"
            submit_script = _read_template(python_libs_root, "cs.submit.template")
            submit_script = submit_script.replace("<SETUP_CMD>", setup_cmd)
            submit_script = submit_script.replace("<BUILD_CMD>", build_cmd)
            submit_script = submit_script.replace("<RUN_CMD>", test_cmd)
            submit_script = submit_script.replace("<TESTID>", self._test_id)
            _write_executable(
                os.path.join(self._test_root, "cs.submit.{}".format(self._test_id)),
                submit_script)

        # testreporter: CESM-only helper script.
        if self._cime_model == "cesm":
            reporter_script = _read_template(python_libs_root, "testreporter.template")
            reporter_script = reporter_script.replace("<PATH>", tools_dir)
            _write_executable(os.path.join(self._test_root, "testreporter"),
                              reporter_script)

    except Exception as e:
        # Deliberately best-effort: warn and keep going.
        logger.warning("FAILED to set up cs files: {}".format(str(e)))
###########################################################################
def run_tests(self, wait=False,
              wait_check_throughput=False,
              wait_check_memory=False,
              wait_ignore_namelists=False,
              wait_ignore_memleak=False):
###########################################################################
    """
    Main API for this class: drive every test through the scheduler phases.

    Steps:
      1. Log the list of tests, then write the cs.* helper scripts.
      2. Run ``self._producer()`` with ``GenericXML.DISABLE_CACHING`` set.
         The flag is always reset afterwards, even if the producer raises.
      3. If the run phase is executed under a batch system and ``wait`` is
         True, block in ``wait_for_tests``, forwarding the ``wait_*`` flags,
         and use its result. Otherwise summarize the final state from the
         scheduler's own bookkeeping plus each case's TestStatus file.

    Parameters:
      wait: wait for batch-submitted tests to finish before returning.
      wait_check_throughput, wait_check_memory, wait_ignore_namelists,
      wait_ignore_memleak: forwarded to ``wait_for_tests`` (only used when
          waiting).

    Returns True if all tests passed.
    """
    start_time = time.time()

    # Tell user what will be run
    logger.info( "RUNNING TESTS:")
    for test in self._tests:
        logger.info( " {}".format(test))

    # Setup cs files
    self._setup_cs_files()

    # XML caching is disabled only while the producer (which runs phases on
    # worker threads) is active. Restore it in a finally so an exception in
    # the producer does not leave caching globally disabled in this process.
    GenericXML.DISABLE_CACHING = True
    try:
        self._producer()
    finally:
        GenericXML.DISABLE_CACHING = False

    expect(threading.active_count() == 1, "Leftover threads?")

    wait_handles_report = False
    if not self._no_run and not self._no_batch:
        if wait:
            logger.info("Waiting for tests to finish")
            rv = wait_for_tests(glob.glob(os.path.join(self._test_root, "*{}/TestStatus".format(self._test_id))),
                                check_throughput=wait_check_throughput,
                                check_memory=wait_check_memory,
                                ignore_namelists=wait_ignore_namelists,
                                ignore_memleak=wait_ignore_memleak)
            wait_handles_report = True
        else:
            logger.info("Due to presence of batch system, create_test will exit before tests are complete.\n" \
                        "To force create_test to wait for full completion, use --wait")

    # Return True if all tests passed from our point of view
    if not wait_handles_report:
        logger.info( "At test-scheduler close, state is:")
        rv = True
        for test in self._tests:
            phase, status = self._get_test_data(test)

            # Give highest priority to fails in test scheduler
            if status not in [TEST_PASS_STATUS, TEST_PEND_STATUS]:
                logger.info( "{} {} (phase {})".format(status, test, phase))
                rv = False

            else:
                # Be cautious about telling the user that the test passed. This
                # status should match what they would see on the dashboard. Our
                # self._test_states does not include comparison fail information,
                # so we need to parse test status.
                ts = TestStatus(self._get_test_dir(test))
                nlfail = ts.get_status(NAMELIST_PHASE) == TEST_FAIL_STATUS
                ts_status = ts.get_overall_test_status(ignore_namelists=True, check_memory=False, check_throughput=False)

                if ts_status not in [TEST_PASS_STATUS, TEST_PEND_STATUS]:
                    logger.info( "{} {} (phase {})".format(ts_status, test, phase))
                    rv = False
                elif nlfail:
                    logger.info( "{} {} (but otherwise OK) {}".format(NAMELIST_FAIL_STATUS, test, phase))
                    rv = False
                else:
                    logger.info("{} {} {}".format(status, test, phase))

            logger.info( " Case dir: {}".format(self._get_test_dir(test)))

    logger.info( "test-scheduler took {} seconds".format(time.time() - start_time))

    return rv