Source code for CIME.tests.utils

import io
import os
import tempfile
import signal
import shutil
import sys
import time
import contextlib
from collections.abc import Iterable

from CIME import utils
from CIME import test_status
from CIME.utils import expect

MACRO_PRESERVE_ENV = [
    "ADDR2LINE",
    "AR",
    "AS",
    "CC",
    "CC_FOR_BUILD",
    "CMAKE_ARGS",
    "CONDA_EXE",
    "CONDA_PYTHON_EXE",
    "CPP",
    "CXX",
    "CXXFILT",
    "CXX_FOR_BUILD",
    "ELFEDIT",
    "F77",
    "F90",
    "F95",
    "FC",
    "GCC",
    "GCC_AR",
    "GCC_NM",
    "GCC_RANLIB",
    "GFORTRAN",
    "GPROF",
    "GXX",
    "LD",
    "LD_GOLD",
    "NM",
    "OBJCOPY",
    "OBJDUMP",
    "PATH",
    "RANLIB",
    "READELF",
    "SIZE",
    "STRINGS",
    "STRIP",
]


[docs] @contextlib.contextmanager def chdir(path): old_path = os.getcwd() os.chdir(path) try: yield finally: os.chdir(old_path)
[docs] def parse_test_status(line): status, test = line.split()[0:2] return test, status
[docs] def make_fake_teststatus(path, testname, status, phase): expect(phase in test_status.CORE_PHASES, "Bad phase '%s'" % phase) with test_status.TestStatus(test_dir=path, test_name=testname) as ts: for core_phase in test_status.CORE_PHASES: if core_phase == phase: ts.set_status( core_phase, status, comments=("time=42" if phase == test_status.RUN_PHASE else ""), ) break else: ts.set_status( core_phase, test_status.TEST_PASS_STATUS, comments=("time=42" if phase == test_status.RUN_PHASE else ""), )
[docs] class MockMachines(object): """A mock version of the Machines object to simplify testing.""" def __init__(self, name, os_): """Store the name.""" self.name = name self.os = os_
[docs] def get_machine_name(self): """Return the name we were given.""" return self.name
[docs] def get_value(self, var_name): """Allow the operating system to be queried.""" assert var_name == "OS", ( "Build asked for a value not " "implemented in the testing infrastructure." ) return self.os
[docs] def is_valid_compiler(self, _): # pylint:disable=no-self-use """Assume all compilers are valid.""" return True
[docs] def is_valid_MPIlib(self, _): """Assume all MPILIB settings are valid.""" return True
# pragma pylint: disable=unused-argument
[docs] def get_default_MPIlib(self, attributes=None): return "mpich2"
[docs] def get_default_compiler(self): return "intel"
[docs] class MakefileTester(object): """Helper class for checking Makefile output. Public methods: __init__ query_var assert_variable_equals assert_variable_matches """ # Note that the following is a Makefile and the echo line must begin with a tab _makefile_template = """ include Macros query: \techo '$({})' > query.out """ def __init__(self, parent, make_string): """Constructor for Makefile test helper class. Arguments: parent - The TestCase object that is using this item. make_string - Makefile contents to test. """ self.parent = parent self.make_string = make_string
[docs] def query_var(self, var_name, env, var): """Request the value of a variable in the Makefile, as a string. Arguments: var_name - Name of the variable to query. env - A dict containing extra environment variables to set when calling make. var - A dict containing extra make variables to set when calling make. (The distinction between env and var actually matters only for CMake, though.) """ if env is None: env = dict() if var is None: var = dict() # Write the Makefile strings to temporary files. temp_dir = tempfile.mkdtemp() macros_file_name = os.path.join(temp_dir, "Macros") makefile_name = os.path.join(temp_dir, "Makefile") output_name = os.path.join(temp_dir, "query.out") with open(macros_file_name, "w") as macros_file: macros_file.write(self.make_string) with open(makefile_name, "w") as makefile: makefile.write(self._makefile_template.format(var_name)) # environment = os.environ.copy() environment = dict(PATH=os.environ["PATH"]) environment.update(env) environment.update(var) for x in MACRO_PRESERVE_ENV: if x in os.environ: environment[x] = os.environ[x] gmake_exe = self.parent.MACHINE.get_value("GMAKE") if gmake_exe is None: gmake_exe = "gmake" self.parent.run_cmd_assert_result( "%s query --directory=%s 2>&1" % (gmake_exe, temp_dir), env=environment ) with open(output_name, "r") as output: query_result = output.read().strip() # Clean up the Makefiles. shutil.rmtree(temp_dir) return query_result
[docs] def assert_variable_equals(self, var_name, value, env=None, var=None): """Assert that a variable in the Makefile has a given value. Arguments: var_name - Name of variable to check. value - The string that the variable value should be equal to. env - Optional. Dict of environment variables to set when calling make. var - Optional. Dict of make variables to set when calling make. """ self.parent.assertEqual(self.query_var(var_name, env, var), value)
[docs] def assert_variable_matches(self, var_name, regex, env=None, var=None): """Assert that a variable in the Makefile matches a regex. Arguments: var_name - Name of variable to check. regex - The regex to match. env - Optional. Dict of environment variables to set when calling make. var - Optional. Dict of make variables to set when calling make. """ self.parent.assertRegexpMatches(self.query_var(var_name, env, var), regex)
[docs] class CMakeTester(object): """Helper class for checking CMake output. Public methods: __init__ query_var assert_variable_equals assert_variable_matches """ _cmakelists_template = """ include(./Macros.cmake) file(WRITE query.out "${{{}}}") """ def __init__(self, parent, cmake_string): """Constructor for CMake test helper class. Arguments: parent - The TestCase object that is using this item. cmake_string - CMake contents to test. """ self.parent = parent self.cmake_string = cmake_string
[docs] def query_var(self, var_name, env, var): """Request the value of a variable in Macros.cmake, as a string. Arguments: var_name - Name of the variable to query. env - A dict containing extra environment variables to set when calling cmake. var - A dict containing extra CMake variables to set when calling cmake. """ if env is None: env = dict() if var is None: var = dict() # Write the CMake strings to temporary files. temp_dir = tempfile.mkdtemp() macros_file_name = os.path.join(temp_dir, "Macros.cmake") cmakelists_name = os.path.join(temp_dir, "CMakeLists.txt") output_name = os.path.join(temp_dir, "query.out") with open(macros_file_name, "w") as macros_file: for key in var: macros_file.write("set({} {})\n".format(key, var[key])) macros_file.write(self.cmake_string) with open(cmakelists_name, "w") as cmakelists: cmakelists.write(self._cmakelists_template.format(var_name)) # environment = os.environ.copy() environment = dict(PATH=os.environ["PATH"]) environment.update(env) for x in MACRO_PRESERVE_ENV: if x in os.environ: environment[x] = os.environ[x] os_ = self.parent.MACHINE.get_value("OS") # cmake will not work on cray systems without this flag if os_ == "CNL": cmake_args = "-DCMAKE_SYSTEM_NAME=Catamount" else: cmake_args = "" self.parent.run_cmd_assert_result( "cmake %s . 2>&1" % cmake_args, from_dir=temp_dir, env=environment ) with open(output_name, "r") as output: query_result = output.read().strip() # Clean up the CMake files. shutil.rmtree(temp_dir) return query_result
[docs] def assert_variable_equals(self, var_name, value, env=None, var=None): """Assert that a variable in the CMakeLists has a given value. Arguments: var_name - Name of variable to check. value - The string that the variable value should be equal to. env - Optional. Dict of environment variables to set when calling cmake. var - Optional. Dict of CMake variables to set when calling cmake. """ self.parent.assertEqual(self.query_var(var_name, env, var), value)
[docs] def assert_variable_matches(self, var_name, regex, env=None, var=None): """Assert that a variable in the CMkeLists matches a regex. Arguments: var_name - Name of variable to check. regex - The regex to match. env - Optional. Dict of environment variables to set when calling cmake. var - Optional. Dict of CMake variables to set when calling cmake. """ self.parent.assertRegexpMatches(self.query_var(var_name, env, var), regex)
# TODO after dropping python 2.7 replace with tempfile.TemporaryDirectory
[docs] class TemporaryDirectory(object): def __init__(self): self._tempdir = None def __enter__(self): self._tempdir = tempfile.mkdtemp() return self._tempdir def __exit__(self, *args, **kwargs): if os.path.exists(self._tempdir): shutil.rmtree(self._tempdir)
# TODO replace with actual mock once 2.7 is dropped
[docs] class Mocker: def __init__(self, ret=None, cmd=None, return_value=None, side_effect=None): self._orig = [] self._ret = ret or return_value self._cmd = cmd self._calls = [] if isinstance(side_effect, (list, tuple)): self._side_effect = iter(side_effect) else: self._side_effect = side_effect self._method_calls = {} @property def calls(self): return self._calls @property def method_calls(self): return dict((x, y.calls) for x, y in self._method_calls.items()) @property def ret(self): return self._ret @ret.setter def ret(self, value): self._ret = value
[docs] def assert_called(self): assert len(self.calls) > 0
[docs] def assert_called_with(self, i=None, args=None, kwargs=None): if i is None: i = 0 call = self.calls[i] if args is not None: _call_args = set(call["args"]) _exp_args = set(args) assert _exp_args <= _call_args, "Got {} missing {}".format( _call_args, _exp_args - _call_args ) if kwargs is not None: call_kwargs = call["kwargs"] for x, y in kwargs.items(): assert call_kwargs[x] == y, "Missing {}".format(x)
def __getattr__(self, name): if name in self._method_calls: new_method = self._method_calls[name] else: new_method = Mocker(self, cmd=name) self._method_calls[name] = new_method return new_method def __call__(self, *args, **kwargs): self._calls.append({"args": args, "kwargs": kwargs}) if self._side_effect is not None and isinstance(self._side_effect, Iterable): rv = next(self._side_effect) else: rv = self._ret return rv def __del__(self): self.revert_mocks() def __enter__(self): return self def __exit__(self, *args, **kwargs): self.revert_mocks()
[docs] def revert_mocks(self): for m, module, method in self._orig: if isinstance(module, str): setattr(sys.modules[module], method, m) else: setattr(module, method, m)
[docs] def patch( self, module, method=None, ret=None, is_property=False, update_value_only=False ): rv = None if isinstance(module, str): x = module.split(".") main = ".".join(x[:-1]) if not update_value_only: self._orig.append((getattr(sys.modules[main], x[-1]), main, x[-1])) if is_property: setattr(sys.modules[main], x[-1], ret) else: rv = Mocker(ret, cmd=x[-1]) setattr(sys.modules[main], x[-1], rv) elif method != None: if not update_value_only: self._orig.append((getattr(module, method), module, method)) rv = Mocker(ret) setattr(module, method, rv) else: raise Exception("Could not patch") return rv