"""
Interface to the env_workflow.xml file. This class inherits from EnvBase
"""
from CIME.XML.standard_module_setup import *
from CIME.XML.env_base import EnvBase
from CIME.utils import get_cime_root
import re, math
logger = logging.getLogger(__name__)
# pragma pylint: disable=attribute-defined-outside-init
[docs]class EnvWorkflow(EnvBase):
def __init__(self, case_root=None, infile="env_workflow.xml", read_only=False):
"""
initialize an object interface to file env_workflow.xml in the case directory
"""
# This arbitrary setting should always be overwritten
# schema = os.path.join(get_cime_root(), "config", "xml_schemas", "env_workflow.xsd")
# TODO: define schema for this file
schema = None
super(EnvWorkflow,self).__init__(case_root, infile, schema=schema, read_only=read_only)
[docs] def create_job_groups(self, batch_jobs, is_test):
# Subtle: in order to support dynamic batch jobs, we need to remove the
# job_submission group and replace with job-based groups
orig_group = self.get_optional_child("group", {"id":"job_submission"},
err_msg="Looks like job groups have already been created")
expect(orig_group, "No workflow groups found")
orig_group_children = super(EnvWorkflow, self).get_children(root=orig_group)
childnodes = []
for child in reversed(orig_group_children):
childnodes.append(child)
self.remove_child(orig_group)
for name, jdict in batch_jobs:
if name == "case.run" and is_test:
pass # skip
elif name == "case.test" and not is_test:
pass # skip
elif name == "case.run.sh":
pass # skip
else:
new_job_group = self.make_child("group", {"id":name})
for field in jdict.keys():
if field == "runtime_parameters":
continue
val = jdict[field]
node = self.make_child("entry", {"id":field,"value":val}, root=new_job_group)
self.make_child("type", root=node, text="char")
for child in childnodes:
self.add_child(self.copy(child), root=new_job_group)
[docs] def get_jobs(self):
groups = self.get_children("group")
results = []
for group in groups:
results.append(self.get(group, "id"))
return results
[docs] def get_type_info(self, vid):
gnodes = self.get_children("group")
type_info = None
for gnode in gnodes:
nodes = self.get_children("entry",{"id":vid}, root=gnode)
type_info = None
for node in nodes:
new_type_info = self._get_type_info(node)
if type_info is None:
type_info = new_type_info
else:
expect( type_info == new_type_info,
"Inconsistent type_info for entry id={} {} {}".format(vid, new_type_info, type_info))
return type_info
[docs] def get_job_specs(self, case, job):
task_count = case.get_resolved_value(self.get_value("task_count", subgroup=job))
tasks_per_node = case.get_resolved_value(self.get_value("tasks_per_node", subgroup=job))
thread_count = case.get_resolved_value(self.get_value("thread_count", subgroup=job))
num_nodes = None
if task_count is not None and tasks_per_node is not None:
task_count = int(task_count)
num_nodes = int(math.ceil(float(task_count)/float(tasks_per_node)))
tasks_per_node = task_count//num_nodes
if not thread_count:
thread_count = 1
return task_count, num_nodes, tasks_per_node, thread_count
# pylint: disable=arguments-differ
[docs] def get_value(self, item, attribute=None, resolved=True, subgroup="PRIMARY"):
"""
Must default subgroup to something in order to provide single return value
"""
value = None
if subgroup == "PRIMARY":
subgroup = "case.test" if "case.test" in self.get_jobs() else "case.run"
#pylint: disable=assignment-from-none
if value is None:
value = super(EnvWorkflow, self).get_value(item, attribute=attribute, resolved=resolved, subgroup=subgroup)
return value
# pylint: disable=arguments-differ
[docs] def set_value(self, item, value, subgroup=None, ignore_type=False):
"""
Override the entry_id set_value function with some special cases for this class
"""
val = None
# allow the user to set item for all jobs if subgroup is not provided
if subgroup is None:
gnodes = self.get_children("group")
for gnode in gnodes:
node = self.get_optional_child("entry", {"id":item}, root=gnode)
if node is not None:
self._set_value(node, value, vid=item, ignore_type=ignore_type)
val = value
else:
group = self.get_optional_child("group", {"id":subgroup})
if group is not None:
node = self.get_optional_child("entry", {"id":item}, root=group)
if node is not None:
val = self._set_value(node, value, vid=item, ignore_type=ignore_type)
return val
[docs] def get_children(self, name=None, attributes=None, root=None):
if name in ("JOB_WALLCLOCK_TIME", "PROJECT", "CHARGE_ACCOUNT",
"JOB_QUEUE", "BATCH_COMMAND_FLAGS"):
nodes = super(EnvWorkflow, self).get_children("entry", attributes={"id":name}, root=root)
else:
nodes = super(EnvWorkflow, self).get_children(name, attributes=attributes, root=root)
return nodes