import os
import glob
import re
import gzip
import logging
from CIME.config import Config
from CIME.utils import expect, get_src_root, get_current_commit, get_timestamp
logger = logging.getLogger(__name__)
[docs]
def perf_compare_throughput_baseline(case, baseline_dir=None):
"""
Compares model throughput.
Parameters
----------
case : CIME.case.case.Case
Current case object.
baseline_dir : str
Overrides the baseline directory.
Returns
-------
below_tolerance : bool
Whether the comparison was below the tolerance.
comment : str
Provides explanation from comparison.
"""
if baseline_dir is None:
baseline_dir = case.get_baseline_dir()
config = load_coupler_customization(case)
baseline_file = os.path.join(baseline_dir, "cpl-tput.log")
baseline = read_baseline_file(baseline_file)
tolerance = case.get_value("TEST_TPUT_TOLERANCE")
if tolerance is None:
tolerance = 0.1
expect(
tolerance > 0.0,
"Bad value for throughput tolerance in test",
)
try:
below_tolerance, comment = config.perf_compare_throughput_baseline(
case, baseline, tolerance
)
except AttributeError:
below_tolerance, comment = _perf_compare_throughput_baseline(
case, baseline, tolerance
)
return below_tolerance, comment
[docs]
def perf_compare_memory_baseline(case, baseline_dir=None):
"""
Compares model highwater memory usage.
Parameters
----------
case : CIME.case.case.Case
Current case object.
baseline_dir : str
Overrides the baseline directory.
Returns
-------
below_tolerance : bool
Whether the comparison was below the tolerance.
comment : str
Provides explanation from comparison.
"""
if baseline_dir is None:
baseline_dir = case.get_baseline_dir()
config = load_coupler_customization(case)
baseline_file = os.path.join(baseline_dir, "cpl-mem.log")
baseline = read_baseline_file(baseline_file)
tolerance = case.get_value("TEST_MEMLEAK_TOLERANCE")
if tolerance is None:
tolerance = 0.1
try:
below_tolerance, comments = config.perf_compare_memory_baseline(
case, baseline, tolerance
)
except AttributeError:
below_tolerance, comments = _perf_compare_memory_baseline(
case, baseline, tolerance
)
return below_tolerance, comments
[docs]
def perf_write_baseline(case, basegen_dir, throughput=True, memory=True):
"""
Writes the baseline performance files.
Parameters
----------
case : CIME.case.case.Case
Current case object.
basegen_dir : str
Path to baseline directory.
throughput : bool
If true, write throughput baseline.
memory : bool
If true, write memory baseline.
"""
config = load_coupler_customization(case)
if throughput:
try:
tput, mode = perf_get_throughput(case, config)
except RuntimeError as e:
logger.debug("Could not get throughput: {0!s}".format(e))
else:
baseline_file = os.path.join(basegen_dir, "cpl-tput.log")
write_baseline_file(baseline_file, tput, mode)
logger.info("Updated throughput baseline to {!s}".format(tput))
if memory:
try:
mem, mode = perf_get_memory(case, config)
except RuntimeError as e:
logger.info("Could not get memory usage: {0!s}".format(e))
else:
baseline_file = os.path.join(basegen_dir, "cpl-mem.log")
write_baseline_file(baseline_file, mem, mode)
logger.info("Updated memory usage baseline to {!s}".format(mem))
[docs]
def load_coupler_customization(case):
"""
Loads customizations from the coupler `cime_config` directory.
Parameters
----------
case : CIME.case.case.Case
Current case object.
Returns
-------
CIME.config.Config
Runtime configuration.
"""
comp_root_dir_cpl = case.get_value("COMP_ROOT_DIR_CPL")
cpl_customize = os.path.join(comp_root_dir_cpl, "cime_config", "customize")
return Config.load(cpl_customize)
[docs]
def perf_get_throughput(case, config):
"""
Gets the model throughput.
First attempts to use a coupler define method to retrieve the
models throughput. If this is not defined then the default
method of parsing the coupler log is used.
Parameters
----------
case : CIME.case.case.Case
Current case object.
Returns
-------
str or None
Model throughput.
"""
try:
tput, mode = config.perf_get_throughput(case)
except AttributeError:
tput, mode = _perf_get_throughput(case)
return tput, mode
[docs]
def perf_get_memory(case, config):
"""
Gets the model memory usage.
First attempts to use a coupler defined method to retrieve the
models memory usage. If this is not defined then the default
method of parsing the coupler log is used.
Parameters
----------
case : CIME.case.case.Case
Current case object.
Returns
-------
str or None
Model memory usage.
"""
try:
mem, mode = config.perf_get_memory(case)
except AttributeError:
mem, mode = _perf_get_memory(case)
return mem, mode
[docs]
def write_baseline_file(baseline_file, value, mode="a"):
"""
Writes value to `baseline_file`.
Parameters
----------
baseline_file : str
Path to the baseline file.
value : str
Value to write.
mode : str
Mode to open file with.
"""
with open(baseline_file, mode) as fd:
fd.write(value)
def _perf_get_memory(case, cpllog=None):
"""
Default function to retrieve memory usage from the coupler log.
If the usage is not available from the log then `None` is returned.
Parameters
----------
case : CIME.case.case.Case
Current case object.
cpllog : str
Overrides the default coupler log.
Returns
-------
str or None
Model memory usage or `None`.
Raises
------
RuntimeError
If not enough sample were found.
"""
memlist = perf_get_memory_list(case, cpllog)
if memlist is None:
raise RuntimeError("Could not get default memory usage") from None
value = _format_baseline(memlist[-1][1])
return value, "a"
[docs]
def perf_get_memory_list(case, cpllog):
if cpllog is None:
cpllog = get_latest_cpl_logs(case)
else:
cpllog = [
cpllog,
]
try:
memlist = get_cpl_mem_usage(cpllog[0])
except (FileNotFoundError, IndexError):
memlist = None
logger.debug("Could not parse memory usage from coupler log")
else:
if len(memlist) <= 3:
raise RuntimeError(
f"Found {len(memlist)} memory usage samples, need atleast 4"
)
return memlist
def _perf_get_throughput(case):
"""
Default function to retrieve throughput from the coupler log.
If the throughput is not available from the log then `None` is returned.
Parameters
----------
case : CIME.case.case.Case
Current case object.
Returns
-------
str or None
Model throughput or `None`.
"""
cpllog = get_latest_cpl_logs(case)
try:
tput = get_cpl_throughput(cpllog[0])
except (FileNotFoundError, IndexError):
tput = None
logger.debug("Could not parse throughput from coupler log")
if tput is None:
raise RuntimeError("Could not get default throughput") from None
value = _format_baseline(tput)
return value, "a"
[docs]
def get_latest_cpl_logs(case):
"""
find and return the latest cpl log file in the run directory
"""
coupler_log_path = case.get_value("RUNDIR")
cpllog_name = "med" if case.get_value("COMP_INTERFACE") == "nuopc" else "cpl"
cpllogs = glob.glob(os.path.join(coupler_log_path, "{}*.log.*".format(cpllog_name)))
lastcpllogs = []
if cpllogs:
lastcpllogs.append(max(cpllogs, key=os.path.getctime))
basename = os.path.basename(lastcpllogs[0])
suffix = basename.split(".", 1)[1]
for log in cpllogs:
if log in lastcpllogs:
continue
if log.endswith(suffix):
lastcpllogs.append(log)
return lastcpllogs
[docs]
def get_cpl_mem_usage(cpllog):
"""
Read memory usage from coupler log.
Parameters
----------
cpllog : str
Path to the coupler log.
Returns
-------
list
Memory usage (data, highwater) as recorded by the coupler or empty list.
"""
memlist = []
meminfo = re.compile(r".*model date =\s+(\w+).*memory =\s+(\d+\.?\d+).*highwater")
if cpllog is not None and os.path.isfile(cpllog):
if ".gz" == cpllog[-3:]:
fopen = gzip.open
else:
fopen = open
with fopen(cpllog, "rb") as f:
for line in f:
m = meminfo.match(line.decode("utf-8"))
if m:
memlist.append((float(m.group(1)), float(m.group(2))))
# Remove the last mem record, it's sometimes artificially high
if len(memlist) > 0:
memlist.pop()
return memlist
[docs]
def get_cpl_throughput(cpllog):
"""
Reads throuhgput from coupler log.
Parameters
----------
cpllog : str
Path to the coupler log.
Returns
-------
int or None
Throughput as recorded by the coupler or None
"""
if cpllog is not None and os.path.isfile(cpllog):
with gzip.open(cpllog, "rb") as f:
cpltext = f.read().decode("utf-8")
m = re.search(r"# simulated years / cmp-day =\s+(\d+\.\d+)\s", cpltext)
if m:
return float(m.group(1))
return None
[docs]
def read_baseline_file(baseline_file):
"""
Reads value from `baseline_file`.
Strips comments and returns the raw content to be decoded.
Parameters
----------
baseline_file : str
Path to the baseline file.
Returns
-------
str
Value stored in baseline file without comments.
"""
if not os.path.exists(baseline_file):
return "\nNO file {} found".format(baseline_file)
with open(baseline_file) as fd:
lines = [x.strip() for x in fd.readlines() if not x.startswith("#") and x != ""]
return "\n".join(lines)
def _perf_compare_throughput_baseline(case, baseline, tolerance):
"""
Default throughput baseline comparison.
Compares the throughput from the coupler to the baseline value.
Parameters
----------
case : CIME.case.case.Case
Current case object.
baseline : list
Lines contained in the baseline file.
tolerance : float
Allowed tolerance for comparison.
Returns
-------
below_tolerance : bool
Whether the comparison was below the tolerance.
comment : str
provides explanation from comparison.
"""
current, _ = _perf_get_throughput(case)
try:
current = float(_parse_baseline(current))
except (ValueError, TypeError):
comment = "Could not compare throughput to baseline, as baseline had no value."
return None, comment
try:
# default baseline is stored as single float
baseline = float(_parse_baseline(baseline))
except (ValueError, TypeError):
comment = "Could not compare throughput to baseline, as baseline had no value."
return None, comment
# comparing ypd so bigger is better
diff = (baseline - current) / baseline
below_tolerance = None
if diff is not None:
below_tolerance = diff < tolerance
info = "Throughput changed by {:.2f}%: baseline={:.3f} sypd, tolerance={:d}%, current={:.3f} sypd".format(
diff * 100, baseline, int(tolerance * 100), current
)
if below_tolerance:
comment = "TPUTCOMP: " + info
else:
comment = "Error: TPUTCOMP: " + info
return below_tolerance, comment
def _perf_compare_memory_baseline(case, baseline, tolerance):
"""
Default memory usage baseline comparison.
Compares the highwater memory usage from the coupler to the baseline value.
Parameters
----------
case : CIME.case.case.Case
Current case object.
baseline : list
Lines contained in the baseline file.
tolerance : float
Allowed tolerance for comparison.
Returns
-------
below_tolerance : bool
Whether the comparison was below the tolerance.
comment : str
provides explanation from comparison.
"""
try:
current, _ = _perf_get_memory(case)
except RuntimeError as e:
return None, str(e)
try:
current = float(_parse_baseline(current))
except (ValueError, TypeError):
comment = "Could not compare throughput to baseline, as baseline had no value."
return None, comment
try:
# default baseline is stored as single float
baseline = float(_parse_baseline(baseline))
except (ValueError, TypeError):
baseline = 0.0
try:
diff = (current - baseline) / baseline
except ZeroDivisionError:
diff = 0.0
# Should we check if tolerance is above 0
below_tolerance = None
comment = ""
if diff is not None:
below_tolerance = diff < tolerance
info = "Memory usage highwater changed by {:.2f}%: baseline={:.3f} MB, tolerance={:d}%, current={:.3f} MB".format(
diff * 100, baseline, int(tolerance * 100), current
)
if below_tolerance:
comment = "MEMCOMP: " + info
else:
comment = "Error: MEMCOMP: " + info
return below_tolerance, comment
def _format_baseline(value):
"""
Encodes value with default baseline format.
Default format:
sha: <commit sha> date: <date of bless> <value>
Parameters
----------
value : str
Baseline value to encode.
Returns
-------
value : str
Baseline entry.
"""
commit_hash = get_current_commit(repo=get_src_root())
timestamp = get_timestamp(timestamp_format="%Y-%m-%d_%H:%M:%S")
return f"sha:{commit_hash} date:{timestamp} {value}\n"
def _parse_baseline(data):
"""
Parses default baseline format.
Default format:
sha: <commit sha> date: <date of bless> <value>
Parameters
----------
data : str
Containing contents of baseline file.
Returns
-------
value : str
Value of the latest blessed baseline.
"""
lines = data.split("\n")
lines = [x for x in lines if x != ""]
try:
value = lines[-1].strip().split(" ")[-1]
except IndexError:
value = None
return value