Source code for CIME.test_status

"""
Contains the crucial TestStatus class which manages phase-state of a test
case and ensure that this state is represented by the TestStatus file in
the case.

TestStatus objects are only modifiable via the set_status method and this
is only allowed if the object is being accessed within the context of a
context manager. Example:

    with TestStatus(test_dir=caseroot) as ts:
        ts.set_status(RUN_PHASE, TEST_PASS_STATUS)

This file also contains all of the hardcoded phase information which includes
the phase names, phase orders, potential phase states, and which phases are
required (core phases).

Additional important design decisions:
1) In order to ensure that incomplete tests are always left in a PEND
   state, updating a core phase to a PASS state will automatically set the next
   core state to PEND.
2) If the user repeats a core state, that invalidates all subsequent state. For
   example, if a user rebuilds their case, then any of the post-run states like the
   RUN state are no longer valid.
"""

from CIME.XML.standard_module_setup import *

from collections import OrderedDict

import os, itertools
from CIME import expected_fails

TEST_STATUS_FILENAME = "TestStatus"

# The statuses that a phase can be in
TEST_PEND_STATUS = "PEND"
TEST_PASS_STATUS = "PASS"
TEST_FAIL_STATUS = "FAIL"

ALL_PHASE_STATUSES = [TEST_PEND_STATUS, TEST_PASS_STATUS, TEST_FAIL_STATUS]

# Special statuses that the overall test can be in
TEST_DIFF_STATUS = "DIFF"  # Implies a failure in the BASELINE phase
NAMELIST_FAIL_STATUS = "NLFAIL"  # Implies a failure in the NLCOMP phase

# Special strings that can appear in comments, indicating particular types of failures
TEST_NO_BASELINES_COMMENT = "BFAIL"  # Implies baseline directory is missing in the
# baseline comparison phase
TEST_RERUN_COMMENT = "RERUN"  # Added to a PEND status to indicate that the test
# system has changed this phase to PEND in order to
# rerun it (e.g., to retry a failed test).
# The expected and unexpected failure comments aren't used directly in this module, but
# are included here for symmetry, so other modules can access them from here.
TEST_EXPECTED_FAILURE_COMMENT = expected_fails.EXPECTED_FAILURE_COMMENT
TEST_UNEXPECTED_FAILURE_COMMENT_START = expected_fails.UNEXPECTED_FAILURE_COMMENT_START

# The valid phases
CREATE_NEWCASE_PHASE = "CREATE_NEWCASE"
XML_PHASE = "XML"
SETUP_PHASE = "SETUP"
NAMELIST_PHASE = "NLCOMP"
SHAREDLIB_BUILD_PHASE = "SHAREDLIB_BUILD"
MODEL_BUILD_PHASE = "MODEL_BUILD"
SUBMIT_PHASE = "SUBMIT"
RUN_PHASE = "RUN"
THROUGHPUT_PHASE = "TPUTCOMP"
MEMCOMP_PHASE = "MEMCOMP"
MEMLEAK_PHASE = "MEMLEAK"
STARCHIVE_PHASE = "SHORT_TERM_ARCHIVER"
COMPARE_PHASE = "COMPARE"  # This is one special, real phase will be COMPARE_$WHAT, this is for internal test comparisons, there could be multiple variations of this phase in one test
BASELINE_PHASE = "BASELINE"
GENERATE_PHASE = "GENERATE"

ALL_PHASES = [
    CREATE_NEWCASE_PHASE,
    XML_PHASE,
    SETUP_PHASE,
    NAMELIST_PHASE,
    SHAREDLIB_BUILD_PHASE,
    MODEL_BUILD_PHASE,
    SUBMIT_PHASE,
    RUN_PHASE,
    COMPARE_PHASE,
    BASELINE_PHASE,
    THROUGHPUT_PHASE,
    MEMCOMP_PHASE,
    MEMLEAK_PHASE,
    STARCHIVE_PHASE,
    GENERATE_PHASE,
]

# These are mandatory phases that a test must go through
CORE_PHASES = [
    CREATE_NEWCASE_PHASE,
    XML_PHASE,
    SETUP_PHASE,
    SHAREDLIB_BUILD_PHASE,
    MODEL_BUILD_PHASE,
    SUBMIT_PHASE,
    RUN_PHASE,
]


def _test_helper1(file_contents):
    ts = TestStatus(test_dir="/", test_name="ERS.foo.A")
    ts._parse_test_status(file_contents)  # pylint: disable=protected-access
    return ts._phase_statuses  # pylint: disable=protected-access


def _test_helper2(
    file_contents,
    wait_for_run=False,
    check_throughput=False,
    check_memory=False,
    ignore_namelists=False,
    no_run=False,
    no_perm=False,
):
    lines = file_contents.splitlines()
    rv = None
    perms = [lines] if no_perm else itertools.permutations(lines)
    for perm in perms:
        ts = TestStatus(test_dir="/", test_name="ERS.foo.A")
        ts._parse_test_status("\n".join(perm))  # pylint: disable=protected-access
        the_status = ts.get_overall_test_status(
            wait_for_run=wait_for_run,
            check_throughput=check_throughput,
            check_memory=check_memory,
            ignore_namelists=ignore_namelists,
            no_run=no_run,
        )
        if rv is not None and the_status != rv:
            return "{} != {}".format(rv, the_status)
        else:
            rv = the_status

    return rv


[docs] class TestStatus(object): def __init__(self, test_dir=None, test_name=None, no_io=False): """ Create a TestStatus object If test_dir is not specified, it is set to the current working directory no_io is intended only for testing, and should be kept False in production code """ test_dir = os.getcwd() if test_dir is None else test_dir self._filename = os.path.join(test_dir, TEST_STATUS_FILENAME) self._phase_statuses = OrderedDict() # {name -> (status, comments)} self._test_name = test_name self._ok_to_modify = False self._no_io = no_io if os.path.exists(self._filename): self._parse_test_status_file() if not os.access(self._filename, os.W_OK): self._no_io = True else: expect( test_name is not None, "Must provide test_name if TestStatus file doesn't exist", ) def __enter__(self): self._ok_to_modify = True return self def __exit__(self, *_): self._ok_to_modify = False self.flush() def __iter__(self): for phase, data in self._phase_statuses.items(): yield phase, data[0] def __eq__(self, rhs): return ( self._phase_statuses == rhs._phase_statuses ) # pylint: disable=protected-access def __ne__(self, rhs): return not self.__eq__(rhs)
[docs] def get_name(self): return self._test_name
[docs] def set_status(self, phase, status, comments=""): """ Update the status of this test by changing the status of given phase to the given status. >>> with TestStatus(test_dir="/", test_name="ERS.foo.A", no_io=True) as ts: ... ts.set_status(CREATE_NEWCASE_PHASE, "PASS") ... ts.set_status(XML_PHASE, "PASS") ... ts.set_status(SETUP_PHASE, "FAIL") ... ts.set_status(SETUP_PHASE, "PASS") ... ts.set_status("{}_base_rest".format(COMPARE_PHASE), "FAIL") ... ts.set_status(SHAREDLIB_BUILD_PHASE, "PASS", comments='Time=42') >>> ts._phase_statuses OrderedDict([('CREATE_NEWCASE', ('PASS', '')), ('XML', ('PASS', '')), ('SETUP', ('PASS', '')), ('SHAREDLIB_BUILD', ('PASS', 'Time=42')), ('COMPARE_base_rest', ('FAIL', '')), ('MODEL_BUILD', ('PEND', ''))]) >>> with TestStatus(test_dir="/", test_name="ERS.foo.A", no_io=True) as ts: ... ts.set_status(CREATE_NEWCASE_PHASE, "PASS") ... ts.set_status(XML_PHASE, "PASS") ... ts.set_status(SETUP_PHASE, "FAIL") ... ts.set_status(SETUP_PHASE, "PASS") ... ts.set_status(BASELINE_PHASE, "PASS") ... ts.set_status("{}_base_rest".format(COMPARE_PHASE), "FAIL") ... ts.set_status(SHAREDLIB_BUILD_PHASE, "PASS", comments='Time=42') ... ts.set_status(SETUP_PHASE, "PASS") >>> ts._phase_statuses OrderedDict([('CREATE_NEWCASE', ('PASS', '')), ('XML', ('PASS', '')), ('SETUP', ('PASS', '')), ('SHAREDLIB_BUILD', ('PEND', ''))]) >>> with TestStatus(test_dir="/", test_name="ERS.foo.A", no_io=True) as ts: ... ts.set_status(CREATE_NEWCASE_PHASE, "FAIL") >>> ts._phase_statuses OrderedDict([('CREATE_NEWCASE', ('FAIL', ''))]) """ expect( self._ok_to_modify, "TestStatus not in a modifiable state, use 'with' syntax", ) expect( phase in ALL_PHASES or phase.startswith(COMPARE_PHASE), "Invalid phase '{}'".format(phase), ) expect(status in ALL_PHASE_STATUSES, "Invalid status '{}'".format(status)) if phase in CORE_PHASES and phase != CORE_PHASES[0]: previous_core_phase = CORE_PHASES[CORE_PHASES.index(phase) - 1] # TODO: enable check below # expect(previous_core_phase in self._phase_statuses, "Core phase '{}' was skipped".format(previous_core_phase)) if previous_core_phase in self._phase_statuses: expect( self._phase_statuses[previous_core_phase][0] == TEST_PASS_STATUS, "Cannot move past core phase '{}', it didn't pass: ".format( previous_core_phase ), ) reran_phase = ( phase in self._phase_statuses and self._phase_statuses[phase][0] != TEST_PEND_STATUS and phase in CORE_PHASES ) if reran_phase: # All subsequent phases are invalidated phase_idx = ALL_PHASES.index(phase) for subsequent_phase in ALL_PHASES[phase_idx + 1 :]: if subsequent_phase in self._phase_statuses: del self._phase_statuses[subsequent_phase] if subsequent_phase.startswith(COMPARE_PHASE): for stored_phase in list(self._phase_statuses.keys()): if stored_phase.startswith(COMPARE_PHASE): del self._phase_statuses[stored_phase] self._phase_statuses[phase] = (status, comments) # Can overwrite old phase info if ( status == TEST_PASS_STATUS and phase in CORE_PHASES and phase != CORE_PHASES[-1] ): next_core_phase = CORE_PHASES[CORE_PHASES.index(phase) + 1] self._phase_statuses[next_core_phase] = (TEST_PEND_STATUS, "")
[docs] def get_status(self, phase): return self._phase_statuses[phase][0] if phase in self._phase_statuses else None
[docs] def get_comment(self, phase): return self._phase_statuses[phase][1] if phase in self._phase_statuses else None
[docs] def current_is(self, phase, status): try: latest = self.get_latest_phase() except KeyError: return False return latest == phase and self.get_status(phase) == status
[docs] def get_latest_phase(self): return list(self._phase_statuses.keys())[-1]
[docs] def phase_statuses_dump( self, prefix="", skip_passes=False, skip_phase_list=None, xfails=None ): """ Args: prefix: string printed at the start of each line skip_passes: if True, do not output lines that have a PASS status skip_phase_list: list of phases (from the phases given by ALL_PHASES) for which we skip output xfails: object of type ExpectedFails, giving expected failures for this test """ if skip_phase_list is None: skip_phase_list = [] if xfails is None: xfails = expected_fails.ExpectedFails() result = "" if self._phase_statuses: for phase, data in self._phase_statuses.items(): if phase in skip_phase_list: continue status, comments = data xfail_comment = xfails.expected_fails_comment(phase, status) if skip_passes: if status == TEST_PASS_STATUS and not xfail_comment: # Note that we still print the result of a PASSing test if there # is a comment related to the expected failure status. Typically # this will indicate that this is an unexpected PASS (and so # should be removed from the expected fails list). continue result += "{}{} {} {}".format(prefix, status, self._test_name, phase) if comments: result += " {}".format(comments) if xfail_comment: result += " {}".format(xfail_comment) result += "\n" return result
[docs] def increment_non_pass_counts(self, non_pass_counts): """ Increment counts of the number of times given phases did not pass non_pass_counts is a dictionary whose keys are phases of interest and whose values are running counts of the number of non-passes. This method increments those counts based on results in the given TestStatus object. """ for phase in non_pass_counts: if phase in self._phase_statuses: status, _ = self._phase_statuses[phase] if status != TEST_PASS_STATUS: non_pass_counts[phase] += 1
[docs] def flush(self): if self._phase_statuses and not self._no_io: with open(self._filename, "w") as fd: fd.write(self.phase_statuses_dump())
def _parse_test_status(self, file_contents): """ >>> contents = ''' ... PASS ERS.foo.A CREATE_NEWCASE ... PASS ERS.foo.A XML ... FAIL ERS.foo.A SETUP ... PASS ERS.foo.A COMPARE_base_rest ... PASS ERS.foo.A SHAREDLIB_BUILD Time=42 ... ''' >>> _test_helper1(contents) OrderedDict([('CREATE_NEWCASE', ('PASS', '')), ('XML', ('PASS', '')), ('SETUP', ('FAIL', '')), ('COMPARE_base_rest', ('PASS', '')), ('SHAREDLIB_BUILD', ('PASS', 'Time=42'))]) """ for line in file_contents.splitlines(): line = line.strip() tokens = line.split() if line == "": pass # skip blank lines elif len(tokens) >= 3: status, curr_test_name, phase = tokens[:3] if self._test_name is None: self._test_name = curr_test_name else: expect( self._test_name == curr_test_name, "inconsistent test name in parse_test_status: '{}' != '{}'".format( self._test_name, curr_test_name ), ) expect( status in ALL_PHASE_STATUSES, "Unexpected status '{}' in parse_test_status for test '{}'".format( status, self._test_name ), ) expect( phase in ALL_PHASES or phase.startswith(COMPARE_PHASE), "phase '{}' not expected in parse_test_status for test '{}'".format( phase, self._test_name ), ) expect( phase not in self._phase_statuses, "Should not have seen multiple instances of phase '{}' for test '{}'".format( phase, self._test_name ), ) self._phase_statuses[phase] = (status, " ".join(tokens[3:])) else: logging.warning( "In TestStatus file for test '{}', line '{}' not in expected format".format( self._test_name, line ) ) def _parse_test_status_file(self): with open(self._filename, "r") as fd: self._parse_test_status(fd.read()) def _get_overall_status_based_on_phases( self, phases, wait_for_run=False, check_throughput=False, check_memory=False, ignore_namelists=False, ignore_memleak=False, no_run=False, ): rv = TEST_PASS_STATUS run_phase_found = False phase_responsible_for_status = None for phase in phases: # ensure correct order of processing phases if phase in self._phase_statuses: data = self._phase_statuses[phase] else: continue status = data[0] if ( phase in CORE_PHASES and rv in [TEST_PASS_STATUS, NAMELIST_FAIL_STATUS] and status != TEST_PEND_STATUS ): phase_responsible_for_status = phase if phase == RUN_PHASE: run_phase_found = True if phase in [SUBMIT_PHASE, RUN_PHASE] and no_run: break if status == TEST_PEND_STATUS and rv in [ TEST_PASS_STATUS, NAMELIST_FAIL_STATUS, ]: if not no_run: rv = TEST_PEND_STATUS phase_responsible_for_status = phase break elif status == TEST_FAIL_STATUS: if ( (not check_throughput and phase == THROUGHPUT_PHASE) or (not check_memory and phase == MEMCOMP_PHASE) or (ignore_namelists and phase == NAMELIST_PHASE) or (ignore_memleak and phase == MEMLEAK_PHASE) ): continue if phase == NAMELIST_PHASE: if rv == TEST_PASS_STATUS: rv = NAMELIST_FAIL_STATUS elif phase in [BASELINE_PHASE, THROUGHPUT_PHASE, MEMCOMP_PHASE]: if rv in [NAMELIST_FAIL_STATUS, TEST_PASS_STATUS]: phase_responsible_for_status = phase rv = TEST_DIFF_STATUS else: pass # a DIFF does not trump a FAIL elif phase in CORE_PHASES: phase_responsible_for_status = phase return TEST_FAIL_STATUS, phase_responsible_for_status else: phase_responsible_for_status = phase rv = TEST_FAIL_STATUS # The test did not fail but the RUN phase was not found, so if the user requested # that we wait for the RUN phase, then the test must still be considered pending. if ( rv in [TEST_PASS_STATUS, NAMELIST_FAIL_STATUS] and not run_phase_found and wait_for_run ): phase_responsible_for_status = RUN_PHASE rv = TEST_PEND_STATUS return rv, phase_responsible_for_status
[docs] def get_overall_test_status( self, wait_for_run=False, check_throughput=False, check_memory=False, ignore_namelists=False, ignore_memleak=False, no_run=False, ): r""" Given the current phases and statuses, produce a single results for this test. Preference is given to PEND since we don't want to stop waiting for a test that hasn't finished. Namelist diffs are given the lowest precedence. >>> _test_helper2('PASS ERS.foo.A RUN') ('PASS', 'RUN') >>> _test_helper2('PASS ERS.foo.A SHAREDLIB_BUILD\nPEND ERS.foo.A RUN') ('PEND', 'RUN') >>> _test_helper2('FAIL ERS.foo.A MODEL_BUILD\nPEND ERS.foo.A RUN') ('FAIL', 'MODEL_BUILD') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nPASS ERS.foo.A RUN') ('PASS', 'RUN') >>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A TPUTCOMP') ('PASS', 'RUN') >>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A TPUTCOMP', check_throughput=True) ('DIFF', 'TPUTCOMP') >>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A MEMCOMP', check_memory=True) ('DIFF', 'MEMCOMP') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nPASS ERS.foo.A RUN\nFAIL ERS.foo.A NLCOMP') ('NLFAIL', 'RUN') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nPEND ERS.foo.A RUN\nFAIL ERS.foo.A NLCOMP') ('PEND', 'RUN') >>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A MEMCOMP') ('PASS', 'RUN') >>> _test_helper2('PASS ERS.foo.A RUN\nFAIL ERS.foo.A NLCOMP', ignore_namelists=True) ('PASS', 'RUN') >>> _test_helper2('PASS ERS.foo.A COMPARE_1\nFAIL ERS.foo.A NLCOMP\nFAIL ERS.foo.A COMPARE_2\nPASS ERS.foo.A RUN') ('FAIL', 'COMPARE_2') >>> _test_helper2('FAIL ERS.foo.A BASELINE\nFAIL ERS.foo.A NLCOMP\nPASS ERS.foo.A COMPARE_2\nPASS ERS.foo.A RUN') ('DIFF', 'BASELINE') >>> _test_helper2('FAIL ERS.foo.A BASELINE\nFAIL ERS.foo.A NLCOMP\nFAIL ERS.foo.A COMPARE_2\nPASS ERS.foo.A RUN') ('FAIL', 'COMPARE_2') >>> _test_helper2('PEND ERS.foo.A COMPARE_2\nFAIL ERS.foo.A RUN') ('FAIL', 'RUN') >>> _test_helper2('PEND ERS.foo.A COMPARE_2\nPASS ERS.foo.A RUN') ('PEND', 'COMPARE_2') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD') ('PASS', 'MODEL_BUILD') >>> _test_helper2('PEND ERS.foo.A MODEL_BUILD\nPEND ERS.foo.A RUN') ('PEND', 'MODEL_BUILD') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD', wait_for_run=True) ('PEND', 'RUN') >>> _test_helper2('FAIL ERS.foo.A MODEL_BUILD', wait_for_run=True) ('FAIL', 'MODEL_BUILD') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nPEND ERS.foo.A RUN', wait_for_run=True) ('PEND', 'RUN') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nFAIL ERS.foo.A RUN', wait_for_run=True) ('FAIL', 'RUN') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nPASS ERS.foo.A RUN', wait_for_run=True) ('PASS', 'RUN') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nFAIL ERS.foo.A RUN\nPEND ERS.foo.A COMPARE') ('FAIL', 'RUN') >>> _test_helper2('PASS ERS.foo.A MODEL_BUILD\nPEND ERS.foo.A RUN', no_run=True) ('PASS', 'MODEL_BUILD') >>> s = '''PASS ERS.foo.A CREATE_NEWCASE ... PASS ERS.foo.A XML ... PASS ERS.foo.A SETUP ... PASS ERS.foo.A SHAREDLIB_BUILD time=454 ... PASS ERS.foo.A NLCOMP ... PASS ERS.foo.A MODEL_BUILD time=363 ... PASS ERS.foo.A SUBMIT ... PASS ERS.foo.A RUN time=73 ... PEND ERS.foo.A COMPARE_base_single_thread ... FAIL ERS.foo.A BASELINE master: DIFF ... PASS ERS.foo.A TPUTCOMP ... PASS ERS.foo.A MEMLEAK insuffiencient data for memleak test ... PASS ERS.foo.A SHORT_TERM_ARCHIVER ... ''' >>> _test_helper2(s, no_perm=True) ('PEND', 'COMPARE_base_single_thread') >>> s = '''PASS ERS.foo.A CREATE_NEWCASE ... PASS ERS.foo.A XML ... PASS ERS.foo.A SETUP ... PEND ERS.foo.A SHAREDLIB_BUILD ... FAIL ERS.foo.A NLCOMP ... ''' >>> _test_helper2(s, no_run=True) ('NLFAIL', 'SETUP') >>> _test_helper2(s, no_run=False) ('PEND', 'SHAREDLIB_BUILD') """ # Core phases take priority core_rv, phase = self._get_overall_status_based_on_phases( CORE_PHASES, wait_for_run=wait_for_run, check_throughput=check_throughput, check_memory=check_memory, ignore_namelists=ignore_namelists, ignore_memleak=ignore_memleak, no_run=no_run, ) if core_rv != TEST_PASS_STATUS: return core_rv, phase else: phase_order = list(CORE_PHASES) phase_order.extend( [item for item in self._phase_statuses if item not in CORE_PHASES] ) return self._get_overall_status_based_on_phases( phase_order, wait_for_run=wait_for_run, check_throughput=check_throughput, check_memory=check_memory, ignore_namelists=ignore_namelists, ignore_memleak=ignore_memleak, no_run=no_run, )