#!/usr/bin/env python
"""
Library for saving build/run provenance.
"""
from CIME.XML.standard_module_setup import *
from CIME.utils import (touch, gzip_existing_file, SharedArea, convert_to_babylonian_time,
                        get_current_commit, get_current_submodule_status, indent_string,
                        run_cmd, run_cmd_no_fail, safe_copy)

import tarfile, getpass, signal, glob, shutil, sys

logger = logging.getLogger(__name__)

def _get_batch_job_id_for_syslog(case):
    """
    mach_syslog only works on certain machines
    """
    mach = case.get_value("MACH")
    try:
        if mach in ['titan']:
            return os.environ["PBS_JOBID"]
        elif mach in ['anvil', 'compy', 'cori-haswell', 'cori-knl']:
            return os.environ["SLURM_JOB_ID"]
        elif mach in ['mira', 'theta']:
            return os.environ["COBALT_JOBID"]
        elif mach in ['summit']:
            return os.environ["LSB_JOBID"]
    except KeyError:
        pass

    return None

def _save_build_provenance_e3sm(case, lid):
    srcroot = case.get_value("SRCROOT")
    exeroot = case.get_value("EXEROOT")
    caseroot = case.get_value("CASEROOT")

    # Save git describe
    describe_prov = os.path.join(exeroot, "GIT_DESCRIBE.{}".format(lid))
    desc = get_current_commit(tag=True, repo=srcroot)
    with open(describe_prov, "w") as fd:
        fd.write(desc)

    # Save HEAD
    headfile = os.path.join(srcroot, ".git", "logs", "HEAD")
    headfile_prov = os.path.join(exeroot, "GIT_LOGS_HEAD.{}".format(lid))
    if os.path.exists(headfile_prov):
        os.remove(headfile_prov)
    if os.path.exists(headfile):
        safe_copy(headfile, headfile_prov, preserve_meta=False)

    # Save git submodule status
    submodule_prov = os.path.join(exeroot, "GIT_SUBMODULE_STATUS.{}".format(lid))
    subm_status = get_current_submodule_status(recursive=True, repo=srcroot)
    with open(submodule_prov, "w") as fd:
        fd.write(subm_status)

    # Save SourceMods
    sourcemods = os.path.join(caseroot, "SourceMods")
    sourcemods_prov = os.path.join(exeroot, "SourceMods.{}.tar.gz".format(lid))
    if os.path.exists(sourcemods_prov):
        os.remove(sourcemods_prov)
    if os.path.isdir(sourcemods):
        with tarfile.open(sourcemods_prov, "w:gz") as tfd:
            tfd.add(sourcemods, arcname="SourceMods")

    # Save build env
    env_prov = os.path.join(exeroot, "build_environment.{}.txt".format(lid))
    if os.path.exists(env_prov):
        os.remove(env_prov)
    env_module = case.get_env("mach_specific")
    env_module.save_all_env_info(env_prov)

    # For each of the just-created build provenance files, symlink a generic
    # name to the lid-stamped file to mark it as the most recent one.
    for item in ["GIT_DESCRIBE", "GIT_LOGS_HEAD", "GIT_SUBMODULE_STATUS", "SourceMods", "build_environment"]:
        globstr = "{}/{}.{}*".format(exeroot, item, lid)
        matches = glob.glob(globstr)
        expect(len(matches) < 2, "Multiple matches for glob {} should not have happened".format(globstr))
        if matches:
            the_match = matches[0]
            generic_name = the_match.replace(".{}".format(lid), "")
            if os.path.exists(generic_name):
                os.remove(generic_name)
            os.symlink(the_match, generic_name)

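# (Illustrative layout only, with a hypothetical lid of 210501-123456: after the
# symlink loop above, EXEROOT holds the real file GIT_DESCRIBE.210501-123456 plus
# a symlink GIT_DESCRIBE -> GIT_DESCRIBE.210501-123456 pointing at it, and
# likewise for the other provenance files.)
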
def _save_build_provenance_cesm(case, lid): # pylint: disable=unused-argument
    version = case.get_value("MODEL_VERSION")
    # version has already been recorded
    srcroot = case.get_value("SRCROOT")
    manic = os.path.join("manage_externals", "checkout_externals")
    manic_full_path = os.path.join(srcroot, manic)
    out = None
    if os.path.exists(manic_full_path):
        args = " --status --verbose --no-logging"
        stat, out, err = run_cmd(manic_full_path + args, from_dir=srcroot)
        errmsg = """Error gathering provenance information from manage_externals.

manage_externals error message:
{err}

manage_externals output:
{out}

To solve this, either:

(1) Find and fix the problem: From {srcroot}, try to get this command to work:
    {manic}{args}

(2) If you don't need provenance information, rebuild with --skip-provenance-check
""".format(out=indent_string(out, 4), err=indent_string(err, 4),
           srcroot=srcroot, manic=manic, args=args)
        expect(stat == 0, errmsg)

    caseroot = case.get_value("CASEROOT")
    with open(os.path.join(caseroot, "CaseStatus"), "a") as fd:
        if version is not None and version != "unknown":
            fd.write("CESM version is {}\n".format(version))
        if out is not None:
            fd.write("{}\n".format(out))

def save_build_provenance(case, lid=None):
    with SharedArea():
        model = case.get_value("MODEL")
        lid = os.environ["LID"] if lid is None else lid

        if model == "e3sm":
            _save_build_provenance_e3sm(case, lid)
        elif model == "cesm":
            _save_build_provenance_cesm(case, lid)

def _save_prerun_timing_e3sm(case, lid):
    project = case.get_value("PROJECT", subgroup=case.get_primary_job())
    if not case.is_save_timing_dir_project(project):
        return

    timing_dir = case.get_value("SAVE_TIMING_DIR")
    if timing_dir is None or not os.path.isdir(timing_dir):
        logger.warning("SAVE_TIMING_DIR {} is not valid. E3SM requires a valid SAVE_TIMING_DIR to archive timing data.".format(timing_dir))
        return

    logger.info("Archiving timing data and associated provenance in {}.".format(timing_dir))
    rundir = case.get_value("RUNDIR")
    blddir = case.get_value("EXEROOT")
    caseroot = case.get_value("CASEROOT")
    srcroot = case.get_value("SRCROOT")
    base_case = case.get_value("CASE")
    full_timing_dir = os.path.join(timing_dir, "performance_archive", getpass.getuser(), base_case, lid)
    if os.path.exists(full_timing_dir):
        logger.warning("{} already exists. Skipping archive of timing data and associated provenance.".format(full_timing_dir))
        return

    try:
        os.makedirs(full_timing_dir)
    except OSError:
        logger.warning("{} cannot be created. Skipping archive of timing data and associated provenance.".format(full_timing_dir))
        return

    mach = case.get_value("MACH")
    compiler = case.get_value("COMPILER")
    # For some batch machines, save queue info
    job_id = _get_batch_job_id_for_syslog(case)
    if job_id is not None:
        if mach == "mira":
            for cmd, filename in [("qstat -f", "qstatf"),
                                  ("qstat -lf %s" % job_id, "qstatf_jobid")]:
                filename = "%s.%s" % (filename, lid)
                run_cmd_no_fail(cmd, arg_stdout=filename, from_dir=full_timing_dir)
                gzip_existing_file(os.path.join(full_timing_dir, filename))
        elif mach == "theta":
            for cmd, filename in [("qstat -l --header JobID:JobName:User:Project:WallTime:QueuedTime:Score:RunTime:TimeRemaining:Nodes:State:Location:Mode:Command:Args:Procs:Queue:StartTime:attrs:Geometry", "qstatf"),
                                  ("qstat -lf %s" % job_id, "qstatf_jobid"),
                                  ("xtnodestat", "xtnodestat"),
                                  ("xtprocadmin", "xtprocadmin")]:
                filename = "%s.%s" % (filename, lid)
                run_cmd_no_fail(cmd, arg_stdout=filename, from_dir=full_timing_dir)
                gzip_existing_file(os.path.join(full_timing_dir, filename))
        elif mach in ["cori-haswell", "cori-knl"]:
            for cmd, filename in [("sinfo -a -l", "sinfol"),
                                  ("scontrol show jobid %s" % job_id, "sqsf_jobid"),
                                  # ("sqs -f", "sqsf"),
                                  ("squeue -o '%.10i %.15P %.20j %.10u %.7a %.2t %.6D %.8C %.10M %.10l %.20S %.20V'", "squeuef"),
                                  ("squeue -t R -o '%.10i %R'", "squeues")]:
                filename = "%s.%s" % (filename, lid)
                run_cmd_no_fail(cmd, arg_stdout=filename, from_dir=full_timing_dir)
                gzip_existing_file(os.path.join(full_timing_dir, filename))
        elif mach == "titan":
            for cmd, filename in [("qstat -f %s >" % job_id, "qstatf_jobid"),
                                  ("xtnodestat >", "xtnodestat"),
                                  # ("qstat -f >", "qstatf"),
                                  # ("xtdb2proc -f", "xtdb2proc"),
                                  ("showq >", "showq")]:
                full_cmd = cmd + " " + filename
                run_cmd_no_fail(full_cmd + "." + lid, from_dir=full_timing_dir)
                gzip_existing_file(os.path.join(full_timing_dir, filename + "." + lid))
        elif mach in ["anvil", "compy"]:
            for cmd, filename in [("sinfo -l", "sinfol"),
                                  ("squeue -o '%all' --job {}".format(job_id), "squeueall_jobid"),
                                  ("squeue -o '%.10i %.10P %.15u %.20a %.2t %.6D %.8C %.12M %.12l %.20S %.20V %j'", "squeuef"),
                                  ("squeue -t R -o '%.10i %R'", "squeues")]:
                filename = "%s.%s" % (filename, lid)
                run_cmd_no_fail(cmd, arg_stdout=filename, from_dir=full_timing_dir)
                gzip_existing_file(os.path.join(full_timing_dir, filename))
        elif mach == "summit":
            for cmd, filename in [("bjobs -u all >", "bjobsu_all"),
                                  ("bjobs -r -u all -o 'jobid slots exec_host' >", "bjobsru_allo"),
                                  ("bjobs -l -UF %s >" % job_id, "bjobslUF_jobid")]:
                full_cmd = cmd + " " + filename
                run_cmd_no_fail(full_cmd + "." + lid, from_dir=full_timing_dir)
                gzip_existing_file(os.path.join(full_timing_dir, filename + "." + lid))
    # Copy/tar SourceMods
    source_mods_dir = os.path.join(caseroot, "SourceMods")
    if os.path.isdir(source_mods_dir):
        with tarfile.open(os.path.join(full_timing_dir, "SourceMods.{}.tar.gz".format(lid)), "w:gz") as tfd:
            tfd.add(source_mods_dir, arcname="SourceMods")
    # Save various case configuration items
    case_docs = os.path.join(full_timing_dir, "CaseDocs.{}".format(lid))
    os.mkdir(case_docs)
    globs_to_copy = [
        "CaseDocs/*",
        "*.run",
        ".*.run",
        "*.xml",
        "user_nl_*",
        "*env_mach_specific*",
        "Macros*",
        "README.case",
        "Depends.{}".format(mach),
        "Depends.{}".format(compiler),
        "Depends.{}.{}".format(mach, compiler),
        "software_environment.txt"
    ]
    for glob_to_copy in globs_to_copy:
        for item in glob.glob(os.path.join(caseroot, glob_to_copy)):
            safe_copy(item, os.path.join(case_docs, "{}.{}".format(os.path.basename(item).lstrip("."), lid)), preserve_meta=False)

    # Copy some items from build provenance
    blddir_globs_to_copy = [
        "GIT_LOGS_HEAD",
        "build_environment.txt"
    ]
    for blddir_glob_to_copy in blddir_globs_to_copy:
        for item in glob.glob(os.path.join(blddir, blddir_glob_to_copy)):
            safe_copy(item, os.path.join(full_timing_dir, os.path.basename(item) + "." + lid), preserve_meta=False)

    # Save state of repo
    from_repo = srcroot if os.path.exists(os.path.join(srcroot, ".git")) else os.path.dirname(srcroot)
    desc = get_current_commit(tag=True, repo=from_repo)
    with open(os.path.join(full_timing_dir, "GIT_DESCRIBE.{}".format(lid)), "w") as fd:
        fd.write(desc)
    # Manage the mach_syslog helper: kill any instance left over from a
    # previous run, then spawn a fresh one if job progress sampling is enabled.
    if job_id is not None:
        # Kill mach_syslog from previous run if one exists
        syslog_jobid_path = os.path.join(rundir, "syslog_jobid.{}".format(job_id))
        if os.path.exists(syslog_jobid_path):
            try:
                with open(syslog_jobid_path, "r") as fd:
                    syslog_jobid = int(fd.read().strip())
                os.kill(syslog_jobid, signal.SIGTERM)
            except (ValueError, OSError) as e:
                logger.warning("Failed to kill syslog: {}".format(e))
            finally:
                os.remove(syslog_jobid_path)

        # If requested, spawn a mach_syslog process to monitor job progress
        sample_interval = case.get_value("SYSLOG_N")
        if sample_interval > 0:
            archive_checkpoints = os.path.join(full_timing_dir, "checkpoints.{}".format(lid))
            os.mkdir(archive_checkpoints)
            touch("{}/e3sm.log.{}".format(rundir, lid))
            syslog_jobid = run_cmd_no_fail("./mach_syslog {si} {jobid} {lid} {rundir} {rundir}/timing/checkpoints {ac} >& /dev/null & echo $!".format(si=sample_interval, jobid=job_id, lid=lid, rundir=rundir, ac=archive_checkpoints),
                                           from_dir=os.path.join(caseroot, "Tools"))
            with open(os.path.join(rundir, "syslog_jobid.{}".format(job_id)), "w") as fd:
                fd.write("{}\n".format(syslog_jobid))

def _save_prerun_provenance_e3sm(case, lid):
    if case.get_value("SAVE_TIMING"):
        _save_prerun_timing_e3sm(case, lid)

def _save_prerun_provenance_cesm(case, lid): # pylint: disable=unused-argument
    pass

def save_prerun_provenance(case, lid=None):
    with SharedArea():
        # Always save env
        lid = os.environ["LID"] if lid is None else lid
        env_module = case.get_env("mach_specific")
        logdir = os.path.join(case.get_value("CASEROOT"), "logs")
        if not os.path.isdir(logdir):
            os.makedirs(logdir)
        env_module.save_all_env_info(os.path.join(logdir, "run_environment.txt.{}".format(lid)))

        model = case.get_value("MODEL")
        if model == "e3sm":
            _save_prerun_provenance_e3sm(case, lid)
        elif model == "cesm":
            _save_prerun_provenance_cesm(case, lid)

def _save_postrun_provenance_cesm(case, lid):
    save_timing = case.get_value("SAVE_TIMING")
    if save_timing:
        rundir = case.get_value("RUNDIR")
        timing_dir = case.get_value("SAVE_TIMING_DIR")
        timing_dir = os.path.join(timing_dir, case.get_value("CASE"))
        shutil.move(os.path.join(rundir, "timing"),
                    os.path.join(timing_dir, "timing." + lid))

def _save_postrun_timing_e3sm(case, lid):
    caseroot = case.get_value("CASEROOT")
    rundir = case.get_value("RUNDIR")

    # Tar the timings
    rundir_timing_dir = os.path.join(rundir, "timing." + lid)
    shutil.move(os.path.join(rundir, "timing"), rundir_timing_dir)
    with tarfile.open("%s.tar.gz" % rundir_timing_dir, "w:gz") as tfd:
        tfd.add(rundir_timing_dir, arcname=os.path.basename(rundir_timing_dir))

    shutil.rmtree(rundir_timing_dir)

    atm_chunk_costs_src_path = os.path.join(rundir, "atm_chunk_costs.txt")
    if os.path.exists(atm_chunk_costs_src_path):
        atm_chunk_costs_dst_path = os.path.join(rundir, "atm_chunk_costs.{}".format(lid))
        shutil.move(atm_chunk_costs_src_path, atm_chunk_costs_dst_path)
        gzip_existing_file(atm_chunk_costs_dst_path)

    gzip_existing_file(os.path.join(caseroot, "timing", "e3sm_timing_stats.%s" % lid))
    # Create a "saved" marker for this lid; the copy loop below explicitly skips it
    timing_saved_file = "timing.%s.saved" % lid
    touch(os.path.join(caseroot, "timing", timing_saved_file))
    project = case.get_value("PROJECT", subgroup=case.get_primary_job())
    if not case.is_save_timing_dir_project(project):
        return

    timing_dir = case.get_value("SAVE_TIMING_DIR")
    if timing_dir is None or not os.path.isdir(timing_dir):
        return

    mach = case.get_value("MACH")
    base_case = case.get_value("CASE")
    full_timing_dir = os.path.join(timing_dir, "performance_archive", getpass.getuser(), base_case, lid)
    if not os.path.isdir(full_timing_dir):
        return

    # Kill mach_syslog
    job_id = _get_batch_job_id_for_syslog(case)
    if job_id is not None:
        syslog_jobid_path = os.path.join(rundir, "syslog_jobid.{}".format(job_id))
        if os.path.exists(syslog_jobid_path):
            try:
                with open(syslog_jobid_path, "r") as fd:
                    syslog_jobid = int(fd.read().strip())
                os.kill(syslog_jobid, signal.SIGTERM)
            except (ValueError, OSError) as e:
                logger.warning("Failed to kill syslog: {}".format(e))
            finally:
                os.remove(syslog_jobid_path)

    # Copy timings
    safe_copy("%s.tar.gz" % rundir_timing_dir, full_timing_dir, preserve_meta=False)

    #
    # Save output files and logs
    #
    globs_to_copy = []
    if job_id is not None:
        if mach == "titan":
            globs_to_copy.append("%s*OU" % job_id)
        elif mach == "anvil":
            globs_to_copy.append("%s*run*%s" % (case.get_value("CASE"), job_id))
        elif mach == "compy":
            globs_to_copy.append("slurm.err")
            globs_to_copy.append("slurm.out")
        elif mach in ["mira", "theta"]:
            globs_to_copy.append("%s*error" % job_id)
            globs_to_copy.append("%s*output" % job_id)
            globs_to_copy.append("%s*cobaltlog" % job_id)
        elif mach in ["cori-haswell", "cori-knl"]:
            globs_to_copy.append("%s*run*%s" % (case.get_value("CASE"), job_id))
        elif mach == "summit":
            globs_to_copy.append("e3sm.stderr.%s" % job_id)
            globs_to_copy.append("e3sm.stdout.%s" % job_id)

    globs_to_copy.append("logs/run_environment.txt.{}".format(lid))
    globs_to_copy.append(os.path.join(rundir, "e3sm.log.{}.gz".format(lid)))
    globs_to_copy.append(os.path.join(rundir, "cpl.log.{}.gz".format(lid)))
    globs_to_copy.append(os.path.join(rundir, "atm_chunk_costs.{}.gz".format(lid)))
    globs_to_copy.append("timing/*.{}*".format(lid))
    globs_to_copy.append("CaseStatus")

    for glob_to_copy in globs_to_copy:
        for item in glob.glob(os.path.join(caseroot, glob_to_copy)):
            basename = os.path.basename(item)
            if basename != timing_saved_file:
                if lid not in basename and not basename.endswith(".gz"):
                    safe_copy(item, os.path.join(full_timing_dir, "{}.{}".format(basename, lid)), preserve_meta=False)
                else:
                    safe_copy(item, full_timing_dir, preserve_meta=False)

    # Gzip everything that is not already compressed
    for root, _, files in os.walk(full_timing_dir):
        for filename in files:
            if not filename.endswith(".gz"):
                gzip_existing_file(os.path.join(root, filename))

def _save_postrun_provenance_e3sm(case, lid):
    if case.get_value("SAVE_TIMING"):
        _save_postrun_timing_e3sm(case, lid)

def save_postrun_provenance(case, lid=None):
    with SharedArea():
        model = case.get_value("MODEL")
        lid = os.environ["LID"] if lid is None else lid

        if model == "e3sm":
            _save_postrun_provenance_e3sm(case, lid)
        elif model == "cesm":
            _save_postrun_provenance_cesm(case, lid)

_WALLTIME_BASELINE_NAME = "walltimes"
_WALLTIME_FILE_NAME = "walltimes"
_GLOBAL_MINUMUM_TIME = 900
_GLOBAL_WIGGLE = 1000
_WALLTIME_TOLERANCE = ( (600, 2.0), (1800, 1.5), (9999999999, 1.25) )
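
# Illustrative arithmetic for the tolerance table above (an example, not part of
# the original logic): a recorded walltime of 1200s falls in the (1800, 1.5)
# bucket, giving int(1200 * 1.5) = 1800s; that already exceeds
# _GLOBAL_MINUMUM_TIME, so only the wiggle is added: 1800 + 1000 = 2800s, which
# convert_to_babylonian_time then renders as an HH:MM:SS-style string.
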
def get_recommended_test_time_based_on_past(baseline_root, test, raw=False):
    if baseline_root is not None:
        try:
            the_path = os.path.join(baseline_root, _WALLTIME_BASELINE_NAME, test, _WALLTIME_FILE_NAME)
            if os.path.exists(the_path):
                with open(the_path, "r") as fd:
                    last_line = int(fd.readlines()[-1])

                if raw:
                    best_walltime = last_line
                else:
                    best_walltime = None
                    for cutoff, tolerance in _WALLTIME_TOLERANCE:
                        if last_line <= cutoff:
                            best_walltime = int(float(last_line) * tolerance)
                            break

                    if best_walltime < _GLOBAL_MINUMUM_TIME:
                        best_walltime = _GLOBAL_MINUMUM_TIME

                    best_walltime += _GLOBAL_WIGGLE

                return convert_to_babylonian_time(best_walltime)
        except Exception:
            # We NEVER want a failure here to kill the run
            logger.warning("Failed to read test time: {}".format(sys.exc_info()[1]))

    return None

def save_test_time(baseline_root, test, time_seconds):
    if baseline_root is not None:
        try:
            with SharedArea():
                the_dir = os.path.join(baseline_root, _WALLTIME_BASELINE_NAME, test)
                if not os.path.exists(the_dir):
                    os.makedirs(the_dir)

                the_path = os.path.join(the_dir, _WALLTIME_FILE_NAME)
                with open(the_path, "a") as fd:
                    fd.write("{}\n".format(int(time_seconds)))
        except Exception:
            # We NEVER want a failure here to kill the run
            logger.warning("Failed to store test time: {}".format(sys.exc_info()[1]))
_SUCCESS_BASELINE_NAME = "success-history"
_SUCCESS_FILE_NAME = "last-transitions"
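
# The success file holds one line with two whitespace-separated fields: the
# commit at which the test last passed and the commit at which it last
# transitioned from pass to fail; either field may be the literal string "None".
# A hypothetical example line, "1a2b3c4 None", would mean the test passed at
# commit 1a2b3c4 and has no recorded failure.
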
def _read_success_data(baseline_root, test):
    success_path = os.path.join(baseline_root, _SUCCESS_BASELINE_NAME, test, _SUCCESS_FILE_NAME)
    if os.path.exists(success_path):
        with open(success_path, "r") as fd:
            prev_results_raw = fd.read().strip()
        prev_results = prev_results_raw.split()
        expect(len(prev_results) == 2, "Bad success data: '{}'".format(prev_results_raw))
    else:
        prev_results = ["None", "None"]

    # Convert "None" to None
    for idx, item in enumerate(prev_results):
        if item == "None":
            prev_results[idx] = None

    return success_path, prev_results

def _is_test_working(prev_results, src_root, testing=False):
    # With no recorded pass, the previous run cannot have succeeded; with no
    # recorded failure, it cannot have failed.
    if prev_results[0] is None:
        return False
    elif prev_results[1] is None:
        return True
    else:
        if not testing:
            stat, out, err = run_cmd("git merge-base --is-ancestor {}".format(" ".join(prev_results)), from_dir=src_root)
            expect(stat in [0, 1], "Unexpected status from ancestor check:\n{}\n{}".format(out, err))
        else:
            # Hack for testing
            stat = 0 if prev_results[0] < prev_results[1] else 1

        # stat == 0 tells us the pass commit is older than the fail commit, so the
        # most recent transition was to a failing state; otherwise the test works
        return stat != 0
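
# Note on the ancestor check in _is_test_working: "git merge-base --is-ancestor
# <pass> <fail>" exits with 0 when <pass> is an ancestor of <fail> (i.e. the
# failure is the more recent commit) and with 1 when it is not.
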
def get_test_success(baseline_root, src_root, test, testing=False):
"""
Returns (was prev run success, commit when test last passed, commit when test last transitioned from pass to fail)
Unknown history is expressed as None
"""
if baseline_root is not None:
try:
prev_results = _read_success_data(baseline_root, test)[1]
prev_success = _is_test_working(prev_results, src_root, testing=testing)
return prev_success, prev_results[0], prev_results[1]
except Exception:
# We NEVER want a failure here to kill the run
logger.warning("Failed to read test success: {}".format(sys.exc_info()[1]))
return False, None, None
def save_test_success(baseline_root, src_root, test, succeeded, force_commit_test=None):
"""
Update success data accordingly based on succeeded flag
"""
if baseline_root is not None:
try:
with SharedArea():
success_path, prev_results = _read_success_data(baseline_root, test)
the_dir = os.path.dirname(success_path)
if not os.path.exists(the_dir):
os.makedirs(the_dir)
prev_succeeded = _is_test_working(prev_results, src_root, testing=(force_commit_test is not None))
# if no transition occurred then no update is needed
if succeeded or succeeded != prev_succeeded or (prev_results[0] is None and succeeded) or (prev_results[1] is None and not succeeded):
new_results = list(prev_results)
my_commit = force_commit_test if force_commit_test else get_current_commit(repo=src_root)
if succeeded:
new_results[0] = my_commit # we passed
else:
new_results[1] = my_commit # we transitioned to a failing state
str_results = ["None" if item is None else item for item in new_results]
with open(success_path, "w") as fd:
fd.write("{}\n".format(" ".join(str_results)))
except Exception:
# We NEVER want a failure here to kill the run
logger.warning("Failed to store test success: {}".format(sys.exc_info()[1]))