"""
Contains the crucial TestStatus class which manages phase-state of a test
case and ensure that this state is represented by the TestStatus file in
the case.
TestStatus objects are only modifiable via the set_status method and this
is only allowed if the object is being accessed within the context of a
context manager. Example:
with TestStatus(test_dir=caseroot) as ts:
ts.set_status(RUN_PHASE, TEST_PASS_STATUS)
This file also contains all of the hardcoded phase information which includes
the phase names, phase orders, potential phase states, and which phases are
required (core phases).
Additional important design decisions:
1) In order to ensure that incomplete tests are always left in a PEND
state, updating a core phase to a PASS state will automatically set the next
core state to PEND.
2) If the user repeats a core state, that invalidates all subsequent state. For
example, if a user rebuilds their case, then any of the post-run states like the
RUN state are no longer valid.
"""
from CIME.XML.standard_module_setup import *
from collections import OrderedDict
import os
TEST_STATUS_FILENAME = "TestStatus"
# The statuses that a phase can be in
TEST_PEND_STATUS = "PEND"
TEST_PASS_STATUS = "PASS"
TEST_FAIL_STATUS = "FAIL"
ALL_PHASE_STATUSES = [TEST_PEND_STATUS, TEST_PASS_STATUS, TEST_FAIL_STATUS]
# Special statuses that the overall test can be in
TEST_DIFF_STATUS = "DIFF" # Implies a failure in the BASELINE phase
NAMELIST_FAIL_STATUS = "NLFAIL" # Implies a failure in the NLCOMP phase
# Special strings that can appear in comments, indicating particular types of failures
TEST_NO_BASELINES_COMMENT = "BFAIL" # Implies baseline directory is missing in the baseline comparison phase
# The valid phases
CREATE_NEWCASE_PHASE = "CREATE_NEWCASE"
XML_PHASE = "XML"
SETUP_PHASE = "SETUP"
NAMELIST_PHASE = "NLCOMP"
SHAREDLIB_BUILD_PHASE = "SHAREDLIB_BUILD"
MODEL_BUILD_PHASE = "MODEL_BUILD"
SUBMIT_PHASE = "SUBMIT"
RUN_PHASE = "RUN"
THROUGHPUT_PHASE = "TPUTCOMP"
MEMCOMP_PHASE = "MEMCOMP"
MEMLEAK_PHASE = "MEMLEAK"
STARCHIVE_PHASE = "SHORT_TERM_ARCHIVER"
COMPARE_PHASE = "COMPARE" # This is one special, real phase will be COMPARE_$WHAT, this is for internal test comparisons, there could be multiple variations of this phase in one test
BASELINE_PHASE = "BASELINE"
GENERATE_PHASE = "GENERATE"
ALL_PHASES = [CREATE_NEWCASE_PHASE,
XML_PHASE,
SETUP_PHASE,
NAMELIST_PHASE,
SHAREDLIB_BUILD_PHASE,
MODEL_BUILD_PHASE,
SUBMIT_PHASE,
RUN_PHASE,
COMPARE_PHASE,
BASELINE_PHASE,
THROUGHPUT_PHASE,
MEMCOMP_PHASE,
MEMLEAK_PHASE,
STARCHIVE_PHASE,
GENERATE_PHASE]
# These are mandatory phases that a test must go through
CORE_PHASES = [CREATE_NEWCASE_PHASE,
XML_PHASE,
SETUP_PHASE,
SHAREDLIB_BUILD_PHASE,
MODEL_BUILD_PHASE,
SUBMIT_PHASE,
RUN_PHASE]
def _test_helper1(file_contents):
ts = TestStatus(test_dir="/", test_name="ERS.foo.A")
ts._parse_test_status(file_contents) # pylint: disable=protected-access
return ts._phase_statuses # pylint: disable=protected-access
def _test_helper2(file_contents, wait_for_run=False, check_throughput=False, check_memory=False, ignore_namelists=False):
ts = TestStatus(test_dir="/", test_name="ERS.foo.A")
ts._parse_test_status(file_contents) # pylint: disable=protected-access
return ts.get_overall_test_status(wait_for_run=wait_for_run,
check_throughput=check_throughput,
check_memory=check_memory,
ignore_namelists=ignore_namelists)
[docs]class TestStatus(object):
def __init__(self, test_dir=None, test_name=None, no_io=False):
"""
Create a TestStatus object
If test_dir is not specified, it is set to the current working directory
no_io is intended only for testing, and should be kept False in
production code
"""
test_dir = os.getcwd() if test_dir is None else test_dir
self._filename = os.path.join(test_dir, TEST_STATUS_FILENAME)
self._phase_statuses = OrderedDict() # {name -> (status, comments)}
self._test_name = test_name
self._ok_to_modify = False
self._no_io = no_io
if os.path.exists(self._filename):
self._parse_test_status_file()
if not os.access(self._filename, os.W_OK):
self._no_io = True
else:
expect(test_name is not None, "Must provide test_name if TestStatus file doesn't exist")
def __enter__(self):
self._ok_to_modify = True
return self
def __exit__(self, *_):
self._ok_to_modify = False
self.flush()
def __iter__(self):
for phase, data in self._phase_statuses.items():
yield phase, data[0]
[docs] def get_name(self):
return self._test_name
[docs] def set_status(self, phase, status, comments=""):
"""
Update the status of this test by changing the status of given phase to the
given status.
>>> with TestStatus(test_dir="/", test_name="ERS.foo.A", no_io=True) as ts:
... ts.set_status(CREATE_NEWCASE_PHASE, "PASS")
... ts.set_status(XML_PHASE, "PASS")
... ts.set_status(SETUP_PHASE, "FAIL")
... ts.set_status(SETUP_PHASE, "PASS")
... ts.set_status("{}_base_rest".format(COMPARE_PHASE), "FAIL")
... ts.set_status(SHAREDLIB_BUILD_PHASE, "PASS", comments='Time=42')
>>> ts._phase_statuses
OrderedDict([('CREATE_NEWCASE', ('PASS', '')), ('XML', ('PASS', '')), ('SETUP', ('PASS', '')), ('SHAREDLIB_BUILD', ('PASS', 'Time=42')), ('COMPARE_base_rest', ('FAIL', '')), ('MODEL_BUILD', ('PEND', ''))])
>>> with TestStatus(test_dir="/", test_name="ERS.foo.A", no_io=True) as ts:
... ts.set_status(CREATE_NEWCASE_PHASE, "PASS")
... ts.set_status(XML_PHASE, "PASS")
... ts.set_status(SETUP_PHASE, "FAIL")
... ts.set_status(SETUP_PHASE, "PASS")
... ts.set_status(BASELINE_PHASE, "PASS")
... ts.set_status("{}_base_rest".format(COMPARE_PHASE), "FAIL")
... ts.set_status(SHAREDLIB_BUILD_PHASE, "PASS", comments='Time=42')
... ts.set_status(SETUP_PHASE, "PASS")
>>> ts._phase_statuses
OrderedDict([('CREATE_NEWCASE', ('PASS', '')), ('XML', ('PASS', '')), ('SETUP', ('PASS', '')), ('SHAREDLIB_BUILD', ('PEND', ''))])
>>> with TestStatus(test_dir="/", test_name="ERS.foo.A", no_io=True) as ts:
... ts.set_status(CREATE_NEWCASE_PHASE, "FAIL")
>>> ts._phase_statuses
OrderedDict([('CREATE_NEWCASE', ('FAIL', ''))])
"""
expect(self._ok_to_modify, "TestStatus not in a modifiable state, use 'with' syntax")
expect(phase in ALL_PHASES or phase.startswith(COMPARE_PHASE),
"Invalid phase '{}'".format(phase))
expect(status in ALL_PHASE_STATUSES, "Invalid status '{}'".format(status))
if phase in CORE_PHASES and phase != CORE_PHASES[0]:
previous_core_phase = CORE_PHASES[CORE_PHASES.index(phase)-1]
#TODO: enable check below
#expect(previous_core_phase in self._phase_statuses, "Core phase '{}' was skipped".format(previous_core_phase))
if previous_core_phase in self._phase_statuses:
expect(self._phase_statuses[previous_core_phase][0] == TEST_PASS_STATUS,
"Cannot move past core phase '{}', it didn't pass: ".format(previous_core_phase))
reran_phase = (phase in self._phase_statuses and self._phase_statuses[phase][0] != TEST_PEND_STATUS and phase in CORE_PHASES)
if reran_phase:
# All subsequent phases are invalidated
phase_idx = ALL_PHASES.index(phase)
for subsequent_phase in ALL_PHASES[phase_idx+1:]:
if subsequent_phase in self._phase_statuses:
del self._phase_statuses[subsequent_phase]
if subsequent_phase.startswith(COMPARE_PHASE):
for stored_phase in list(self._phase_statuses.keys()):
if stored_phase.startswith(COMPARE_PHASE):
del self._phase_statuses[stored_phase]
self._phase_statuses[phase] = (status, comments) # Can overwrite old phase info
if status == TEST_PASS_STATUS and phase in CORE_PHASES and phase != CORE_PHASES[-1]:
next_core_phase = CORE_PHASES[CORE_PHASES.index(phase)+1]
self._phase_statuses[next_core_phase] = (TEST_PEND_STATUS, "")
[docs] def get_status(self, phase):
return self._phase_statuses[phase][0] if phase in self._phase_statuses else None
[docs] def phase_statuses_dump(self, prefix=''):
"""
Args:
prefix: string printed at the start of each line
"""
result = ""
if self._phase_statuses:
for phase, data in self._phase_statuses.items():
status, comments = data
if not comments:
result += "{}{} {} {}\n".format(prefix, status, self._test_name, phase)
else:
result += "{}{} {} {} {}\n".format(prefix, status, self._test_name, phase, comments)
return result
[docs] def flush(self):
if self._phase_statuses and not self._no_io:
with open(self._filename, "w") as fd:
fd.write(self.phase_statuses_dump())
def _parse_test_status(self, file_contents):
"""
>>> contents = '''
... PASS ERS.foo.A CREATE_NEWCASE
... PASS ERS.foo.A XML
... FAIL ERS.foo.A SETUP
... PASS ERS.foo.A COMPARE_base_rest
... PASS ERS.foo.A SHAREDLIB_BUILD Time=42
... '''
>>> _test_helper1(contents)
OrderedDict([('CREATE_NEWCASE', ('PASS', '')), ('XML', ('PASS', '')), ('SETUP', ('FAIL', '')), ('COMPARE_base_rest', ('PASS', '')), ('SHAREDLIB_BUILD', ('PASS', 'Time=42'))])
"""
for line in file_contents.splitlines():
line = line.strip()
tokens = line.split()
if line == "":
pass # skip blank lines
elif len(tokens) >= 3:
status, curr_test_name, phase = tokens[:3]
if (self._test_name is None):
self._test_name = curr_test_name
else:
expect(self._test_name == curr_test_name,
"inconsistent test name in parse_test_status: '{}' != '{}'".format(self._test_name, curr_test_name))
expect(status in ALL_PHASE_STATUSES,
"Unexpected status '{}' in parse_test_status for test '{}'".format(status, self._test_name))
expect(phase in ALL_PHASES or phase.startswith(COMPARE_PHASE),
"phase '{}' not expected in parse_test_status for test '{}'".format(phase, self._test_name))
expect(phase not in self._phase_statuses,
"Should not have seen multiple instances of phase '{}' for test '{}'".format(phase, self._test_name))
self._phase_statuses[phase] = (status, " ".join(tokens[3:]))
else:
logging.warning("In TestStatus file for test '{}', line '{}' not in expected format".format(self._test_name, line))
def _parse_test_status_file(self):
with open(self._filename, "r") as fd:
self._parse_test_status(fd.read())
[docs] def get_overall_test_status(self, wait_for_run=False, check_throughput=False, check_memory=False, ignore_namelists=False, ignore_memleak=False):
r"""
Given the current phases and statuses, produce a single results for this test. Preference
is given to PEND since we don't want to stop waiting for a test
that hasn't finished. Namelist diffs are given the lowest precedence.
>>> _test_helper2('PASS ERS.foo.A RUN')
'PASS'
>>> _test_helper2('PASS ERS.foo.A SHAREDLIB_BUILD\nPEND ERS.foo.A RUN')
'PEND'
>>> _test_helper2('FAIL ERS.foo.A MODEL_BUILD\nPEND ERS.foo.A RUN')
'PEND'
>>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nPASS ERS.foo.A RUN')
'PASS'
>>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A TPUTCOMP')
'PASS'
>>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A TPUTCOMP', check_throughput=True)
'FAIL'
>>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A NLCOMP')
'NLFAIL'
>>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A MEMCOMP')
'PASS'
>>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A NLCOMP', ignore_namelists=True)
'PASS'
>>> _test_helper2('PASS ERS.foo.A COMPARE_1\nFAIL ERS.foo.A NLCOMP\nFAIL ERS.foo.A COMPARE_2\nPASS ERS.foo.A RUN')
'FAIL'
>>> _test_helper2('FAIL ERS.foo.A BASELINE\nFAIL ERS.foo.A NLCOMP\nPASS ERS.foo.A COMPARE_2\nPASS ERS.foo.A RUN')
'DIFF'
>>> _test_helper2('FAIL ERS.foo.A BASELINE\nFAIL ERS.foo.A NLCOMP\nFAIL ERS.foo.A COMPARE_2\nPASS ERS.foo.A RUN')
'FAIL'
>>> _test_helper2('PASS ERS.foo.A MODEL_BUILD')
'PASS'
>>> _test_helper2('PASS ERS.foo.A MODEL_BUILD', wait_for_run=True)
'PEND'
>>> _test_helper2('FAIL ERS.foo.A MODEL_BUILD', wait_for_run=True)
'FAIL'
>>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nPEND ERS.foo.A RUN', wait_for_run=True)
'PEND'
>>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nFAIL ERS.foo.A RUN', wait_for_run=True)
'FAIL'
>>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nPASS ERS.foo.A RUN', wait_for_run=True)
'PASS'
"""
rv = TEST_PASS_STATUS
run_phase_found = False
for phase, data in self._phase_statuses.items():
status = data[0]
if phase == RUN_PHASE:
run_phase_found = True
if (status == TEST_PEND_STATUS):
return status
elif (status == TEST_FAIL_STATUS):
if ( (not check_throughput and phase == THROUGHPUT_PHASE) or
(not check_memory and phase == MEMCOMP_PHASE) or
(ignore_namelists and phase == NAMELIST_PHASE) or
(ignore_memleak and phase == MEMLEAK_PHASE) ):
continue
if (phase == NAMELIST_PHASE):
if (rv == TEST_PASS_STATUS):
rv = NAMELIST_FAIL_STATUS
elif (rv in [NAMELIST_FAIL_STATUS, TEST_PASS_STATUS] and phase == BASELINE_PHASE):
rv = TEST_DIFF_STATUS
else:
rv = TEST_FAIL_STATUS
# The test did not fail but the RUN phase was not found, so if the user requested
# that we wait for the RUN phase, then the test must still be considered pending.
if rv != TEST_FAIL_STATUS and not run_phase_found and wait_for_run:
rv = TEST_PEND_STATUS
return rv