Source code for CIME.tests.test_unit_utils

#!/usr/bin/env python3

import os
import stat
import shutil
import sys
import tempfile

import unittest
from unittest import mock
from CIME.status import run_and_log_case_status
from CIME.utils import (
    indent_string,
    import_from_file,
    _line_defines_python_function,
    file_contains_python_function,
    copy_globs,
    import_and_run_sub_or_cmd,
)


[docs] class TestIndentStr(unittest.TestCase): """Test the indent_string function."""
[docs] def test_indent_string_singleline(self): """Test the indent_string function with a single-line string""" mystr = "foo" result = indent_string(mystr, 4) expected = " foo" self.assertEqual(expected, result)
[docs] def test_indent_string_multiline(self): """Test the indent_string function with a multi-line string""" mystr = """hello hi goodbye """ result = indent_string(mystr, 2) expected = """ hello hi goodbye """ self.assertEqual(expected, result)
[docs] class TestLineDefinesPythonFunction(unittest.TestCase): """Tests of _line_defines_python_function""" # ------------------------------------------------------------------------ # Tests of _line_defines_python_function that should return True # ------------------------------------------------------------------------
[docs] def test_def_foo(self): """Test of a def of the function of interest""" line = "def foo():" self.assertTrue(_line_defines_python_function(line, "foo"))
[docs] def test_def_foo_space(self): """Test of a def of the function of interest, with an extra space before the parentheses""" line = "def foo ():" self.assertTrue(_line_defines_python_function(line, "foo"))
[docs] def test_import_foo(self): """Test of an import of the function of interest""" line = "from bar.baz import foo" self.assertTrue(_line_defines_python_function(line, "foo"))
[docs] def test_import_foo_space(self): """Test of an import of the function of interest, with trailing spaces""" line = "from bar.baz import foo " self.assertTrue(_line_defines_python_function(line, "foo"))
[docs] def test_import_foo_then_others(self): """Test of an import of the function of interest, along with others""" line = "from bar.baz import foo, bar" self.assertTrue(_line_defines_python_function(line, "foo"))
[docs] def test_import_others_then_foo(self): """Test of an import of the function of interest, after others""" line = "from bar.baz import bar, foo" self.assertTrue(_line_defines_python_function(line, "foo"))
# ------------------------------------------------------------------------ # Tests of _line_defines_python_function that should return False # ------------------------------------------------------------------------
[docs] def test_def_barfoo(self): """Test of a def of a different function""" line = "def barfoo():" self.assertFalse(_line_defines_python_function(line, "foo"))
[docs] def test_def_foobar(self): """Test of a def of a different function""" line = "def foobar():" self.assertFalse(_line_defines_python_function(line, "foo"))
[docs] def test_def_foo_indented(self): """Test of a def of the function of interest, but indented""" line = " def foo():" self.assertFalse(_line_defines_python_function(line, "foo"))
[docs] def test_def_foo_no_parens(self): """Test of a def of the function of interest, but without parentheses""" line = "def foo:" self.assertFalse(_line_defines_python_function(line, "foo"))
[docs] def test_import_foo_indented(self): """Test of an import of the function of interest, but indented""" line = " from bar.baz import foo" self.assertFalse(_line_defines_python_function(line, "foo"))
[docs] def test_import_barfoo(self): """Test of an import of a different function""" line = "from bar.baz import barfoo" self.assertFalse(_line_defines_python_function(line, "foo"))
[docs] def test_import_foobar(self): """Test of an import of a different function""" line = "from bar.baz import foobar" self.assertFalse(_line_defines_python_function(line, "foo"))
[docs] class TestFileContainsPythonFunction(unittest.TestCase): """Tests of file_contains_python_function"""
[docs] def setUp(self): self._workdir = tempfile.mkdtemp()
[docs] def tearDown(self): shutil.rmtree(self._workdir, ignore_errors=True)
[docs] def create_test_file(self, contents): """Creates a test file with the given contents, and returns the path to that file""" filepath = os.path.join(self._workdir, "testfile") with open(filepath, "w") as fd: fd.write(contents) return filepath
[docs] def test_contains_correct_def_and_others(self): """Test file_contains_python_function with a correct def mixed with other defs""" contents = """ def bar(): def foo(): def baz(): """ filepath = self.create_test_file(contents) self.assertTrue(file_contains_python_function(filepath, "foo"))
[docs] def test_does_not_contain_correct_def(self): """Test file_contains_python_function without the correct def""" contents = """ def bar(): def notfoo(): def baz(): """ filepath = self.create_test_file(contents) self.assertFalse(file_contains_python_function(filepath, "foo"))
[docs] class MockTime(object): def __init__(self): self._old = None def __enter__(self): self._old = getattr(sys.modules["time"], "strftime") setattr(sys.modules["time"], "strftime", lambda *args: "00:00:00 ") def __exit__(self, *args, **kwargs): setattr(sys.modules["time"], "strftime", self._old)
[docs] def match_all_lines(data, lines): for line in data: for i, x in enumerate(lines): if x == line: lines.pop(i) continue if len(lines) == 0: return True, [] return False, lines
[docs] class TestUtils(unittest.TestCase):
[docs] def setUp(self): self.base_func = lambda *args: None # pylint: disable=unused-argument def _error_func(*args): raise Exception("Something went wrong") self.error_func = _error_func
[docs] def test_import_and_run_sub_or_cmd(self): with self.assertRaisesRegex( Exception, "ERROR: Could not find buildnml file for component test" ): import_and_run_sub_or_cmd( "/tmp/buildnml", "arg1 arg2 -vvv", "buildnml", (self, "arg1"), "/tmp", "test", )
[docs] @mock.patch("importlib.import_module") def test_import_and_run_sub_or_cmd_cime_py(self, importmodule): importmodule.side_effect = Exception("Module has a problem") with self.assertRaisesRegex(Exception, "Module has a problem") as e: import_and_run_sub_or_cmd( "/tmp/buildnml", "arg1, arg2 -vvv", "buildnml", (self, "arg1"), "/tmp", "test", ) # check that we avoid exception chaining self.assertTrue(e.exception.__context__ is None)
[docs] @mock.patch("importlib.import_module") def test_import_and_run_sub_or_cmd_import(self, importmodule): importmodule.side_effect = Exception("I am being imported") with self.assertRaisesRegex(Exception, "I am being imported") as e: import_and_run_sub_or_cmd( "/tmp/buildnml", "arg1 arg2 -vvv", "buildnml", (self, "arg1"), "/tmp", "test", ) # check that we avoid exception chaining self.assertTrue(e.exception.__context__ is None)
[docs] @mock.patch("os.path.isfile") @mock.patch("CIME.utils.run_sub_or_cmd") def test_import_and_run_sub_or_cmd_run(self, func, isfile): isfile.return_value = True func.side_effect = Exception( "ERROR: /tmp/buildnml arg1 arg2 -vvv FAILED, see above" ) with self.assertRaisesRegex( Exception, "ERROR: /tmp/buildnml arg1 arg2 -vvv FAILED, see above" ): import_and_run_sub_or_cmd( "/tmp/buildnml", "arg1 arg2 -vvv", "buildnml", (self, "arg1"), "/tmp", "test", )
[docs] @mock.patch("glob.glob") @mock.patch("CIME.utils.safe_copy") def test_copy_globs(self, safe_copy, glob): glob.side_effect = [ [], ["/src/run/test.sh", "/src/run/.hidden.sh"], [ "/src/bld/test.nc", ], ] copy_globs(["CaseDocs/*", "run/*.sh", "bld/*.nc"], "/storage/output", "uid") safe_copy.assert_any_call( "/src/run/test.sh", "/storage/output/test.sh.uid", preserve_meta=False ) safe_copy.assert_any_call( "/src/run/.hidden.sh", "/storage/output/hidden.sh.uid", preserve_meta=False ) safe_copy.assert_any_call( "/src/bld/test.nc", "/storage/output/test.nc.uid", preserve_meta=False )
[docs] def assertMatchAllLines(self, tempdir, test_lines): with open(os.path.join(tempdir, "CaseStatus")) as fd: data = fd.readlines() result, missing = match_all_lines(data, test_lines) error = [] if len(missing) != 0: error.extend(["Missing Lines", ""]) error.extend([x.rstrip("\n") for x in missing]) error.extend(["", "Tempfile contents", ""]) error.extend([x.rstrip("\n") for x in data]) self.assertTrue(result, msg="\n".join(error))
[docs] def test_import_from_file(self): with tempfile.NamedTemporaryFile() as fd: fd.writelines( [ b"def test():\n", b" return 'value'", ] ) fd.flush() module = import_from_file("test.py", fd.name) assert module.test() == "value"
[docs] def test_run_and_log_case_status(self): test_lines = [ "00:00:00 default starting \n", "00:00:00 default success \n", ] with tempfile.TemporaryDirectory() as tempdir, MockTime(): run_and_log_case_status(self.base_func, "default", caseroot=tempdir) self.assertMatchAllLines(tempdir, test_lines)
[docs] def test_run_and_log_case_status_case_submit_on_batch(self): test_lines = [ "00:00:00 case.submit starting \n", "00:00:00 case.submit success \n", ] with tempfile.TemporaryDirectory() as tempdir, MockTime(): run_and_log_case_status( self.base_func, "case.submit", caseroot=tempdir, is_batch=True ) self.assertMatchAllLines(tempdir, test_lines)
[docs] def test_run_and_log_case_status_case_submit_no_batch(self): test_lines = [ "00:00:00 case.submit starting \n", "00:00:00 case.submit success \n", ] with tempfile.TemporaryDirectory() as tempdir, MockTime(): run_and_log_case_status( self.base_func, "case.submit", caseroot=tempdir, is_batch=False ) self.assertMatchAllLines(tempdir, test_lines)
[docs] def test_run_and_log_case_status_case_submit_error_on_batch(self): test_lines = [ "00:00:00 case.submit starting \n", "00:00:00 case.submit error \n", "Something went wrong\n", ] with tempfile.TemporaryDirectory() as tempdir, MockTime(): with self.assertRaises(Exception): run_and_log_case_status( self.error_func, "case.submit", caseroot=tempdir, is_batch=True ) self.assertMatchAllLines(tempdir, test_lines)
[docs] def test_run_and_log_case_status_custom_msg(self): test_lines = [ "00:00:00 default starting starting extra\n", "00:00:00 default success success extra\n", ] starting_func = mock.MagicMock(return_value="starting extra") success_func = mock.MagicMock(return_value="success extra") def normal_func(): return "data" with tempfile.TemporaryDirectory() as tempdir, MockTime(): run_and_log_case_status( normal_func, "default", custom_starting_msg_functor=starting_func, custom_success_msg_functor=success_func, caseroot=tempdir, ) self.assertMatchAllLines(tempdir, test_lines) starting_func.assert_called_with() success_func.assert_called_with("data")
[docs] def test_run_and_log_case_status_custom_msg_error_on_batch(self): test_lines = [ "00:00:00 default starting starting extra\n", "00:00:00 default success success extra\n", ] starting_func = mock.MagicMock(return_value="starting extra") success_func = mock.MagicMock(return_value="success extra") def error_func(): raise Exception("Error") with tempfile.TemporaryDirectory() as tempdir, MockTime(), self.assertRaises( Exception ): run_and_log_case_status( error_func, "default", custom_starting_msg_functor=starting_func, custom_success_msg_functor=success_func, caseroot=tempdir, ) self.assertMatchAllLines(tempdir, test_lines) starting_func.assert_called_with() success_func.assert_not_called()
[docs] def test_run_and_log_case_status_error(self): test_lines = [ "00:00:00 default starting \n", "00:00:00 default error \n", "Something went wrong\n", ] with tempfile.TemporaryDirectory() as tempdir, MockTime(): with self.assertRaises(Exception): run_and_log_case_status(self.error_func, "default", caseroot=tempdir) self.assertMatchAllLines(tempdir, test_lines)
if __name__ == "__main__": unittest.main()