Source code for CIME.jenkins_generic_job

import CIME.wait_for_tests
from CIME.utils import expect, run_cmd_no_fail
from CIME.case import Case

import os, shutil, glob, signal, logging, threading, sys, re, tarfile, time

##############################################################################
[docs] def cleanup_queue(test_root, test_id): ############################################################################### """ Delete all jobs left in the queue """ for teststatus_file in glob.iglob("{}/*{}*/TestStatus".format(test_root, test_id)): case_dir = os.path.dirname(teststatus_file) with Case(case_dir, read_only=True) as case: jobmap = case.get_job_info() jobkills = [] for jobname, jobid in jobmap.items(): logging.warning( "Found leftover batch job {} ({}) that need to be deleted".format( jobid, jobname ) ) jobkills.append(jobid) case.cancel_batch_jobs(jobkills)
###############################################################################
[docs] def delete_old_test_data( mach_comp, test_id_root, scratch_root, test_root, run_area, build_area, archive_area, avoid_test_id, ): ############################################################################### # Remove old dirs for clutter_area in [scratch_root, test_root, run_area, build_area, archive_area]: for old_file in glob.glob( "{}/*{}*{}*".format(clutter_area, mach_comp, test_id_root) ): if avoid_test_id not in old_file: logging.info("TEST ARCHIVER: removing {}".format(old_file)) if os.path.isdir(old_file): shutil.rmtree(old_file) else: os.remove(old_file) else: logging.info( "TEST ARCHIVER: leaving case {} due to avoiding test id {}".format( old_file, avoid_test_id ) )
###############################################################################
[docs] def scan_for_test_ids(old_test_archive, mach_comp, test_id_root): ############################################################################### results = set([]) test_id_re = re.compile(".+[.]([^.]+)") for item in glob.glob( "{}/{}/*{}*{}*".format(old_test_archive, "old_cases", mach_comp, test_id_root) ): filename = os.path.basename(item) the_match = test_id_re.match(filename) if the_match: test_id = the_match.groups()[0] results.add(test_id) return list(results)
###############################################################################
[docs] def archive_old_test_data( machine, mach_comp, test_id_root, test_root, old_test_archive, avoid_test_id, ): ############################################################################### gb_allowed = machine.get_value("MAX_GB_OLD_TEST_DATA") gb_allowed = 500 if gb_allowed is None else gb_allowed bytes_allowed = gb_allowed * 1000000000 expect( bytes_allowed > 0, "Machine {} does not support test archiving".format(machine.get_machine_name()), ) # Remove old cs.status, cs.submit. I don't think there's any value to leaving these around # or archiving them for old_cs_file in glob.glob("{}/cs.*.{}[0-9]*".format(test_root, test_id_root)): if avoid_test_id not in old_cs_file: logging.info("TEST ARCHIVER: Removing {}".format(old_cs_file)) os.remove(old_cs_file) # Remove the old CTest XML, same reason as above if os.path.isdir("Testing"): logging.info( "TEST ARCHIVER: Removing {}".format(os.path.join(os.getcwd(), "Testing")) ) shutil.rmtree("Testing") if not os.path.exists(old_test_archive): os.mkdir(old_test_archive) # Archive old data by looking at old test cases for old_case in glob.glob( "{}/*{}*{}[0-9]*".format(test_root, mach_comp, test_id_root) ): if avoid_test_id not in old_case: logging.info("TEST ARCHIVER: archiving case {}".format(old_case)) exeroot, rundir, archdir = run_cmd_no_fail( "./xmlquery EXEROOT RUNDIR DOUT_S_ROOT --value", from_dir=old_case ).split(",") for the_dir, target_area in [ (exeroot, "old_builds"), (rundir, "old_runs"), (archdir, "old_archives"), (old_case, "old_cases"), ]: if os.path.exists(the_dir): start_time = time.time() logging.info( "TEST ARCHIVER: archiving {} to {}".format( the_dir, os.path.join(old_test_archive, target_area) ) ) if not os.path.exists(os.path.join(old_test_archive, target_area)): os.mkdir(os.path.join(old_test_archive, target_area)) old_case_name = os.path.basename(old_case) with tarfile.open( os.path.join( old_test_archive, target_area, "{}.tar.gz".format(old_case_name), ), "w:gz", ) as tfd: tfd.add(the_dir, arcname=old_case_name) shutil.rmtree(the_dir) # Remove parent dir if it's empty parent_dir = os.path.dirname(the_dir) if not os.listdir(parent_dir) or os.listdir(parent_dir) == [ "case2_output_root" ]: shutil.rmtree(parent_dir) end_time = time.time() logging.info( "TEST ARCHIVER: archiving {} took {} seconds".format( the_dir, int(end_time - start_time) ) ) else: logging.info( "TEST ARCHIVER: leaving case {} due to avoiding test id {}".format( old_case, avoid_test_id ) ) # Check size of archive bytes_of_old_test_data = int( run_cmd_no_fail("du -sb {}".format(old_test_archive)).split()[0] ) if bytes_of_old_test_data > bytes_allowed: logging.info( "TEST ARCHIVER: Too much test data, {}GB (actual) > {}GB (limit)".format( bytes_of_old_test_data / 1000000000, bytes_allowed / 1000000000 ) ) old_test_ids = scan_for_test_ids(old_test_archive, mach_comp, test_id_root) for old_test_id in sorted(old_test_ids): logging.info( "TEST ARCHIVER: Removing old data for test {}".format(old_test_id) ) for item in ["old_cases", "old_builds", "old_runs", "old_archives"]: for dir_to_rm in glob.glob( "{}/{}/*{}*{}*".format( old_test_archive, item, mach_comp, old_test_id ) ): logging.info("TEST ARCHIVER: Removing {}".format(dir_to_rm)) if os.path.isdir(dir_to_rm): shutil.rmtree(dir_to_rm) else: os.remove(dir_to_rm) bytes_of_old_test_data = int( run_cmd_no_fail("du -sb {}".format(old_test_archive)).split()[0] ) if bytes_of_old_test_data < bytes_allowed: break else: logging.info( "TEST ARCHIVER: Test data is within accepted bounds, {}GB (actual) < {}GB (limit)".format( bytes_of_old_test_data / 1000000000, bytes_allowed / 1000000000 ) )
###############################################################################
[docs] def handle_old_test_data( machine, compiler, test_id_root, scratch_root, test_root, avoid_test_id ): ############################################################################### run_area = os.path.dirname( os.path.dirname(machine.get_value("RUNDIR")) ) # Assumes XXX/$CASE/run build_area = os.path.dirname( os.path.dirname(machine.get_value("EXEROOT")) ) # Assumes XXX/$CASE/build archive_area = os.path.dirname( machine.get_value("DOUT_S_ROOT") ) # Assumes XXX/archive/$CASE old_test_archive = os.path.join(scratch_root, "old_test_archive") mach_comp = "{}_{}".format(machine.get_machine_name(), compiler) try: archive_old_test_data( machine, mach_comp, test_id_root, test_root, old_test_archive, avoid_test_id, ) except Exception: logging.warning( "TEST ARCHIVER: Archiving of old test data FAILED: {}\nDeleting data instead".format( sys.exc_info()[1] ) ) delete_old_test_data( mach_comp, test_id_root, scratch_root, test_root, run_area, build_area, archive_area, avoid_test_id, )
###############################################################################
[docs] def jenkins_generic_job( generate_baselines, submit_to_cdash, no_batch, baseline_name, arg_cdash_build_name, cdash_project, arg_test_suite, cdash_build_group, baseline_compare, scratch_root, parallel_jobs, walltime, machine, compiler, real_baseline_name, baseline_root, update_success, check_throughput, check_memory, ignore_memleak, ignore_namelists, ignore_diffs, save_timing, pes_file, jenkins_id, queue, ): ############################################################################### """ Return True if all tests passed """ use_batch = machine.has_batch_system() and not no_batch test_suite = machine.get_value("TESTS") proxy = machine.get_value("PROXY") test_suite = test_suite if arg_test_suite is None else arg_test_suite test_root = os.path.join(scratch_root, "J") if use_batch: batch_system = machine.get_value("BATCH_SYSTEM") expect( batch_system is not None, "Bad XML. Batch machine has no batch_system configuration.", ) # # Env changes # if submit_to_cdash and proxy is not None: os.environ["http_proxy"] = proxy if not os.path.isdir(scratch_root): os.makedirs(scratch_root) # Important, need to set up signal handlers before we officially # kick off tests. We don't want this process getting killed outright # since it's critical that the cleanup in the finally block gets run CIME.wait_for_tests.set_up_signal_handlers() # # Clean up leftovers from previous run of jenkins_generic_job. This will # break the previous run of jenkins_generic_job if it's still running. Set up # the Jenkins jobs with timeouts to avoid this. # if jenkins_id is not None: test_id_root = jenkins_id test_id = "%s%s" % (test_id_root, CIME.utils.get_timestamp("%y%m%d_%H%M%S")) else: test_id_root = "J{}{}".format( baseline_name.capitalize(), test_suite.replace("e3sm_", "").capitalize() ) test_id = "%s%s" % (test_id_root, CIME.utils.get_timestamp()) archiver_thread = threading.Thread( target=handle_old_test_data, args=(machine, compiler, test_id_root, scratch_root, test_root, test_id), ) archiver_thread.start() # # Set up create_test command and run it # create_test_args = [ test_suite, "--test-root %s" % test_root, "-t %s" % test_id, "--machine %s" % machine.get_machine_name(), "--compiler %s" % compiler, ] if generate_baselines: create_test_args.append("-g -b " + real_baseline_name) elif baseline_compare: create_test_args.append("-c -b " + real_baseline_name) if scratch_root != machine.get_value("CIME_OUTPUT_ROOT"): create_test_args.append("--output-root=" + scratch_root) if no_batch: create_test_args.append("--no-batch") if parallel_jobs is not None: create_test_args.append("-j {:d}".format(parallel_jobs)) if walltime is not None: create_test_args.append("--walltime " + walltime) if baseline_root is not None: create_test_args.append("--baseline-root " + baseline_root) if pes_file is not None: create_test_args.append("--pesfile " + pes_file) if queue is not None: create_test_args.append("--queue " + queue) if save_timing: create_test_args.append("--save-timing") create_test_cmd = "./create_test " + " ".join(create_test_args) if not CIME.wait_for_tests.SIGNAL_RECEIVED: create_test_stat = CIME.utils.run_cmd( create_test_cmd, from_dir=CIME.utils.get_scripts_root(), verbose=True, arg_stdout=None, arg_stderr=None, )[0] # Create_test should have either passed, detected failing tests, or timed out expect( create_test_stat in [0, CIME.utils.TESTS_FAILED_ERR_CODE, -signal.SIGTERM], "Create_test script FAILED with error code '{:d}'!".format( create_test_stat ), ) # # Wait for tests # if submit_to_cdash: cdash_build_name = ( "_".join([test_suite, baseline_name, compiler]) if arg_cdash_build_name is None else arg_cdash_build_name ) else: cdash_build_name = None os.environ["CIME_MACHINE"] = machine.get_machine_name() if submit_to_cdash: logging.info( "To resubmit to dashboard: wait_for_tests {}/*{}/TestStatus --no-wait -b {}".format( test_root, test_id, cdash_build_name ) ) tests_passed = CIME.wait_for_tests.wait_for_tests( glob.glob("{}/*{}/TestStatus".format(test_root, test_id)), no_wait=not use_batch, # wait if using queue check_throughput=check_throughput, check_memory=check_memory, ignore_namelists=ignore_namelists, ignore_diffs=ignore_diffs, ignore_memleak=ignore_memleak, cdash_build_name=cdash_build_name, cdash_project=cdash_project, cdash_build_group=cdash_build_group, update_success=update_success, ) logging.info("TEST ARCHIVER: Waiting for archiver thread") archiver_thread.join() logging.info("TEST ARCHIVER: Waiting for archiver finished") if use_batch and CIME.wait_for_tests.SIGNAL_RECEIVED: # Cleanup cleanup_queue(test_root, test_id) return tests_passed