Source code for CIME.XML.env_batch

"""
Interface to the env_batch.xml file.  This class inherits from EnvBase
"""

from CIME.XML.standard_module_setup import *
from CIME.XML.env_base import EnvBase
from CIME.utils import (transform_vars, get_cime_root, convert_to_seconds,
                        get_cime_config, get_batch_script_for_job, get_logging_options)

from collections import OrderedDict
import stat, re, math

logger = logging.getLogger(__name__)

# pragma pylint: disable=attribute-defined-outside-init

class EnvBatch(EnvBase):

    def __init__(self, case_root=None, infile="env_batch.xml", schema=None):
        """
        initialize an object interface to file env_batch.xml in the case directory
        """
        self._batchtype = None
        # This arbitrary setting should always be overwritten
        self._default_walltime = "00:20:00"
        if not schema:
            schema = os.path.join(get_cime_root(), "config", "xml_schemas", "env_batch.xsd")
        super(EnvBatch, self).__init__(case_root, infile, schema=schema)

    # pylint: disable=arguments-differ
    def set_value(self, item, value, subgroup=None, ignore_type=False):
        """
        Override the entry_id set_value function with some special cases for this class
        """
        val = None
        if item == "JOB_QUEUE":
            expect(value in self._get_all_queue_names() or ignore_type,
                   "Unknown Job Queue specified use --force to set")

        # allow the user to set item for all jobs if subgroup is not provided
        if subgroup is None:
            gnodes = self.get_children("group")
            for gnode in gnodes:
                node = self.get_optional_child("entry", {"id": item}, root=gnode)
                if node is not None:
                    self._set_value(node, value, vid=item, ignore_type=ignore_type)
                    val = value
        else:
            group = self.get_optional_child("group", {"id": subgroup})
            if group is not None:
                node = self.get_optional_child("entry", {"id": item}, root=group)
                if node is not None:
                    val = self._set_value(node, value, vid=item, ignore_type=ignore_type)

        return val
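    # Usage sketch (not from the original source; the case path is hypothetical).
    # With subgroup=None the value is applied to every job group that defines
    # the entry; with a subgroup it touches only that job:
    #
    #     env_batch = EnvBatch(case_root="/path/to/case")
    #     env_batch.set_value("JOB_WALLCLOCK_TIME", "01:30:00")           # all jobs
    #     env_batch.set_value("JOB_QUEUE", "debug", subgroup="case.run")  # one job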
    # pylint: disable=arguments-differ
    def get_value(self, item, attribute=None, resolved=True, subgroup=None):
        """
        Must default subgroup to something in order to provide single return value
        """
        value = None
        node = self.get_optional_child(item, attribute)
        if node is None:
            # this will take the last instance of item listed in all batch_system elements
            bs_nodes = self.get_children("batch_system")
            for bsnode in bs_nodes:
                cnode = self.get_optional_child(item, attribute, root=bsnode)
                if cnode is not None:
                    node = cnode

        if node is None or item in ("BATCH_SYSTEM", "PROJECT_REQUIRED"):
            value = super(EnvBatch, self).get_value(item, attribute, resolved)
        else:
            value = self.text(node)
            if resolved:
                value = self.get_resolved_value(value)

        return value
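    # Usage sketch (hypothetical return values, assumes the env_batch instance
    # from the sketch above): subgroup-less lookups fall through to the
    # batch_system elements, so machine-level settings can be queried directly:
    #
    #     env_batch.get_value("BATCH_SYSTEM")   # e.g. "slurm"
    #     env_batch.get_value("batch_submit")   # e.g. "sbatch"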
    def get_type_info(self, vid):
        gnodes = self.get_children("group")
        type_info = None
        for gnode in gnodes:
            nodes = self.get_children("entry", {"id": vid}, root=gnode)
            for node in nodes:
                new_type_info = self._get_type_info(node)
                if type_info is None:
                    type_info = new_type_info
                else:
                    expect(type_info == new_type_info,
                           "Inconsistent type_info for entry id={} {} {}".format(vid, new_type_info, type_info))
        return type_info
    def get_jobs(self):
        groups = self.get_children("group")
        results = []
        for group in groups:
            if self.get(group, "id") not in ["job_submission", "config_batch"]:
                results.append(self.get(group, "id"))

        return results
    def create_job_groups(self, batch_jobs, is_test):
        # Subtle: in order to support dynamic batch jobs, we need to remove the
        # job_submission group and replace with job-based groups
        orig_group = self.get_child("group", {"id": "job_submission"},
                                    err_msg="Looks like job groups have already been created")
        orig_group_children = super(EnvBatch, self).get_children(root=orig_group)

        childnodes = []
        for child in reversed(orig_group_children):
            childnodes.append(child)

        self.remove_child(orig_group)

        for name, jdict in batch_jobs:
            if name == "case.run" and is_test:
                pass  # skip
            elif name == "case.test" and not is_test:
                pass  # skip
            elif name == "case.run.sh":
                pass  # skip
            else:
                new_job_group = self.make_child("group", {"id": name})
                for field in jdict.keys():
                    val = jdict[field]
                    node = self.make_child("entry", {"id": field, "value": val}, root=new_job_group)
                    self.make_child("type", root=node, text="char")
                for child in childnodes:
                    self.add_child(self.copy(child), root=new_job_group)
    def cleanupnode(self, node):
        if self.get(node, "id") == "batch_system":
            fnode = self.get_child(name="file", root=node)
            self.remove_child(fnode, root=node)
            gnode = self.get_child(name="group", root=node)
            self.remove_child(gnode, root=node)
            vnode = self.get_optional_child(name="values", root=node)
            if vnode is not None:
                self.remove_child(vnode, root=node)
        else:
            node = super(EnvBatch, self).cleanupnode(node)

        return node
    def set_batch_system(self, batchobj, batch_system_type=None):
        if batch_system_type is not None:
            self.set_batch_system_type(batch_system_type)

        if batchobj.batch_system_node is not None and batchobj.machine_node is not None:
            for node in batchobj.get_children("", root=batchobj.machine_node):
                name = self.name(node)
                if name != 'directives':
                    oldnode = batchobj.get_optional_child(name, root=batchobj.batch_system_node)
                    if oldnode is not None:
                        logger.debug("Replacing {}".format(self.name(oldnode)))
                        batchobj.remove_child(oldnode, root=batchobj.batch_system_node)

        if batchobj.batch_system_node is not None:
            self.add_child(self.copy(batchobj.batch_system_node))
        if batchobj.machine_node is not None:
            self.add_child(self.copy(batchobj.machine_node))

        self.set_value("BATCH_SYSTEM", batch_system_type)
    def get_job_overrides(self, job, case):
        env_workflow = case.get_env('workflow')
        total_tasks, num_nodes, tasks_per_node, thread_count = env_workflow.get_job_specs(job)
        overrides = {}

        if total_tasks:
            overrides["total_tasks"] = total_tasks
            overrides["num_nodes"] = num_nodes
            overrides["tasks_per_node"] = tasks_per_node
            if thread_count:
                overrides["thread_count"] = thread_count
        else:
            total_tasks = case.get_value("TOTALPES") * int(case.thread_count)
            thread_count = case.thread_count

        if int(total_tasks) * int(thread_count) < case.get_value("MAX_TASKS_PER_NODE"):
            overrides["max_tasks_per_node"] = int(total_tasks)

        overrides["mpirun"] = case.get_mpirun_cmd(job=job, overrides=overrides)
        return overrides
    def make_batch_script(self, input_template, job, case, outfile=None):
        expect(os.path.exists(input_template),
               "input file '{}' does not exist".format(input_template))
        overrides = self.get_job_overrides(job, case)
        ext = os.path.splitext(job)[-1]
        if len(ext) == 0:
            ext = job
        if ext.startswith('.'):
            ext = ext[1:]

        overrides["job_id"] = ext + '.' + case.get_value("CASE")
        if "pleiades" in case.get_value("MACH"):
            # pleiades jobname needs to be limited to 15 chars
            overrides["job_id"] = overrides["job_id"][:15]

        overrides["batchdirectives"] = self.get_batch_directives(case, job, overrides=overrides)
        output_text = transform_vars(open(input_template, "r").read(),
                                     case=case, subgroup=job, overrides=overrides)
        output_name = get_batch_script_for_job(job) if outfile is None else outfile
        logger.info("Creating file {}".format(output_name))
        with open(output_name, "w") as fd:
            fd.write(output_text)

        # make sure batch script is executable
        os.chmod(output_name, os.stat(output_name).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
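    # Sketch of the template expansion this method performs (hypothetical
    # template content; the real templates live under the machines directory).
    # Placeholders such as the ones below are replaced by transform_vars using
    # the overrides computed above:
    #
    #     #!/bin/bash
    #     {{ batchdirectives }}
    #     {{ mpirun }}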
    def set_job_defaults(self, batch_jobs, case):
        if self._batchtype is None:
            self._batchtype = self.get_batch_system_type()

        if self._batchtype == "none":
            return

        env_workflow = case.get_env('workflow')
        known_jobs = env_workflow.get_jobs()

        for job, jsect in batch_jobs:
            if job not in known_jobs:
                continue

            walltime = case.get_value("USER_REQUESTED_WALLTIME", subgroup=job) if case.get_value("USER_REQUESTED_WALLTIME", subgroup=job) else None
            force_queue = case.get_value("USER_REQUESTED_QUEUE", subgroup=job) if case.get_value("USER_REQUESTED_QUEUE", subgroup=job) else None
            walltime_format = case.get_value("walltime_format", subgroup=job) if case.get_value("walltime_format", subgroup=job) else None
            logger.info("job is {} USER_REQUESTED_WALLTIME {} USER_REQUESTED_QUEUE {} WALLTIME_FORMAT {}".format(job, walltime, force_queue, walltime_format))
            task_count = int(jsect["task_count"]) if "task_count" in jsect else case.total_tasks
            walltime = jsect["walltime"] if ("walltime" in jsect and walltime is None) else walltime

            if "task_count" in jsect:
                # job is using custom task_count, need to compute a node_count based on this
                node_count = int(math.ceil(float(task_count) / float(case.tasks_per_node)))
            else:
                node_count = case.num_nodes

            if force_queue:
                if not self.queue_meets_spec(force_queue, node_count, task_count, walltime=walltime, job=job):
                    logger.warning("WARNING: User-requested queue '{}' does not meet requirements for job '{}'".format(force_queue, job))
                    if self.queue_meets_spec(force_queue, node_count, task_count, walltime=None, job=job):
                        if case.get_value("TEST"):
                            walltime = self.get_queue_specs(force_queue)[3]
                            logger.warning("  Using walltime '{}' instead".format(walltime))
                        else:
                            logger.warning("  Continuing with suspect walltime, batch submission may fail")

                queue = force_queue
            else:
                queue = self.select_best_queue(node_count, task_count, walltime=walltime, job=job)
                if queue is None and walltime is not None:
                    # Try to see if walltime was the holdup
                    queue = self.select_best_queue(node_count, task_count, walltime=None, job=job)
                    if queue is not None:
                        # It was, override the walltime if a test, otherwise just warn the user
                        new_walltime = self.get_queue_specs(queue)[3]
                        expect(new_walltime is not None, "Should never make it here")
                        logger.warning("WARNING: Requested walltime '{}' could not be matched by any queue".format(walltime))
                        if case.get_value("TEST"):
                            logger.warning("  Using walltime '{}' instead".format(new_walltime))
                            walltime = new_walltime
                        else:
                            logger.warning("  Continuing with suspect walltime, batch submission may fail")

                if queue is None:
                    logger.warning("WARNING: No queue on this system met the requirements for this job. Falling back to defaults")
                    default_queue_node = self.get_default_queue()
                    queue = self.text(default_queue_node)
                    walltime = self.get_queue_specs(queue)[3]

            specs = self.get_queue_specs(queue)
            if walltime is None:
                # Figure out walltime
                if specs is None:
                    # Queue is unknown, use specs from default queue
                    walltime = self.get(self.get_default_queue(), "walltimemax")
                else:
                    walltime = specs[3]

                # last-chance fallback
                walltime = self._default_walltime if walltime is None else walltime
            else:
                # Set the walltime to the correct walltime_format, if set.
                # Assumes the correct format is #H:#M or %H:%M:%S.
                if walltime_format is not None:
                    components = walltime.split(":")
                    format_fields = walltime_format.split(":")
                    if len(components) > len(format_fields):
                        walltime = ':'.join(components[:len(format_fields)])
                        logger.info("  Changing USER_REQUESTED_WALLTIME to {} to match walltime_format {}".format(walltime, walltime_format))
                    if len(components) < len(format_fields):
                        walltime = walltime + ':' + ':'.join(["00"] * (len(format_fields) - len(components)))
                        logger.info("  Changing USER_REQUESTED_WALLTIME to {} to match walltime_format {}".format(walltime, walltime_format))

            env_workflow.set_value("JOB_QUEUE", queue, subgroup=job, ignore_type=specs is None)
            env_workflow.set_value("JOB_WALLCLOCK_TIME", walltime, subgroup=job)
            logger.debug("Job {} queue {} walltime {}".format(job, queue, walltime))
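    # Worked example of the walltime_format normalization above (hypothetical
    # inputs): with walltime_format "%H:%M:%S", a requested walltime of "2:00"
    # has fewer components than the format and is padded, while "2:00:00:00"
    # has more and is truncated:
    #
    #     components = "2:00".split(":")              # ['2', '00']
    #     fields = "%H:%M:%S".split(":")              # 3 fields
    #     "2:00" + ':' + ':'.join(["00"] * (3 - 2))   # -> '2:00:00'
    #     ':'.join("2:00:00:00".split(":")[:3])       # -> '2:00:00'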
    def _match_attribs(self, attribs, case, queue):
        # check for matches with case-vars
        for attrib in attribs:
            if attrib in ["default", "prefix"]:
                # These are not used for matching
                continue
            elif attrib == "queue":
                if not self._match(queue, attribs["queue"]):
                    return False
            else:
                val = case.get_value(attrib.upper())
                expect(val is not None, "Cannot match attrib '%s', case has no value for it" % attrib.upper())
                if not self._match(val, attribs[attrib]):
                    return False

        return True

    def _match(self, my_value, xml_value):
        if xml_value.startswith("!"):
            result = re.match(xml_value[1:], str(my_value)) is None
        elif isinstance(my_value, bool):
            if my_value:
                result = xml_value == "TRUE"
            else:
                result = xml_value == "FALSE"
        else:
            result = re.match(xml_value, str(my_value)) is not None

        logger.debug("(env_batch) _match {} {} {}".format(my_value, xml_value, result))
        return result
    def get_batch_directives(self, case, job, overrides=None, output_format='default'):
        """
        Return the batch directives for this job as a newline-separated string.
        """
        result = []
        directive_prefix = None

        roots = self.get_children("batch_system")
        queue = self.get_value("JOB_QUEUE", subgroup=job)
        if self._batchtype != "none" and queue not in self._get_all_queue_names():
            unknown_queue = True
            qnode = self.get_default_queue()
            default_queue = self.text(qnode)
        else:
            unknown_queue = False

        for root in roots:
            if root is not None:
                if directive_prefix is None:
                    if output_format == 'default':
                        directive_prefix = self.get_element_text("batch_directive", root=root)
                    elif output_format == 'cylc':
                        directive_prefix = " "
                if unknown_queue:
                    unknown_queue_directives = self.get_element_text("unknown_queue_directives", root=root)
                    if unknown_queue_directives is None:
                        queue = default_queue
                    else:
                        queue = unknown_queue_directives

                dnodes = self.get_children("directives", root=root)
                for dnode in dnodes:
                    nodes = self.get_children("directive", root=dnode)
                    if self._match_attribs(self.attrib(dnode), case, queue):
                        for node in nodes:
                            directive = self.get_resolved_value("" if self.text(node) is None else self.text(node))
                            if output_format == 'cylc':
                                if self._batchtype == 'pbs':
                                    # cylc includes the -N itself, no need to add
                                    if directive.startswith("-N"):
                                        directive = ''
                                        continue
                                    m = re.match(r'\s*(-[\w])', directive)
                                    if m:
                                        directive = re.sub(r'(-[\w]) ', '{} = '.format(m.group(1)), directive)

                            default = self.get(node, "default")
                            if default is None:
                                directive = transform_vars(directive, case=case, subgroup=job, default=default, overrides=overrides)
                            else:
                                directive = transform_vars(directive, default=default)

                            custom_prefix = self.get(node, "prefix")
                            prefix = directive_prefix if custom_prefix is None else custom_prefix
                            result.append("{}{}".format("" if not prefix else (prefix + " "), directive))

        return "\n".join(result)
    def get_submit_args(self, case, job):
        """
        Return a string of batch submit arguments built from the (flag, name)
        pairs defined in the batch_system submit_args nodes.
        """
        submitargs = " "
        bs_nodes = self.get_children("batch_system")
        submit_arg_nodes = []

        for node in bs_nodes:
            sanode = self.get_optional_child("submit_args", root=node)
            if sanode is not None:
                submit_arg_nodes += self.get_children("arg", root=sanode)

        for arg in submit_arg_nodes:
            flag = self.get(arg, "flag")
            name = self.get(arg, "name")
            if self._batchtype == "cobalt" and job == "case.st_archive":
                if flag == "-n":
                    name = 'task_count'
                if flag == "--mode":
                    continue

            if name is None:
                submitargs += " {}".format(flag)
            else:
                if name.startswith("$"):
                    name = name[1:]

                if '$' in name:
                    # We have a complex expression and must rely on get_resolved_value.
                    # Hopefully, none of the values require subgroup
                    val = case.get_resolved_value(name)
                else:
                    val = case.get_value(name, subgroup=job)

                if val is not None and len(str(val)) > 0 and val != "None":
                    # Try to evaluate val if it contains any whitespace
                    if " " in val:
                        try:
                            rval = eval(val)
                        except Exception:
                            rval = val
                    else:
                        rval = val

                    # need a correction for tasks per node
                    if flag == "-n" and rval <= 0:
                        rval = 1

                    if flag == "-q" and rval == "batch" and case.get_value("MACH") == "blues":
                        # Special case. Do not provide '-q batch' for blues
                        continue

                    if flag.rfind("=", len(flag) - 1, len(flag)) >= 0 or \
                       flag.rfind(":", len(flag) - 1, len(flag)) >= 0:
                        submitargs += " {}{}".format(flag, str(rval).strip())
                    else:
                        submitargs += " {} {}".format(flag, str(rval).strip())

        return submitargs
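    # Illustration (hypothetical XML and values, not from the original source):
    # an arg node such as <arg flag="--time" name="$JOB_WALLCLOCK_TIME"/> with a
    # wallclock of "01:30:00" contributes " --time 01:30:00" to the string,
    # while a flag ending in "=" or ":" (e.g. "--nodes=") is joined with its
    # value without a separating space ("--nodes=4").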
    def submit_jobs(self, case, no_batch=False, job=None, user_prereq=None, skip_pnl=False,
                    allow_fail=False, resubmit_immediate=False, mail_user=None, mail_type=None,
                    batch_args=None, dry_run=False):
        env_workflow = case.get_env('workflow')
        external_workflow = case.get_value("EXTERNAL_WORKFLOW")
        alljobs = env_workflow.get_jobs()
        alljobs = [j for j in alljobs
                   if os.path.isfile(os.path.join(self._caseroot, get_batch_script_for_job(j)))]

        startindex = 0
        jobs = []
        firstjob = job
        if job is not None:
            expect(job in alljobs, "Do not know about batch job {}".format(job))
            startindex = alljobs.index(job)

        for index, job in enumerate(alljobs):
            logger.debug("Index {:d} job {} startindex {:d}".format(index, job, startindex))
            if index < startindex:
                continue

            try:
                prereq = env_workflow.get_value('prereq', subgroup=job, resolved=False)
                if external_workflow or prereq is None or job == firstjob or (dry_run and prereq == "$BUILD_COMPLETE"):
                    prereq = True
                else:
                    prereq = case.get_resolved_value(prereq)
                    prereq = eval(prereq)
            except Exception:
                expect(False, "Unable to evaluate prereq expression '{}' for job '{}'".format(self.get_value('prereq', subgroup=job), job))

            if prereq:
                jobs.append((job, env_workflow.get_value('dependency', subgroup=job)))

            if self._batchtype == "cobalt":
                break

        depid = OrderedDict()
        jobcmds = []

        if resubmit_immediate:
            num_submit = case.get_value("RESUBMIT") + 1
            case.set_value("RESUBMIT", 0)
            if num_submit <= 0:
                num_submit = 1
        else:
            num_submit = 1

        prev_job = None
        batch_job_id = None
        for _ in range(num_submit):
            for job, dependency in jobs:
                if dependency is not None:
                    deps = dependency.split()
                else:
                    deps = []

                dep_jobs = []
                if user_prereq is not None:
                    dep_jobs.append(user_prereq)

                for dep in deps:
                    if dep in depid.keys() and depid[dep] is not None:
                        dep_jobs.append(str(depid[dep]))

                if prev_job is not None:
                    dep_jobs.append(prev_job)

                logger.debug("job {} depends on {}".format(job, dep_jobs))
                result = self._submit_single_job(case, job,
                                                 skip_pnl=skip_pnl,
                                                 resubmit_immediate=resubmit_immediate,
                                                 dep_jobs=dep_jobs,
                                                 allow_fail=allow_fail,
                                                 no_batch=no_batch,
                                                 mail_user=mail_user,
                                                 mail_type=mail_type,
                                                 batch_args=batch_args,
                                                 dry_run=dry_run)
                batch_job_id = str(alljobs.index(job)) if dry_run else result
                depid[job] = batch_job_id
                jobcmds.append((job, result))
                if self._batchtype == "cobalt" or external_workflow:
                    break

            if not external_workflow:
                expect(batch_job_id, "No result from jobs {}".format(jobs))
                prev_job = batch_job_id

        if dry_run:
            return jobcmds
        else:
            return depid
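    # Usage sketch (assumes an initialized Case object; the case path is
    # hypothetical). With dry_run=True nothing is submitted and the
    # (job, command) pairs are returned for inspection:
    #
    #     from CIME.case import Case
    #     with Case("/path/to/case", read_only=False) as case:
    #         env_batch = case.get_env("batch")
    #         for job, cmd in env_batch.submit_jobs(case, dry_run=True):
    #             print(job, cmd)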
    @staticmethod
    def _get_supported_args(job, no_batch):
        """
        Returns a map of the supported parameters and their arguments to the given script
        TODO: Maybe let each script define this somewhere?

        >>> EnvBatch._get_supported_args("", False)
        {}
        >>> EnvBatch._get_supported_args("case.test", False)
        {'skip_pnl': '--skip-preview-namelist'}
        >>> EnvBatch._get_supported_args("case.st_archive", True)
        {'resubmit': '--resubmit'}
        """
        supported = {}
        if job in ["case.run", "case.test"]:
            supported["skip_pnl"] = "--skip-preview-namelist"
        if job == "case.run":
            supported["set_continue_run"] = "--completion-sets-continue-run"
        if job in ["case.st_archive", "case.run"]:
            if job == "case.st_archive" and no_batch:
                supported["resubmit"] = "--resubmit"
            else:
                supported["submit_resubmits"] = "--resubmit"
        return supported

    @staticmethod
    def _build_run_args(job, no_batch, **run_args):
        """
        Returns a map of the filtered parameters for the given script,
        as well as the values passed and the equivalent arguments for
        calling the script

        >>> EnvBatch._build_run_args("case.run", False, skip_pnl=True, cthulu="f'taghn")
        {'skip_pnl': (True, '--skip-preview-namelist')}
        >>> EnvBatch._build_run_args("case.run", False, skip_pnl=False, cthulu="f'taghn")
        {}
        """
        supported_args = EnvBatch._get_supported_args(job, no_batch)
        args = {}
        for arg_name, arg_value in run_args.items():
            if arg_value and (arg_name in supported_args.keys()):
                args[arg_name] = (arg_value, supported_args[arg_name])
        return args

    def _build_run_args_str(self, job, no_batch, **run_args):
        """
        Returns a string of the filtered arguments for the given script,
        based on the arguments passed
        """
        args = self._build_run_args(job, no_batch, **run_args)
        run_args_str = " ".join(param for _, param in args.values())
        logging_options = get_logging_options()
        if logging_options:
            run_args_str += " {}".format(logging_options)

        batch_env_flag = self.get_value("batch_env", subgroup=None)
        batch_system = self.get_value("BATCH_SYSTEM", subgroup=None)
        if not batch_env_flag:
            return run_args_str
        elif batch_system == "lsf":
            return "{} \"all, ARGS_FOR_SCRIPT={}\"".format(batch_env_flag, run_args_str)
        else:
            return "{} ARGS_FOR_SCRIPT='{}'".format(batch_env_flag, run_args_str)

    def _submit_single_job(self, case, job, dep_jobs=None, allow_fail=False,
                           no_batch=False, skip_pnl=False, mail_user=None, mail_type=None,
                           batch_args=None, dry_run=False, resubmit_immediate=False):
        if not dry_run:
            logger.warning("Submit job {}".format(job))

        batch_system = self.get_value("BATCH_SYSTEM", subgroup=None)
        if batch_system is None or batch_system == "none" or no_batch:
            logger.info("Starting job script {}".format(job))
            function_name = job.replace(".", "_")
            job_name = "." + job
            if not dry_run:
                args = self._build_run_args(job, True,
                                            skip_pnl=skip_pnl,
                                            set_continue_run=resubmit_immediate,
                                            submit_resubmits=not resubmit_immediate)
                if hasattr(case, function_name):
                    getattr(case, function_name)(**{k: v for k, (v, _) in args.items()})
                else:
                    expect(os.path.isfile(job_name), "Could not find file {}".format(job_name))
                    run_cmd_no_fail(os.path.join(self._caseroot, job_name),
                                    combine_output=True, verbose=True, from_dir=self._caseroot)
            return

        submitargs = self.get_submit_args(case, job)
        args_override = self.get_value("BATCH_COMMAND_FLAGS", subgroup=job)
        if args_override:
            submitargs = args_override

        if dep_jobs is not None and len(dep_jobs) > 0:
            logger.debug("dependencies: {}".format(dep_jobs))
            if allow_fail:
                dep_string = self.get_value("depend_allow_string", subgroup=None)
                if dep_string is None:
                    logger.warning("'depend_allow_string' is not defined for this batch system, " +
                                   "falling back to the 'depend_string'")
                    dep_string = self.get_value("depend_string", subgroup=None)
            else:
                dep_string = self.get_value("depend_string", subgroup=None)
            expect(dep_string is not None, "'depend_string' is not defined for this batch system")

            separator_string = self.get_value("depend_separator", subgroup=None)
            expect(separator_string is not None, "depend_separator string not defined")

            expect("jobid" in dep_string, "depend_string is missing jobid for prerequisite jobs")
            dep_ids_str = str(dep_jobs[0])
            for dep_id in dep_jobs[1:]:
                dep_ids_str += separator_string + str(dep_id)
            dep_string = dep_string.replace("jobid", dep_ids_str.strip())  # pylint: disable=maybe-no-member
            submitargs += " " + dep_string

        if batch_args is not None:
            submitargs += " " + batch_args

        cime_config = get_cime_config()

        if mail_user is None and cime_config.has_option("main", "MAIL_USER"):
            mail_user = cime_config.get("main", "MAIL_USER")

        if mail_user is not None:
            mail_user_flag = self.get_value('batch_mail_flag', subgroup=None)
            if mail_user_flag is not None:
                submitargs += " " + mail_user_flag + " " + mail_user

        if mail_type is None:
            if job == "case.test" and cime_config.has_option("create_test", "MAIL_TYPE"):
                mail_type = cime_config.get("create_test", "MAIL_TYPE")
            elif cime_config.has_option("main", "MAIL_TYPE"):
                mail_type = cime_config.get("main", "MAIL_TYPE")
            else:
                mail_type = self.get_value("batch_mail_default")

            if mail_type:
                mail_type = mail_type.split(",")  # pylint: disable=no-member

        if mail_type:
            mail_type_flag = self.get_value("batch_mail_type_flag", subgroup=None)
            if mail_type_flag is not None:
                mail_type_args = []
                for indv_type in mail_type:
                    mail_type_arg = self.get_batch_mail_type(indv_type)
                    mail_type_args.append(mail_type_arg)

                if mail_type_flag == "-m":
                    # hacky, PBS-type systems pass multiple mail-types differently
                    submitargs += " {} {}".format(mail_type_flag, "".join(mail_type_args))
                else:
                    submitargs += " {} {}".format(mail_type_flag,
                                                  " {} ".format(mail_type_flag).join(mail_type_args))

        batchsubmit = self.get_value("batch_submit", subgroup=None)
        expect(batchsubmit is not None,
               "Unable to determine the correct command for batch submission.")
        batchredirect = self.get_value("batch_redirect", subgroup=None)
        batch_env_flag = self.get_value("batch_env", subgroup=None)
        run_args = self._build_run_args_str(job, False,
                                            skip_pnl=skip_pnl,
                                            set_continue_run=resubmit_immediate,
                                            submit_resubmits=not resubmit_immediate)

        if batch_system == 'lsf' and batch_env_flag == 'none':
            sequence = (run_args, batchsubmit, submitargs, batchredirect, get_batch_script_for_job(job))
        elif batch_env_flag:
            sequence = (batchsubmit, submitargs, run_args, batchredirect, get_batch_script_for_job(job))
        else:
            sequence = (batchsubmit, submitargs, batchredirect, get_batch_script_for_job(job), run_args)

        submitcmd = " ".join(s.strip() for s in sequence if s is not None)

        if dry_run:
            return submitcmd
        else:
            logger.info("Submitting job script {}".format(submitcmd))
            output = run_cmd_no_fail(submitcmd, combine_output=True)
            jobid = self.get_job_id(output)
            logger.info("Submitted job id is {}".format(jobid))
            return jobid
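    # Illustration of the dependency-string construction in _submit_single_job
    # above (hypothetical slurm-style values): with depend_string
    # "--dependency=afterok:jobid", depend_separator ":" and
    # dep_jobs ["101", "102"], the substituted result is
    # "--dependency=afterok:101:102".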
    def get_batch_mail_type(self, mail_type):
        raw = self.get_value("batch_mail_type", subgroup=None)
        mail_types = [item.strip() for item in raw.split(",")]  # pylint: disable=no-member
        idx = ["never", "all", "begin", "end", "fail"].index(mail_type)

        return mail_types[idx] if idx < len(mail_types) else None
    def get_batch_system_type(self):
        nodes = self.get_children("batch_system")
        for node in nodes:
            type_ = self.get(node, "type")
            if type_ is not None:
                self._batchtype = type_
        return self._batchtype
    def set_batch_system_type(self, batchtype):
        self._batchtype = batchtype
    def get_job_id(self, output):
        jobid_pattern = self.get_value("jobid_pattern", subgroup=None)
        expect(jobid_pattern is not None, "Could not find jobid_pattern in env_batch.xml")
        search_match = re.search(jobid_pattern, output)
        expect(search_match is not None,
               "Couldn't match jobid_pattern '{}' within submit output:\n '{}'".format(jobid_pattern, output))
        jobid = search_match.group(1)
        return jobid
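    # Illustration (hypothetical jobid_pattern and submit output): the first
    # capture group of the pattern is taken as the job id:
    #
    #     re.search(r"Submitted batch job (\d+)",
    #               "Submitted batch job 4242").group(1)   # -> '4242'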
    def queue_meets_spec(self, queue, num_nodes, num_tasks, walltime=None, job=None):
        specs = self.get_queue_specs(queue)
        if specs is None:
            logger.warning("WARNING: queue '{}' is unknown to this system".format(queue))
            return True

        nodemin, nodemax, jobname, walltimemax, jobmin, jobmax, strict = specs

        # A job name match automatically meets spec
        if job is not None and jobname is not None:
            return jobname == job

        if nodemin is not None and num_nodes < nodemin or \
           nodemax is not None and num_nodes > nodemax or \
           jobmin is not None and num_tasks < jobmin or \
           jobmax is not None and num_tasks > jobmax:
            return False

        if walltime is not None and walltimemax is not None and strict:
            walltime_s = convert_to_seconds(walltime)
            walltimemax_s = convert_to_seconds(walltimemax)
            if walltime_s > walltimemax_s:
                return False

        return True
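    # Usage sketch (hypothetical queue name and sizes, assumes an EnvBatch
    # instance named env_batch): returns True when the node/task counts fall
    # inside the queue's limits and, for strict queues, the walltime does not
    # exceed walltimemax:
    #
    #     env_batch.queue_meets_spec("regular", num_nodes=4, num_tasks=128,
    #                                walltime="02:00:00")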
    def _get_all_queue_names(self):
        all_queues = self.get_all_queues()
        # Default queue needs to be first
        all_queues.insert(0, self.get_default_queue())

        queue_names = []
        for queue in all_queues:
            queue_names.append(self.text(queue))

        return queue_names
    def select_best_queue(self, num_nodes, num_tasks, walltime=None, job=None):
        # Make sure to check default queue first.
        qnames = self._get_all_queue_names()
        for qname in qnames:
            if self.queue_meets_spec(qname, num_nodes, num_tasks, walltime=walltime, job=job):
                return qname

        return None
    def get_queue_specs(self, queue):
        """
        Get queue specifications by name.

        Returns (nodemin, nodemax, jobname, walltimemax, jobmin, jobmax, is_strict)
        """
        for queue_node in self.get_all_queues():
            if self.text(queue_node) == queue:
                nodemin = self.get(queue_node, "nodemin")
                nodemin = None if nodemin is None else int(nodemin)
                nodemax = self.get(queue_node, "nodemax")
                nodemax = None if nodemax is None else int(nodemax)

                jobmin = self.get(queue_node, "jobmin")
                jobmin = None if jobmin is None else int(jobmin)
                jobmax = self.get(queue_node, "jobmax")
                jobmax = None if jobmax is None else int(jobmax)

                expect(nodemin is None or jobmin is None,
                       "Cannot specify both nodemin and jobmin for a queue")
                expect(nodemax is None or jobmax is None,
                       "Cannot specify both nodemax and jobmax for a queue")

                jobname = self.get(queue_node, "jobname")
                walltimemax = self.get(queue_node, "walltimemax")
                strict = self.get(queue_node, "strict") == "true"

                return nodemin, nodemax, jobname, walltimemax, jobmin, jobmax, strict

        return None
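    # Usage sketch (hypothetical queue name): unpack the returned tuple in the
    # order documented above; walltimemax stays a string such as "12:00:00":
    #
    #     specs = env_batch.get_queue_specs("debug")
    #     if specs is not None:
    #         nodemin, nodemax, jobname, walltimemax, jobmin, jobmax, strict = specs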
    def get_default_queue(self):
        bs_nodes = self.get_children("batch_system")
        node = None
        for bsnode in bs_nodes:
            qnodes = self.get_children("queues", root=bsnode)
            for qnode in qnodes:
                node = self.get_optional_child("queue", attributes={"default": "true"}, root=qnode)
                if node is None:
                    node = self.get_optional_child("queue", root=qnode)
        expect(node is not None, "No queues found")
        return node
    def get_all_queues(self):
        bs_nodes = self.get_children("batch_system")
        nodes = []
        for bsnode in bs_nodes:
            qnode = self.get_optional_child("queues", root=bsnode)
            if qnode is not None:
                nodes.extend(self.get_children("queue", root=qnode))
        return nodes
    def get_children(self, name=None, attributes=None, root=None):
        if name == "PROJECT_REQUIRED":
            nodes = super(EnvBatch, self).get_children("entry", attributes={"id": name}, root=root)
        else:
            nodes = super(EnvBatch, self).get_children(name, attributes=attributes, root=root)

        return nodes
    def get_status(self, jobid):
        batch_query = self.get_optional_child("batch_query")
        if batch_query is None:
            logger.warning("Batch queries not supported on this platform")
        else:
            cmd = self.text(batch_query) + " "
            if self.has(batch_query, "per_job_arg"):
                cmd += self.get(batch_query, "per_job_arg") + " "

            cmd += jobid
            status, out, err = run_cmd(cmd)
            if status != 0:
                logger.warning("Batch query command '{}' failed with error '{}'".format(cmd, err))
            else:
                return out.strip()
    def cancel_job(self, jobid):
        batch_cancel = self.get_optional_child("batch_cancel")
        if batch_cancel is None:
            logger.warning("Batch cancellation not supported on this platform")
            return False
        else:
            cmd = self.text(batch_cancel) + " " + str(jobid)
            status, out, err = run_cmd(cmd)
            if status != 0:
                logger.warning("Batch cancel command '{}' failed with error '{}'".format(cmd, out + "\n" + err))
            else:
                return True
    def compare_xml(self, other):
        xmldiffs = {}
        f1batchnodes = self.get_children("batch_system")
        for bnode in f1batchnodes:
            f2bnodes = other.get_children("batch_system", attributes=self.attrib(bnode))
            f2bnode = None
            if len(f2bnodes):
                f2bnode = f2bnodes[0]
            f1nodes = self.get_children(root=bnode)
            for node in f1nodes:
                name = self.name(node)
                text1 = self.text(node)
                text2 = ""
                attribs = self.attrib(node)
                f2matches = other.scan_children(name, attributes=attribs, root=f2bnode)
                foundmatch = False
                for chkmatch in f2matches:
                    name2 = other.name(chkmatch)
                    attribs2 = other.attrib(chkmatch)
                    text2 = other.text(chkmatch)
                    if name == name2 and attribs == attribs2 and text1 == text2:
                        foundmatch = True
                        break
                if not foundmatch:
                    xmldiffs[name] = [text1, text2]

        f1groups = self.get_children("group")
        for node in f1groups:
            group = self.get(node, "id")
            f2group = other.get_child("group", attributes={"id": group})
            xmldiffs.update(super(EnvBatch, self).compare_xml(other, root=node, otherroot=f2group))
        return xmldiffs
    def make_all_batch_files(self, case):
        machdir = case.get_value("MACHDIR")
        env_workflow = case.get_env("workflow")
        logger.info("Creating batch scripts")
        jobs = env_workflow.get_jobs()
        for job in jobs:
            template = case.get_resolved_value(env_workflow.get_value('template', subgroup=job))
            if os.path.isabs(template):
                input_batch_script = template
            else:
                input_batch_script = os.path.join(machdir, template)

            if os.path.isfile(input_batch_script):
                logger.info("Writing {} script from input template {}".format(job, input_batch_script))
                self.make_batch_script(input_batch_script, job, case)
            else:
                logger.warning("Input template file {} for job {} does not exist or cannot be read.".format(input_batch_script, job))
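    # End-to-end sketch (assumes an initialized Case object; the case path is
    # hypothetical): regenerate every batch script for a case after editing
    # env_batch.xml or env_workflow.xml:
    #
    #     from CIME.case import Case
    #     with Case("/path/to/case", read_only=False) as case:
    #         case.get_env("batch").make_all_batch_files(case)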