Source code for CIME.tests.base

#!/usr/bin/env python3

import glob
import os
import tempfile
import time
import signal
import shutil
import stat
import sys
import unittest

from CIME import utils
from CIME.config import Config
from CIME.XML.machines import Machines

[docs] def typed_os_environ(key, default_value, expected_type=None): # Infer type if not explicitly set dst_type = expected_type or type(default_value) value = os.environ.get(key, default_value) if value is not None and dst_type == bool: # Any else is false, might want to be more strict return value.lower() == "true" if isinstance(value, str) else value if value is None: return None return dst_type(value)
[docs] class BaseTestCase(unittest.TestCase): # These static values are set when scripts/lib/CIME/tests/ is called. MACHINE = None SCRIPT_DIR = utils.get_scripts_root() TOOLS_DIR = os.path.join(utils.get_cime_root(), "CIME", "Tools") TEST_ROOT = None TEST_COMPILER = None TEST_MPILIB = None NO_FORTRAN_RUN = None FAST_ONLY = None NO_BATCH = None NO_CMAKE = None NO_TEARDOWN = None GLOBAL_TIMEOUT = None
[docs] def setUp(self): self._thread_error = None self._unset_proxy = self.setup_proxy() self._machine = self.MACHINE.get_machine_name() self._compiler = ( self.MACHINE.get_default_compiler() if self.TEST_COMPILER is None else self.TEST_COMPILER ) self._baseline_name = "fake_testing_only_%s" % utils.get_timestamp() self._baseline_area = os.path.join(self.TEST_ROOT, "baselines") self._testroot = self.TEST_ROOT self._hasbatch = self.MACHINE.has_batch_system() and not self.NO_BATCH self._do_teardown = not self.NO_TEARDOWN self._root_dir = os.getcwd() self._cprnc = self.MACHINE.get_value("CCSM_CPRNC") customize_path = os.path.join(utils.get_src_root(), "cime_config", "customize") self._config = Config.load(customize_path) self._driver = utils.get_cime_default_driver()
[docs] def tearDown(self): self.kill_subprocesses() os.chdir(self._root_dir) if self._unset_proxy: del os.environ["http_proxy"] files_to_clean = [] baselines = os.path.join(self._baseline_area, self._baseline_name) if os.path.isdir(baselines): files_to_clean.append(baselines) for test_id in ["master", self._baseline_name]: for leftover in glob.glob(os.path.join(self._testroot, "*%s*" % test_id)): files_to_clean.append(leftover) do_teardown = self._do_teardown and sys.exc_info() == (None, None, None) if not do_teardown and files_to_clean: print("Detected failed test or user request no teardown") print("Leaving files:") for file_to_clean in files_to_clean: print(" " + file_to_clean) else: # For batch machines need to avoid race condition as batch system # finishes I/O for the case. if self._hasbatch: time.sleep(5) for file_to_clean in files_to_clean: if os.path.isdir(file_to_clean): shutil.rmtree(file_to_clean) else: os.remove(file_to_clean)
[docs] def assert_test_status(self, test_name, test_status_obj, test_phase, expected_stat): test_status = test_status_obj.get_status(test_phase) self.assertEqual( test_status, expected_stat, msg="Problem with {}: for phase '{}': has status '{}', expected '{}'".format( test_name, test_phase, test_status, expected_stat ), )
[docs] def run_cmd_assert_result( self, cmd, from_dir=None, expected_stat=0, env=None, verbose=False, shell=True ): from_dir = os.getcwd() if from_dir is None else from_dir stat, output, errput = utils.run_cmd( cmd, from_dir=from_dir, env=env, verbose=verbose, shell=shell ) if expected_stat == 0: expectation = "SHOULD HAVE WORKED, INSTEAD GOT STAT %s" % stat else: expectation = "EXPECTED STAT %s, INSTEAD GOT STAT %s" % ( expected_stat, stat, ) msg = """ COMMAND: %s FROM_DIR: %s %s OUTPUT: %s ERRPUT: %s """ % ( cmd, from_dir, expectation, output, errput, ) self.assertEqual(stat, expected_stat, msg=msg) return output
[docs] def setup_proxy(self): if "http_proxy" not in os.environ: proxy = self.MACHINE.get_value("PROXY") if proxy is not None: os.environ["http_proxy"] = proxy return True return False
[docs] def assert_dashboard_has_build(self, build_name, expected_count=1): # Do not test E3SM dashboard if model is CESM if self._config.test_mode == "e3sm": time.sleep(10) # Give chance for cdash to update wget_file = tempfile.mktemp() utils.run_cmd_no_fail( "wget --no-check-certificate -O %s" % wget_file ) raw_text = open(wget_file, "r").read() os.remove(wget_file) num_found = raw_text.count(build_name) self.assertEqual( num_found, expected_count, msg="Dashboard did not have expected num occurances of build name '%s'. Expected %s, found %s" % (build_name, expected_count, num_found), )
[docs] def kill_subprocesses( self, name=None, sig=signal.SIGKILL, expected_num_killed=None ): # Kill all subprocesses proc_ids = utils.find_proc_id(proc_name=name, children_only=True) if expected_num_killed is not None: self.assertEqual( len(proc_ids), expected_num_killed, msg="Expected to find %d processes to kill, found %d" % (expected_num_killed, len(proc_ids)), ) for proc_id in proc_ids: try: os.kill(proc_id, sig) except OSError: pass
[docs] def kill_python_subprocesses(self, sig=signal.SIGKILL, expected_num_killed=None): self.kill_subprocesses("[Pp]ython", sig, expected_num_killed)
def _create_test( self, extra_args, test_id=None, run_errors=False, env_changes="", default_baseline_area=False, ): """ Convenience wrapper around create_test. Returns list of full paths to created cases. If multiple cases, the order of the returned list is not guaranteed to match the order of the arguments. """ # All stub model not supported in nuopc driver if self._driver == "nuopc" and "cime_developer" in extra_args: extra_args.append( " ^SMS_Ln3.T42_T42.S ^PRE.f19_f19.ADESP_TEST ^PRE.f19_f19.ADESP ^DAE.ww3a.ADWAV ^IRT_N2_Vmct_Ln9.f19_g16_rx1.A" ) test_id = ( "{}-{}".format(self._baseline_name, utils.get_timestamp()) if test_id is None else test_id ) extra_args.append("-t {}".format(test_id)) if not default_baseline_area: extra_args.append("--baseline-root {}".format(self._baseline_area)) if self.NO_BATCH: extra_args.append("--no-batch") if self.TEST_COMPILER and ( [extra_arg for extra_arg in extra_args if "--compiler" in extra_arg] == [] ): extra_args.append("--compiler={}".format(self.TEST_COMPILER)) if self.TEST_MPILIB and ( [extra_arg for extra_arg in extra_args if "--mpilib" in extra_arg] == [] ): extra_args.append("--mpilib={}".format(self.TEST_MPILIB)) if [extra_arg for extra_arg in extra_args if "--machine" in extra_arg] == []: extra_args.append(f"--machine {self.MACHINE.get_machine_name()}") extra_args.append("--test-root={0} --output-root={0}".format(self._testroot)) full_run = ( set(extra_args) & set(["-n", "--namelist-only", "--no-setup", "--no-build", "--no-run"]) ) == set() if full_run and not self.NO_BATCH: extra_args.append("--wait") expected_stat = 0 if not run_errors else utils.TESTS_FAILED_ERR_CODE output = self.run_cmd_assert_result( "{} {}/create_test {}".format( env_changes, self.SCRIPT_DIR, " ".join(extra_args) ), expected_stat=expected_stat, ) cases = [] for line in output.splitlines(): if "Case dir:" in line: casedir = line.split()[-1] self.assertTrue( os.path.isdir(casedir), msg="Missing casedir {}".format(casedir) ) cases.append(casedir) self.assertTrue(len(cases) > 0, "create_test made no cases") return cases[0] if len(cases) == 1 else cases def _wait_for_tests(self, test_id, expect_works=True, always_wait=False): if self._hasbatch or always_wait: timeout_arg = ( "--timeout={}".format(self.GLOBAL_TIMEOUT) if self.GLOBAL_TIMEOUT is not None else "" ) expected_stat = 0 if expect_works else utils.TESTS_FAILED_ERR_CODE self.run_cmd_assert_result( "{}/wait_for_tests {} *{}/TestStatus".format( self.TOOLS_DIR, timeout_arg, test_id ), from_dir=self._testroot, expected_stat=expected_stat, )
[docs] def get_casedir(self, case_fragment, all_cases): potential_matches = [item for item in all_cases if case_fragment in item] self.assertTrue( len(potential_matches) == 1, "Ambiguous casedir selection for {}, found {} among {}".format( case_fragment, potential_matches, all_cases ), ) return potential_matches[0]
[docs] def verify_perms(self, root_dir): for root, dirs, files in os.walk(root_dir): for filename in files: full_path = os.path.join(root, filename) st = os.stat(full_path) self.assertTrue( st.st_mode & stat.S_IWGRP, msg="file {} is not group writeable".format(full_path), ) self.assertTrue( st.st_mode & stat.S_IRGRP, msg="file {} is not group readable".format(full_path), ) self.assertTrue( st.st_mode & stat.S_IROTH, msg="file {} is not world readable".format(full_path), ) for dirname in dirs: full_path = os.path.join(root, dirname) st = os.stat(full_path) self.assertTrue( st.st_mode & stat.S_IWGRP, msg="dir {} is not group writable".format(full_path), ) self.assertTrue( st.st_mode & stat.S_IRGRP, msg="dir {} is not group readable".format(full_path), ) self.assertTrue( st.st_mode & stat.S_IXGRP, msg="dir {} is not group executable".format(full_path), ) self.assertTrue( st.st_mode & stat.S_IROTH, msg="dir {} is not world readable".format(full_path), ) self.assertTrue( st.st_mode & stat.S_IXOTH, msg="dir {} is not world executable".format(full_path), )