"""
Interface to the env_workflow.xml file. This class inherits from EnvBase
"""
from CIME.XML.standard_module_setup import *
from CIME.XML.env_base import EnvBase
from CIME.utils import get_cime_root
import re, math
logger = logging.getLogger(__name__)
# pragma pylint: disable=attribute-defined-outside-init
class EnvWorkflow(EnvBase):
    """
    Interface to env_workflow.xml in a case directory.

    After create_job_groups() has run, the file holds one ``group`` element
    per batch job (group id == job name). Each group contains ``entry``
    elements with that job's workflow settings.
    """

    def __init__(self, case_root=None, infile="env_workflow.xml", read_only=False):
        """
        Initialize an object interface to file env_workflow.xml in the case directory.

        case_root: case directory, passed through to EnvBase.
        infile: name of the workflow XML file.
        read_only: passed through to EnvBase.
        """
        # TODO: define a schema for this file, e.g.
        # os.path.join(get_cime_root(), "CIME", "config", "xml_schemas", "env_workflow.xsd")
        schema = None
        # Cache of job name -> hidden flag; populated by get_job_specs() and
        # read by hidden_job().
        self._hidden = {}
        super(EnvWorkflow, self).__init__(
            case_root, infile, schema=schema, read_only=read_only
        )

    def create_job_groups(self, batch_jobs, is_test):
        """
        Replace the template ``job_submission`` group with one group per batch job.

        batch_jobs: iterable of (job_name, job_dict) pairs. Every key of
            job_dict except "runtime_parameters" becomes an ``entry`` (with
            type "char") in that job's group.
        is_test: if true, "case.run" is skipped; otherwise "case.test" is
            skipped. "case.run.sh" is always skipped.

        Every child of the original ``job_submission`` group is copied into
        each new job group. Fails (via expect) if no ``job_submission`` group
        is found.
        """
        # Subtle: in order to support dynamic batch jobs, we need to remove the
        # job_submission group and replace it with job-based groups.
        orig_group = self.get_optional_child(
            "group",
            {"id": "job_submission"},
            err_msg="Looks like job groups have already been created",
        )
        expect(orig_group, "No workflow groups found")
        # Snapshot the template children (in reverse order, as before) so they
        # survive removal of their parent group.
        childnodes = list(
            reversed(super(EnvWorkflow, self).get_children(root=orig_group))
        )
        self.remove_child(orig_group)

        skipped_jobs = {"case.run.sh", "case.run" if is_test else "case.test"}
        for name, jdict in batch_jobs:
            if name in skipped_jobs:
                continue
            new_job_group = self.make_child("group", {"id": name})
            for field, val in jdict.items():
                if field == "runtime_parameters":
                    continue
                node = self.make_child(
                    "entry", {"id": field, "value": val}, root=new_job_group
                )
                self.make_child("type", root=node, text="char")
            for child in childnodes:
                self.add_child(self.copy(child), root=new_job_group)

    def get_jobs(self):
        """Return the ``id`` of every ``group`` element, i.e. the job names."""
        return [self.get(group, "id") for group in self.get_children("group")]

    def get_type_info(self, vid):
        """
        Return the type info of entry ``vid``, checked across all job groups.

        Every ``entry`` with id ``vid``, in every group, must report the same
        type info; otherwise expect() fails. Returns None if no group contains
        such an entry.
        """
        # Accumulate across *all* groups. Resetting per group would make the
        # result depend only on the last group, and would skip the
        # cross-group consistency check.
        type_info = None
        for gnode in self.get_children("group"):
            for node in self.get_children("entry", {"id": vid}, root=gnode):
                new_type_info = self._get_type_info(node)
                if type_info is None:
                    type_info = new_type_info
                else:
                    expect(
                        type_info == new_type_info,
                        "Inconsistent type_info for entry id={} {} {}".format(
                            vid, new_type_info, type_info
                        ),
                    )
        return type_info

    def hidden_job(self, case, job):
        """
        Return the cached hidden flag for ``job``.

        The flag is computed by get_job_specs() on first request.
        """
        if job not in self._hidden:
            self.get_job_specs(case, job)
        return self._hidden[job]

    def get_job_specs(self, case, job):
        """
        Resolve the batch resource request for ``job``.

        Side effect: records the job's hidden flag for hidden_job(). A job
        without a ``hidden`` entry is hidden unless it is "case.st_archive";
        otherwise it is hidden iff the entry equals "true" (case-insensitive).

        Returns (task_count, num_nodes, tasks_per_node, thread_count, ngpus_per_node):
          - If both task_count and tasks_per_node resolve to non-None values:
              * task_count becomes an int;
              * num_nodes = ceil(task_count / tasks_per_node);
              * tasks_per_node is rebalanced to task_count // num_nodes.
            Otherwise num_nodes is None and the resolved values are returned
            unchanged.
          - thread_count defaults to 1 when falsy.
          - ngpus_per_node is 0 when NGPUS_PER_NODE is falsy. Otherwise it is
            capped at MAX_GPUS_PER_NODE.
        """
        hidden = self.get_value("hidden", subgroup=job)
        self._hidden[job] = (hidden is None and job != "case.st_archive") or (
            hidden is not None and hidden.lower() == "true"
        )

        task_count = case.get_resolved_value(self.get_value("task_count", subgroup=job))
        tasks_per_node = case.get_resolved_value(
            self.get_value("tasks_per_node", subgroup=job)
        )
        thread_count = case.get_resolved_value(
            self.get_value("thread_count", subgroup=job)
        )
        max_gpus_per_node = case.get_value("MAX_GPUS_PER_NODE")
        ngpus_per_node = case.get_value("NGPUS_PER_NODE")
        if not ngpus_per_node:
            # No GPUs requested: zero both so the cap below cannot trigger.
            max_gpus_per_node = 0
            ngpus_per_node = 0

        num_nodes = None
        if task_count is not None and tasks_per_node is not None:
            task_count = int(task_count)
            num_nodes = int(math.ceil(float(task_count) / float(tasks_per_node)))
            # Rebalance so tasks are spread evenly over the allocated nodes.
            tasks_per_node = task_count // num_nodes

        if not thread_count:
            thread_count = 1

        # NOTE(review): this comparison raises TypeError if NGPUS_PER_NODE is
        # set but MAX_GPUS_PER_NODE is None -- confirm it is always defined.
        if ngpus_per_node > max_gpus_per_node:
            ngpus_per_node = max_gpus_per_node

        return (
            task_count,
            num_nodes,
            tasks_per_node,
            thread_count,
            ngpus_per_node,
        )

    # pylint: disable=arguments-differ
    def get_value(self, item, attribute=None, resolved=True, subgroup="PRIMARY"):
        """
        Return the value of ``item`` from a single job group.

        Must default subgroup to something in order to provide a single return
        value. The sentinel "PRIMARY" selects "case.test" when that job exists,
        and "case.run" otherwise.
        """
        if subgroup == "PRIMARY":
            subgroup = "case.test" if "case.test" in self.get_jobs() else "case.run"
        return super(EnvWorkflow, self).get_value(
            item, attribute=attribute, resolved=resolved, subgroup=subgroup
        )

    # pylint: disable=arguments-differ
    def set_value(self, item, value, subgroup=None, ignore_type=False):
        """
        Override the entry_id set_value function with some special cases for this class.

        subgroup=None: set ``item`` in every job group that has it.
        Otherwise: set it only in the group whose id is ``subgroup``.

        Returns None if no matching entry was found.
        """
        val = None
        if subgroup is None:
            # Allow the user to set item for all jobs if subgroup is not provided.
            for gnode in self.get_children("group"):
                node = self.get_optional_child("entry", {"id": item}, root=gnode)
                if node is not None:
                    self._set_value(node, value, vid=item, ignore_type=ignore_type)
                    # NOTE(review): this branch returns the caller's value, while
                    # the subgroup branch returns _set_value's result -- confirm
                    # the two always agree.
                    val = value
        else:
            group = self.get_optional_child("group", {"id": subgroup})
            if group is not None:
                node = self.get_optional_child("entry", {"id": item}, root=group)
                if node is not None:
                    val = self._set_value(
                        node, value, vid=item, ignore_type=ignore_type
                    )
        return val

    def get_children(self, name=None, attributes=None, root=None):
        """
        Return child nodes, with special handling for certain ids.

        For JOB_WALLCLOCK_TIME, PROJECT, CHARGE_ACCOUNT, JOB_QUEUE and
        BATCH_COMMAND_FLAGS, ``name`` is treated as an entry id. The result is
        the ``entry`` elements with that id, and ``attributes`` is ignored.
        Any other name is passed through to the base implementation.
        """
        if name in (
            "JOB_WALLCLOCK_TIME",
            "PROJECT",
            "CHARGE_ACCOUNT",
            "JOB_QUEUE",
            "BATCH_COMMAND_FLAGS",
        ):
            return super(EnvWorkflow, self).get_children(
                "entry", attributes={"id": name}, root=root
            )
        return super(EnvWorkflow, self).get_children(
            name, attributes=attributes, root=root
        )