Source code for CIME.XML.env_workflow

"""
Interface to the env_workflow.xml file.  This class inherits from EnvBase
"""

from CIME.XML.standard_module_setup import *
from CIME.XML.env_base import EnvBase
from CIME.utils import get_cime_root

import re, math

logger = logging.getLogger(__name__)

# pragma pylint: disable=attribute-defined-outside-init


class EnvWorkflow(EnvBase):
    def __init__(self, case_root=None, infile="env_workflow.xml", read_only=False):
        """
        initialize an object interface to file env_workflow.xml in the case directory
        """
        # This arbitrary setting should always be overwritten
        # schema = os.path.join(get_cime_root(), "CIME", "config", "xml_schemas", "env_workflow.xsd")
        # TODO: define schema for this file
        schema = None
        self._hidden = {}
        super(EnvWorkflow, self).__init__(
            case_root, infile, schema=schema, read_only=read_only
        )
    def create_job_groups(self, batch_jobs, is_test):
        # Subtle: in order to support dynamic batch jobs, we need to remove the
        # job_submission group and replace it with job-based groups
        orig_group = self.get_optional_child(
            "group",
            {"id": "job_submission"},
            err_msg="Looks like job groups have already been created",
        )
        expect(orig_group, "No workflow groups found")

        orig_group_children = super(EnvWorkflow, self).get_children(root=orig_group)

        childnodes = []
        for child in reversed(orig_group_children):
            childnodes.append(child)

        self.remove_child(orig_group)

        for name, jdict in batch_jobs:
            if name == "case.run" and is_test:
                pass  # skip
            elif name == "case.test" and not is_test:
                pass  # skip
            elif name == "case.run.sh":
                pass  # skip
            else:
                new_job_group = self.make_child("group", {"id": name})
                for field in jdict.keys():
                    if field == "runtime_parameters":
                        continue
                    val = jdict[field]
                    node = self.make_child(
                        "entry", {"id": field, "value": val}, root=new_job_group
                    )
                    self.make_child("type", root=node, text="char")

                for child in childnodes:
                    self.add_child(self.copy(child), root=new_job_group)
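    # Illustrative sketch of the transformation create_job_groups performs
    # (the XML fragment below is a simplified, hypothetical example; actual
    # entry ids come from the case's env_workflow.xml and batch_jobs):
    #
    #   before:  <group id="job_submission">
    #              <entry id="JOB_QUEUE" ...>...</entry>
    #            </group>
    #
    #   after:   <group id="case.run"> ...copied entries plus char-typed
    #              entries for each non-"runtime_parameters" field... </group>
    #            <group id="case.st_archive"> ... </group>
    #
    # i.e. the single job_submission group is removed and its children are
    # copied into one new group per batch job.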
    def get_jobs(self):
        groups = self.get_children("group")
        results = []
        for group in groups:
            results.append(self.get(group, "id"))
        return results
    def get_type_info(self, vid):
        gnodes = self.get_children("group")
        type_info = None
        for gnode in gnodes:
            nodes = self.get_children("entry", {"id": vid}, root=gnode)
            type_info = None
            for node in nodes:
                new_type_info = self._get_type_info(node)
                if type_info is None:
                    type_info = new_type_info
                else:
                    expect(
                        type_info == new_type_info,
                        "Inconsistent type_info for entry id={} {} {}".format(
                            vid, new_type_info, type_info
                        ),
                    )
        return type_info
    def hidden_job(self, case, job):
        if job not in self._hidden:
            self.get_job_specs(case, job)
        return self._hidden[job]
    def get_job_specs(self, case, job):
        hidden = self.get_value("hidden", subgroup=job)
        self._hidden[job] = (hidden is None and job != "case.st_archive") or (
            hidden is not None and hidden.lower() == "true"
        )
        task_count = case.get_resolved_value(self.get_value("task_count", subgroup=job))
        tasks_per_node = case.get_resolved_value(
            self.get_value("tasks_per_node", subgroup=job)
        )
        thread_count = case.get_resolved_value(
            self.get_value("thread_count", subgroup=job)
        )
        max_gpus_per_node = case.get_value("MAX_GPUS_PER_NODE")
        ngpus_per_node = case.get_value("NGPUS_PER_NODE")
        num_nodes = None
        if not ngpus_per_node:
            max_gpus_per_node = 0
            ngpus_per_node = 0
        if task_count is not None and tasks_per_node is not None:
            task_count = int(task_count)
            num_nodes = int(math.ceil(float(task_count) / float(tasks_per_node)))
            tasks_per_node = task_count // num_nodes
        if not thread_count:
            thread_count = 1
        if ngpus_per_node > max_gpus_per_node:
            ngpus_per_node = max_gpus_per_node

        return (
            task_count,
            num_nodes,
            tasks_per_node,
            thread_count,
            ngpus_per_node,
        )
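    # Worked example for the node math above (values are hypothetical): with
    # task_count=36 and tasks_per_node=16, num_nodes = ceil(36 / 16) = 3 and
    # the rebalanced tasks_per_node = 36 // 3 = 12, so tasks are spread evenly
    # across the nodes instead of leaving the last node mostly idle.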
    # pylint: disable=arguments-differ
    def get_value(self, item, attribute=None, resolved=True, subgroup="PRIMARY"):
        """
        Must default subgroup to something in order to provide single return value
        """
        value = None
        if subgroup == "PRIMARY":
            subgroup = "case.test" if "case.test" in self.get_jobs() else "case.run"

        # pylint: disable=assignment-from-none
        if value is None:
            value = super(EnvWorkflow, self).get_value(
                item, attribute=attribute, resolved=resolved, subgroup=subgroup
            )

        return value
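    # Example of the "PRIMARY" resolution above (entry id is one of the
    # workflow settings, e.g. "JOB_WALLCLOCK_TIME"): when the workflow defines
    # a "case.test" job, get_value("JOB_WALLCLOCK_TIME") reads that group;
    # otherwise the same call falls back to the "case.run" group.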
    # pylint: disable=arguments-differ
    def set_value(self, item, value, subgroup=None, ignore_type=False):
        """
        Override the entry_id set_value function with some special cases for this class
        """
        val = None
        # allow the user to set item for all jobs if subgroup is not provided
        if subgroup is None:
            gnodes = self.get_children("group")
            for gnode in gnodes:
                node = self.get_optional_child("entry", {"id": item}, root=gnode)
                if node is not None:
                    self._set_value(node, value, vid=item, ignore_type=ignore_type)
                    val = value
        else:
            group = self.get_optional_child("group", {"id": subgroup})
            if group is not None:
                node = self.get_optional_child("entry", {"id": item}, root=group)
                if node is not None:
                    val = self._set_value(
                        node, value, vid=item, ignore_type=ignore_type
                    )

        return val
    def get_children(self, name=None, attributes=None, root=None):
        if name in (
            "JOB_WALLCLOCK_TIME",
            "PROJECT",
            "CHARGE_ACCOUNT",
            "JOB_QUEUE",
            "BATCH_COMMAND_FLAGS",
        ):
            nodes = super(EnvWorkflow, self).get_children(
                "entry", attributes={"id": name}, root=root
            )
        else:
            nodes = super(EnvWorkflow, self).get_children(
                name, attributes=attributes, root=root
            )

        return nodes
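# Minimal usage sketch (the case path and returned values below are
# hypothetical examples; the entry ids are ones handled by this class):
#
#   from CIME.XML.env_workflow import EnvWorkflow
#
#   workflow = EnvWorkflow(case_root="/path/to/case")
#   print(workflow.get_jobs())  # e.g. ["case.run", "case.st_archive"]
#   queue = workflow.get_value("JOB_QUEUE", subgroup="case.run")
#   workflow.set_value("JOB_WALLCLOCK_TIME", "01:30:00", subgroup="case.run")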