Source code for CIME.XML.env_batch

"""
Interface to the env_batch.xml file.  This class inherits from EnvBase
"""

from CIME.XML.standard_module_setup import *
from CIME.XML.env_base import EnvBase
from CIME.utils import transform_vars, get_cime_root, convert_to_seconds, convert_to_babylonian_time, \
    get_cime_config, get_batch_script_for_job, get_logging_options, format_time
from CIME.locked_files import lock_file, unlock_file
from collections import OrderedDict
import stat, re, math

logger = logging.getLogger(__name__)

# pragma pylint: disable=attribute-defined-outside-init

[docs]class EnvBatch(EnvBase): def __init__(self, case_root=None, infile="env_batch.xml", read_only=False): """ initialize an object interface to file env_batch.xml in the case directory """ self._batchtype = None # This arbitrary setting should always be overwritten self._default_walltime = "00:20:00" schema = os.path.join(get_cime_root(), "config", "xml_schemas", "env_batch.xsd") super(EnvBatch,self).__init__(case_root, infile, schema=schema, read_only=read_only) # pylint: disable=arguments-differ
[docs] def set_value(self, item, value, subgroup=None, ignore_type=False): """ Override the entry_id set_value function with some special cases for this class """ val = None if item == "JOB_QUEUE": expect(value in self._get_all_queue_names() or ignore_type, "Unknown Job Queue specified use --force to set") # allow the user to set item for all jobs if subgroup is not provided if subgroup is None: gnodes = self.get_children("group") for gnode in gnodes: node = self.get_optional_child("entry", {"id":item}, root=gnode) if node is not None: self._set_value(node, value, vid=item, ignore_type=ignore_type) val = value else: group = self.get_optional_child("group", {"id":subgroup}) if group is not None: node = self.get_optional_child("entry", {"id":item}, root=group) if node is not None: val = self._set_value(node, value, vid=item, ignore_type=ignore_type) return val
# pylint: disable=arguments-differ
[docs] def get_value(self, item, attribute=None, resolved=True, subgroup=None): """ Must default subgroup to something in order to provide single return value """ value = None node = self.get_optional_child(item, attribute) if item in ("BATCH_SYSTEM", "PROJECT_REQUIRED"): return super(EnvBatch, self).get_value(item,attribute,resolved) if not node: # this will take the last instance of item listed in all batch_system elements bs_nodes = self.get_children("batch_system") for bsnode in bs_nodes: cnode = self.get_optional_child(item, attribute, root=bsnode) if cnode: node = cnode if node: value = self.text(node) if resolved: value = self.get_resolved_value(value) return value
[docs] def get_type_info(self, vid): gnodes = self.get_children("group") for gnode in gnodes: nodes = self.get_children("entry",{"id":vid}, root=gnode) type_info = None for node in nodes: new_type_info = self._get_type_info(node) if type_info is None: type_info = new_type_info else: expect( type_info == new_type_info, "Inconsistent type_info for entry id={} {} {}".format(vid, new_type_info, type_info)) return type_info
[docs] def get_jobs(self): groups = self.get_children("group") results = [] for group in groups: if self.get(group, "id") not in ["job_submission", "config_batch"]: results.append(self.get(group, "id")) return results
[docs] def create_job_groups(self, batch_jobs, is_test): # Subtle: in order to support dynamic batch jobs, we need to remove the # job_submission group and replace with job-based groups orig_group = self.get_child("group", {"id":"job_submission"}, err_msg="Looks like job groups have already been created") orig_group_children = super(EnvBatch, self).get_children(root=orig_group) childnodes = [] for child in reversed(orig_group_children): childnodes.append(child) self.remove_child(orig_group) for name, jdict in batch_jobs: if name == "case.run" and is_test: pass # skip elif name == "case.test" and not is_test: pass # skip elif name == "case.run.sh": pass # skip else: new_job_group = self.make_child("group", {"id":name}) for field in jdict.keys(): val = jdict[field] node = self.make_child("entry", {"id":field,"value":val}, root=new_job_group) self.make_child("type", root=node, text="char") for child in childnodes: self.add_child(self.copy(child), root=new_job_group)
[docs] def cleanupnode(self, node): if self.get(node, "id") == "batch_system": fnode = self.get_child(name="file", root=node) self.remove_child(fnode, root=node) gnode = self.get_child(name="group", root=node) self.remove_child(gnode, root=node) vnode = self.get_optional_child(name="values", root=node) if vnode is not None: self.remove_child(vnode, root=node) else: node = super(EnvBatch, self).cleanupnode(node) return node
[docs] def set_batch_system(self, batchobj, batch_system_type=None): if batch_system_type is not None: self.set_batch_system_type(batch_system_type) if batchobj.batch_system_node is not None and batchobj.machine_node is not None: for node in batchobj.get_children("",root=batchobj.machine_node): name = self.name(node) if name != 'directives': oldnode = batchobj.get_optional_child(name, root=batchobj.batch_system_node) if oldnode is not None: logger.debug( "Replacing {}".format(self.name(oldnode))) batchobj.remove_child(oldnode, root=batchobj.batch_system_node) if batchobj.batch_system_node is not None: self.add_child(self.copy(batchobj.batch_system_node)) if batchobj.machine_node is not None: self.add_child(self.copy(batchobj.machine_node)) if os.path.exists(os.path.join(self._caseroot, "LockedFiles", "env_batch.xml")): unlock_file(os.path.basename(batchobj.filename), caseroot=self._caseroot) self.set_value("BATCH_SYSTEM", batch_system_type) if os.path.exists(os.path.join(self._caseroot, "LockedFiles")): lock_file(os.path.basename(batchobj.filename), caseroot=self._caseroot)
[docs] def get_job_overrides(self, job, case): env_workflow = case.get_env('workflow') total_tasks, num_nodes, tasks_per_node, thread_count = env_workflow.get_job_specs(case, job) overrides = {} if total_tasks: overrides["total_tasks"] = total_tasks overrides["num_nodes"] = num_nodes overrides["tasks_per_node"] = tasks_per_node if thread_count: overrides["thread_count"] = thread_count else: total_tasks = case.get_value("TOTALPES")*int(case.thread_count) thread_count = case.thread_count if int(total_tasks)*int(thread_count) < case.get_value("MAX_TASKS_PER_NODE"): overrides["max_tasks_per_node"] = int(total_tasks) overrides["mpirun"] = case.get_mpirun_cmd(job=job, overrides=overrides) return overrides
[docs] def make_batch_script(self, input_template, job, case, outfile=None): expect(os.path.exists(input_template), "input file '{}' does not exist".format(input_template)) overrides = self.get_job_overrides(job, case) ext = os.path.splitext(job)[-1] if len(ext) == 0: ext = job if ext.startswith('.'): ext = ext[1:] overrides["job_id"] = ext + '.' + case.get_value("CASE") if "pleiades" in case.get_value("MACH"): # pleiades jobname needs to be limited to 15 chars overrides["job_id"] = overrides["job_id"][:15] overrides["batchdirectives"] = self.get_batch_directives(case, job, overrides=overrides) output_text = transform_vars(open(input_template,"r").read(), case=case, subgroup=job, overrides=overrides) output_name = get_batch_script_for_job(job) if outfile is None else outfile logger.info("Creating file {}".format(output_name)) with open(output_name, "w") as fd: fd.write(output_text) # make sure batch script is exectuble os.chmod(output_name, os.stat(output_name).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
[docs] def set_job_defaults(self, batch_jobs, case): if self._batchtype is None: self._batchtype = self.get_batch_system_type() if self._batchtype == "none": return env_workflow = case.get_env('workflow') known_jobs = env_workflow.get_jobs() for job, jsect in batch_jobs: if job not in known_jobs: continue walltime = case.get_value("USER_REQUESTED_WALLTIME", subgroup=job) if case.get_value("USER_REQUESTED_WALLTIME", subgroup=job) else None force_queue = case.get_value("USER_REQUESTED_QUEUE", subgroup=job) if case.get_value("USER_REQUESTED_QUEUE", subgroup=job) else None walltime_format = case.get_value("walltime_format", subgroup=job) if case.get_value("walltime_format", subgroup=job) else None logger.info("job is {} USER_REQUESTED_WALLTIME {} USER_REQUESTED_QUEUE {} WALLTIME_FORMAT {}".format(job, walltime, force_queue, walltime_format)) task_count = int(jsect["task_count"]) if "task_count" in jsect else case.total_tasks walltime = jsect["walltime"] if ("walltime" in jsect and walltime is None) else walltime if "task_count" in jsect: # job is using custom task_count, need to compute a node_count based on this node_count = int(math.ceil(float(task_count)/float(case.tasks_per_node))) else: node_count = case.num_nodes queue = self.select_best_queue(node_count, task_count, name=force_queue, walltime=walltime, job=job) if queue is None and walltime is not None: # Try to see if walltime was the holdup queue = self.select_best_queue(node_count, task_count, name=force_queue, walltime=None, job=job) if queue is not None: # It was, override the walltime if a test, otherwise just warn the user new_walltime = self.get_queue_specs(queue)[3] expect(new_walltime is not None, "Should never make it here") logger.warning("WARNING: Requested walltime '{}' could not be matched by any {} queue".format(walltime, force_queue)) if case.get_value("TEST"): logger.warning(" Using walltime '{}' instead".format(new_walltime)) walltime = new_walltime else: logger.warning(" Continuing with suspect walltime, batch submission may fail") if queue is None: logger.warning("WARNING: No queue on this system met the requirements for this job. Falling back to defaults") queue = self.get_default_queue() walltime = self.get_queue_specs(queue)[3] specs = self.get_queue_specs(queue) if walltime is None: # Figure out walltime if specs is None: # Queue is unknown, use specs from default queue walltime = self.get(self.get_default_queue(), "walltimemax") else: walltime = specs[3] walltime = self._default_walltime if walltime is None else walltime # last-chance fallback walltime_format = self.get_value("walltime_format") if walltime_format: seconds = convert_to_seconds(walltime) full_bab_time = convert_to_babylonian_time(seconds) walltime = format_time(walltime_format, "%H:%M:%S", full_bab_time) env_workflow.set_value("JOB_QUEUE", self.text(queue), subgroup=job, ignore_type=specs is None) env_workflow.set_value("JOB_WALLCLOCK_TIME", walltime, subgroup=job) logger.debug("Job {} queue {} walltime {}".format(job, self.text(queue), walltime))
def _match_attribs(self, attribs, case, queue): # check for matches with case-vars for attrib in attribs: if attrib in ["default", "prefix"]: # These are not used for matching continue elif attrib == "queue": if not self._match(queue, attribs["queue"]): return False else: val = case.get_value(attrib.upper()) expect(val is not None, "Cannot match attrib '%s', case has no value for it" % attrib.upper()) if not self._match(val, attribs[attrib]): return False return True def _match(self, my_value, xml_value): if xml_value.startswith("!"): result = re.match(xml_value[1:],str(my_value)) is None elif isinstance(my_value, bool): if my_value: result = xml_value == "TRUE" else: result = xml_value == "FALSE" else: result = re.match(xml_value,str(my_value)) is not None logger.debug("(env_mach_specific) _match {} {} {}".format(my_value, xml_value, result)) return result
[docs] def get_batch_directives(self, case, job, overrides=None, output_format='default'): """ """ result = [] directive_prefix = None roots = self.get_children("batch_system") queue = self.get_value("JOB_QUEUE", subgroup=job) if self._batchtype != "none" and not queue in self._get_all_queue_names(): unknown_queue = True qnode = self.get_default_queue() default_queue = self.text(qnode) else: unknown_queue = False for root in roots: if root is not None: if directive_prefix is None: if output_format == 'default': directive_prefix = self.get_element_text("batch_directive", root=root) elif output_format == 'cylc': directive_prefix = " " if unknown_queue: unknown_queue_directives = self.get_element_text("unknown_queue_directives", root=root) if unknown_queue_directives is None: queue = default_queue else: queue = unknown_queue_directives dnodes = self.get_children("directives", root=root) for dnode in dnodes: nodes = self.get_children("directive", root=dnode) if self._match_attribs(self.attrib(dnode), case, queue): for node in nodes: directive = self.get_resolved_value("" if self.text(node) is None else self.text(node)) if output_format == 'cylc': if self._batchtype == 'pbs': # cylc includes the -N itself, no need to add if directive.startswith("-N"): directive='' continue m = re.match(r'\s*(-[\w])', directive) if m: directive = re.sub(r'(-[\w]) ','{} = '.format(m.group(1)), directive) default = self.get(node, "default") if default is None: directive = transform_vars(directive, case=case, subgroup=job, default=default, overrides=overrides) else: directive = transform_vars(directive, default=default) custom_prefix = self.get(node, "prefix") prefix = directive_prefix if custom_prefix is None else custom_prefix result.append("{}{}".format("" if not prefix else (prefix + " "), directive)) return "\n".join(result)
[docs] def get_submit_args(self, case, job): ''' return a list of touples (flag, name) ''' submitargs = " " bs_nodes = self.get_children("batch_system") submit_arg_nodes = [] for node in bs_nodes: sanode = self.get_optional_child("submit_args", root=node) if sanode is not None: submit_arg_nodes += self.get_children("arg",root=sanode) for arg in submit_arg_nodes: flag = self.get(arg, "flag") name = self.get(arg, "name") if self._batchtype == "cobalt" and job == "case.st_archive": if flag == "-n": name = 'task_count' if flag == "--mode": continue if name is None: submitargs+=" {}".format(flag) else: if name.startswith("$"): name = name[1:] if '$' in name: # We have a complex expression and must rely on get_resolved_value. # Hopefully, none of the values require subgroup val = case.get_resolved_value(name) else: val = case.get_value(name, subgroup=job) if val is not None and len(str(val)) > 0 and val != "None": # Try to evaluate val if it contains any whitespace if " " in val: try: rval = eval(val) except Exception: rval = val else: rval = val # We don't want floating-point data try: rval = int(round(float(rval))) except ValueError: pass # need a correction for tasks per node if flag == "-n" and rval <= 0: rval = 1 if flag == "-q" and rval == "batch" and case.get_value("MACH") == "blues": # Special case. Do not provide '-q batch' for blues continue if flag.rfind("=", len(flag)-1, len(flag)) >= 0 or\ flag.rfind(":", len(flag)-1, len(flag)) >= 0: submitargs+=" {}{}".format(flag,str(rval).strip()) else: submitargs+=" {} {}".format(flag,str(rval).strip()) return submitargs
[docs] def submit_jobs(self, case, no_batch=False, job=None, user_prereq=None, skip_pnl=False, allow_fail=False, resubmit_immediate=False, mail_user=None, mail_type=None, batch_args=None, dry_run=False, workflow=True): """ no_batch indicates that the jobs should be run directly rather that submitted to a queueing system job is the first job in the workflow sequence to start user_prereq is a batch system prerequisite as requested by the user skip_pnl indicates that the preview_namelist should not be run by this job allow_fail indicates that the prereq job need only complete not nessasarily successfully to start the next job resubmit_immediate indicates that all jobs indicated by the RESUBMIT option should be submitted at the same time instead of waiting to resubmit at the end of the first sequence workflow is a logical indicating whether only "job" is submitted or the workflow sequence starting with "job" is submitted """ env_workflow = case.get_env('workflow') external_workflow = case.get_value("EXTERNAL_WORKFLOW") alljobs = env_workflow.get_jobs() alljobs = [j for j in alljobs if os.path.isfile(os.path.join(self._caseroot,get_batch_script_for_job(j)))] startindex = 0 jobs = [] firstjob = job if job is not None: expect(job in alljobs, "Do not know about batch job {}".format(job)) startindex = alljobs.index(job) for index, job in enumerate(alljobs): logger.debug( "Index {:d} job {} startindex {:d}".format(index, job, startindex)) if index < startindex: continue try: prereq = env_workflow.get_value('prereq', subgroup=job, resolved=False) if external_workflow or prereq is None or job == firstjob or (dry_run and prereq == "$BUILD_COMPLETE"): prereq = True else: prereq = case.get_resolved_value(prereq) prereq = eval(prereq) except Exception: expect(False,"Unable to evaluate prereq expression '{}' for job '{}'".format(self.get_value('prereq',subgroup=job), job)) if prereq: jobs.append((job, env_workflow.get_value('dependency', subgroup=job))) if self._batchtype == "cobalt": break depid = OrderedDict() jobcmds = [] if workflow and resubmit_immediate: num_submit = case.get_value("RESUBMIT") + 1 case.set_value("RESUBMIT", 0) if num_submit <= 0: num_submit = 1 else: num_submit = 1 prev_job = None batch_job_id = None for _ in range(num_submit): for job, dependency in jobs: if dependency is not None: deps = dependency.split() else: deps = [] dep_jobs = [] if user_prereq is not None: dep_jobs.append(user_prereq) for dep in deps: if dep in depid.keys() and depid[dep] is not None: dep_jobs.append(str(depid[dep])) if prev_job is not None: dep_jobs.append(prev_job) logger.debug("job {} depends on {}".format(job, dep_jobs)) result = self._submit_single_job(case, job, skip_pnl=skip_pnl, resubmit_immediate=resubmit_immediate, dep_jobs=dep_jobs, allow_fail=allow_fail, no_batch=no_batch, mail_user=mail_user, mail_type=mail_type, batch_args=batch_args, dry_run=dry_run, workflow=workflow) batch_job_id = str(alljobs.index(job)) if dry_run else result depid[job] = batch_job_id jobcmds.append( (job, result) ) if self._batchtype == "cobalt" or external_workflow or not workflow: break if not external_workflow and not no_batch: expect(batch_job_id, "No result from jobs {}".format(jobs)) prev_job = batch_job_id if dry_run: return jobcmds else: return depid
@staticmethod def _get_supported_args(job, no_batch): """ Returns a map of the supported parameters and their arguments to the given script TODO: Maybe let each script define this somewhere? >>> EnvBatch._get_supported_args("", False) {} >>> EnvBatch._get_supported_args("case.test", False) {'skip_pnl': '--skip-preview-namelist'} >>> EnvBatch._get_supported_args("case.st_archive", True) {'resubmit': '--resubmit'} """ supported = {} if job in ["case.run", "case.test"]: supported["skip_pnl"] = "--skip-preview-namelist" if job == "case.run": supported["set_continue_run"] = "--completion-sets-continue-run" if job in ["case.st_archive", "case.run"]: if job == "case.st_archive" and no_batch: supported["resubmit"] = "--resubmit" else: supported["submit_resubmits"] = "--resubmit" return supported @staticmethod def _build_run_args(job, no_batch, **run_args): """ Returns a map of the filtered parameters for the given script, as well as the values passed and the equivalent arguments for calling the script >>> EnvBatch._build_run_args("case.run", False, skip_pnl=True, cthulu="f'taghn") {'skip_pnl': (True, '--skip-preview-namelist')} >>> EnvBatch._build_run_args("case.run", False, skip_pnl=False, cthulu="f'taghn") {} """ supported_args = EnvBatch._get_supported_args(job, no_batch) args = {} for arg_name, arg_value in run_args.items(): if arg_value and (arg_name in supported_args.keys()): args[arg_name] = (arg_value, supported_args[arg_name]) return args def _build_run_args_str(self, job, no_batch, **run_args): """ Returns a string of the filtered arguments for the given script, based on the arguments passed """ args = self._build_run_args(job, no_batch, **run_args) run_args_str = " ".join(param for _, param in args.values()) logging_options = get_logging_options() if logging_options: run_args_str += " {}".format(logging_options) batch_env_flag = self.get_value("batch_env", subgroup=None) if not batch_env_flag: return run_args_str elif len(run_args_str) > 0: batch_system = self.get_value("BATCH_SYSTEM", subgroup=None) logger.debug("batch_system: {}: ".format(batch_system)) if batch_system == "lsf": return "{} \"all, ARGS_FOR_SCRIPT={}\"".format(batch_env_flag, run_args_str) else: return "{} ARGS_FOR_SCRIPT='{}'".format(batch_env_flag, run_args_str) else: return "" def _submit_single_job(self, case, job, dep_jobs=None, allow_fail=False, no_batch=False, skip_pnl=False, mail_user=None, mail_type=None, batch_args=None, dry_run=False, resubmit_immediate=False, workflow=True): if not dry_run: logger.warning("Submit job {}".format(job)) batch_system = self.get_value("BATCH_SYSTEM", subgroup=None) if batch_system is None or batch_system == "none" or no_batch: logger.info("Starting job script {}".format(job)) function_name = job.replace(".", "_") job_name = "."+job if not dry_run: args = self._build_run_args(job, True, skip_pnl=skip_pnl, set_continue_run=resubmit_immediate, submit_resubmits=workflow and not resubmit_immediate) try: if hasattr(case, function_name): getattr(case, function_name)(**{k: v for k, (v, _) in args.items()}) else: expect(os.path.isfile(job_name),"Could not find file {}".format(job_name)) run_cmd_no_fail(os.path.join(self._caseroot,job_name), combine_output=True, verbose=True, from_dir=self._caseroot) except Exception as e: # We don't want exception from the run phases getting into submit phase logger.warning("Exception from {}: {}".format(function_name, str(e))) return submitargs = self.get_submit_args(case, job) args_override = self.get_value("BATCH_COMMAND_FLAGS", subgroup=job) if args_override: submitargs = args_override if dep_jobs is not None and len(dep_jobs) > 0: logger.debug("dependencies: {}".format(dep_jobs)) if allow_fail: dep_string = self.get_value("depend_allow_string", subgroup=None) if dep_string is None: logger.warning("'depend_allow_string' is not defined for this batch system, " + "falling back to the 'depend_string'") dep_string = self.get_value("depend_string", subgroup=None) else: dep_string = self.get_value("depend_string", subgroup=None) expect(dep_string is not None, "'depend_string' is not defined for this batch system") separator_string = self.get_value("depend_separator", subgroup=None) expect(separator_string is not None,"depend_separator string not defined") expect("jobid" in dep_string, "depend_string is missing jobid for prerequisite jobs") dep_ids_str = str(dep_jobs[0]) for dep_id in dep_jobs[1:]: dep_ids_str += separator_string + str(dep_id) dep_string = dep_string.replace("jobid",dep_ids_str.strip()) # pylint: disable=maybe-no-member submitargs += " " + dep_string if batch_args is not None: submitargs += " " + batch_args cime_config = get_cime_config() if mail_user is None and cime_config.has_option("main", "MAIL_USER"): mail_user = cime_config.get("main", "MAIL_USER") if mail_user is not None: mail_user_flag = self.get_value('batch_mail_flag', subgroup=None) if mail_user_flag is not None: submitargs += " " + mail_user_flag + " " + mail_user if mail_type is None: if job == "case.test" and cime_config.has_option("create_test", "MAIL_TYPE"): mail_type = cime_config.get("create_test", "MAIL_TYPE") elif cime_config.has_option("main", "MAIL_TYPE"): mail_type = cime_config.get("main", "MAIL_TYPE") else: mail_type = self.get_value("batch_mail_default") if mail_type: mail_type = mail_type.split(",") # pylint: disable=no-member if mail_type: mail_type_flag = self.get_value("batch_mail_type_flag", subgroup=None) if mail_type_flag is not None: mail_type_args = [] for indv_type in mail_type: mail_type_arg = self.get_batch_mail_type(indv_type) mail_type_args.append(mail_type_arg) if mail_type_flag == "-m": # hacky, PBS-type systems pass multiple mail-types differently submitargs += " {} {}".format(mail_type_flag, "".join(mail_type_args)) else: submitargs += " {} {}".format(mail_type_flag, " {} ".format(mail_type_flag).join(mail_type_args)) batchsubmit = self.get_value("batch_submit", subgroup=None) expect(batchsubmit is not None, "Unable to determine the correct command for batch submission.") batchredirect = self.get_value("batch_redirect", subgroup=None) batch_env_flag = self.get_value("batch_env", subgroup=None) run_args = self._build_run_args_str(job, False, skip_pnl=skip_pnl, set_continue_run=resubmit_immediate, submit_resubmits=workflow and not resubmit_immediate) if batch_system == 'lsf' and not batch_env_flag: sequence = (run_args, batchsubmit, submitargs, batchredirect, get_batch_script_for_job(job)) elif batch_env_flag: sequence = (batchsubmit, submitargs, run_args, batchredirect, get_batch_script_for_job(job)) else: sequence = (batchsubmit, submitargs, batchredirect, get_batch_script_for_job(job), run_args) submitcmd = " ".join(s.strip() for s in sequence if s is not None) if submitcmd.startswith("ssh"): # add ` before cd $CASEROOT and at end of command submitcmd = submitcmd.replace("cd $CASEROOT","\'cd $CASEROOT") + "\'" if dry_run: return submitcmd else: submitcmd = case.get_resolved_value(submitcmd) logger.info("Submitting job script {}".format(submitcmd)) output = run_cmd_no_fail(submitcmd, combine_output=True) jobid = self.get_job_id(output) logger.info("Submitted job id is {}".format(jobid)) return jobid
[docs] def get_batch_mail_type(self, mail_type): raw = self.get_value("batch_mail_type", subgroup=None) mail_types = [item.strip() for item in raw.split(",")] # pylint: disable=no-member idx = ["never", "all", "begin", "end", "fail"].index(mail_type) return mail_types[idx] if idx < len(mail_types) else None
[docs] def get_batch_system_type(self): nodes = self.get_children("batch_system") for node in nodes: type_ = self.get(node, "type") if type_ is not None: self._batchtype = type_ return self._batchtype
[docs] def set_batch_system_type(self, batchtype): self._batchtype = batchtype
[docs] def get_job_id(self, output): jobid_pattern = self.get_value("jobid_pattern", subgroup=None) expect(jobid_pattern is not None, "Could not find jobid_pattern in env_batch.xml") search_match = re.search(jobid_pattern, output) expect(search_match is not None, "Couldn't match jobid_pattern '{}' within submit output:\n '{}'".format(jobid_pattern, output)) jobid = search_match.group(1) return jobid
[docs] def queue_meets_spec(self, queue, num_nodes, num_tasks, walltime=None, job=None): specs = self.get_queue_specs(queue) nodemin, nodemax, jobname, walltimemax, jobmin, jobmax, strict = specs # A job name match automatically meets spec if job is not None and jobname is not None: return jobname == job if nodemin is not None and num_nodes < nodemin or \ nodemax is not None and num_nodes > nodemax or \ jobmin is not None and num_tasks < jobmin or \ jobmax is not None and num_tasks > jobmax: return False if walltime is not None and walltimemax is not None and strict: walltime_s = convert_to_seconds(walltime) walltimemax_s = convert_to_seconds(walltimemax) if walltime_s > walltimemax_s: return False return True
def _get_all_queue_names(self): all_queues = [] all_queues = self.get_all_queues() queue_names = [] for queue in all_queues: queue_names.append(self.text(queue)) return queue_names
[docs] def select_best_queue(self, num_nodes, num_tasks, name=None, walltime=None, job=None): # Make sure to check default queue first. qnodes = self.get_all_queues(name=name) for qnode in qnodes: if self.queue_meets_spec(qnode, num_nodes, num_tasks, walltime=walltime, job=job): return qnode return None
[docs] def get_queue_specs(self, qnode): """ Get queue specifications from node. Returns (nodemin, nodemax, jobname, walltimemax, jobmin, jobmax, is_strict) """ nodemin = self.get(qnode, "nodemin") nodemin = None if nodemin is None else int(nodemin) nodemax = self.get(qnode, "nodemax") nodemax = None if nodemax is None else int(nodemax) jobmin = self.get(qnode, "jobmin") jobmin = None if jobmin is None else int(jobmin) jobmax = self.get(qnode, "jobmax") jobmax = None if jobmax is None else int(jobmax) expect( nodemin is None or jobmin is None, "Cannot specify both nodemin and jobmin for a queue") expect( nodemax is None or jobmax is None, "Cannot specify both nodemax and jobmax for a queue") jobname = self.get(qnode, "jobname") walltimemax = self.get(qnode, "walltimemax") strict = self.get(qnode, "strict") == "true" return nodemin, nodemax, jobname, walltimemax, jobmin, jobmax, strict
[docs] def get_default_queue(self): bs_nodes = self.get_children("batch_system") node = None for bsnode in bs_nodes: qnodes = self.get_children("queues", root=bsnode) for qnode in qnodes: node = self.get_optional_child("queue", attributes={"default" : "true"}, root=qnode) if node is None: node = self.get_optional_child("queue", root=qnode) expect(node is not None, "No queues found") return node
[docs] def get_all_queues(self, name=None): bs_nodes = self.get_children("batch_system") nodes = [] default_idx = None for bsnode in bs_nodes: qsnode = self.get_optional_child("queues", root=bsnode) if qsnode is not None: qnodes = self.get_children("queue", root=qsnode) for qnode in qnodes: if name is None or self.text(qnode) == name: nodes.append(qnode) if self.get(qnode, "default", default="false") == "true": default_idx = len(nodes) - 1 # Queues are selected by first match, so we want the queue marked # as default to come first. if default_idx is not None: def_node = nodes.pop(default_idx) nodes.insert(0, def_node) return nodes
[docs] def get_children(self, name=None, attributes=None, root=None): if name == "PROJECT_REQUIRED": nodes = super(EnvBatch, self).get_children("entry", attributes={"id":name}, root=root) else: nodes = super(EnvBatch, self).get_children(name, attributes=attributes, root=root) return nodes
[docs] def get_status(self, jobid): batch_query = self.get_optional_child("batch_query") if batch_query is None: logger.warning("Batch queries not supported on this platform") else: cmd = self.text(batch_query) + " " if self.has(batch_query, "per_job_arg"): cmd += self.get(batch_query, "per_job_arg") + " " cmd += jobid status, out, err = run_cmd(cmd) if status != 0: logger.warning("Batch query command '{}' failed with error '{}'".format(cmd, err)) else: return out.strip()
[docs] def cancel_job(self, jobid): batch_cancel = self.get_optional_child("batch_cancel") if batch_cancel is None: logger.warning("Batch cancellation not supported on this platform") return False else: cmd = self.text(batch_cancel) + " " + str(jobid) status, out, err = run_cmd(cmd) if status != 0: logger.warning("Batch cancel command '{}' failed with error '{}'".format(cmd, out + "\n" + err)) else: return True
[docs] def compare_xml(self, other): xmldiffs = {} f1batchnodes = self.get_children("batch_system") for bnode in f1batchnodes: f2bnodes = other.get_children("batch_system", attributes = self.attrib(bnode)) f2bnode=None if len(f2bnodes): f2bnode = f2bnodes[0] f1batchnodes = self.get_children(root=bnode) for node in f1batchnodes: name = self.name(node) text1 = self.text(node) text2 = "" attribs = self.attrib(node) f2matches = other.scan_children(name, attributes=attribs, root=f2bnode) foundmatch=False for chkmatch in f2matches: name2 = other.name(chkmatch) attribs2 = other.attrib(chkmatch) text2 = other.text(chkmatch) if(name == name2 and attribs==attribs2 and text1==text2): foundmatch=True break if not foundmatch: xmldiffs[name] = [text1, text2] f1groups = self.get_children("group") for node in f1groups: group = self.get(node, "id") f2group = other.get_child("group", attributes={"id":group}) xmldiffs.update(super(EnvBatch, self).compare_xml(other, root=node, otherroot=f2group)) return xmldiffs
[docs] def make_all_batch_files(self, case): machdir = case.get_value("MACHDIR") env_workflow = case.get_env("workflow") logger.info("Creating batch scripts") jobs = env_workflow.get_jobs() for job in jobs: template = case.get_resolved_value(env_workflow.get_value('template', subgroup=job)) if os.path.isabs(template): input_batch_script = template else: input_batch_script = os.path.join(machdir,template) if os.path.isfile(input_batch_script): logger.info("Writing {} script from input template {}".format(job, input_batch_script)) self.make_batch_script(input_batch_script, job, case) else: logger.warning("Input template file {} for job {} does not exist or cannot be read.".format(input_batch_script, job))