Source code for Tools.generate_cylc_workflow

#!/usr/bin/env python3

"""
Generates a cylc workflow file for the case.  See https://cylc.github.io for details about cylc
"""
import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))

from CIME.Tools.standard_script_setup import *

from CIME.case import Case
from CIME.utils import expect, transform_vars

import argparse, re

logger = logging.getLogger(__name__)

###############################################################################
[docs] def parse_command_line(args, description): ############################################################################### parser = argparse.ArgumentParser( description=description, formatter_class=argparse.RawTextHelpFormatter ) CIME.utils.setup_standard_logging_options(parser) parser.add_argument( "caseroot", nargs="?", default=os.getcwd(), help="Case directory for which namelists are generated.\n" "Default is current directory.", ) parser.add_argument( "--cycles", default=1, help="The number of cycles to run, default is RESUBMIT" ) parser.add_argument( "--ensemble", default=1, help="generate suite.rc for an ensemble of cases, the case name argument must end in an integer.\n" "for example: ./generate_cylc_workflow.py --ensemble 4 \n" "will generate a workflow file in the current case, if that case is named case.01," "the workflow will include case.01, case.02, case.03 and case.04", ) args = CIME.utils.parse_args_and_handle_standard_logging_options(args, parser) return args.caseroot, args.cycles, int(args.ensemble)
[docs] def cylc_get_ensemble_first_and_last(case, ensemble): if ensemble == 1: return 1, None casename = case.get_value("CASE") m = re.search(r"(.*[^\d])(\d+)$", casename) minval = int(m.group(2)) maxval = minval + ensemble - 1 return minval, maxval
[docs] def cylc_get_case_path_string(case, ensemble): caseroot = case.get_value("CASEROOT") casename = case.get_value("CASE") if ensemble == 1: return "{};".format(caseroot) basepath = os.path.abspath(caseroot + "/..") m = re.search(r"(.*[^\d])(\d+)$", casename) expect(m, "casename {} must end in an integer for ensemble method".format(casename)) return ( '{basepath}/{basename}$(printf "%0{intlen}d"'.format( basepath=basepath, basename=m.group(1), intlen=len(m.group(2)) ) + " ${CYLC_TASK_PARAM_member});" )
[docs] def cylc_batch_job_template(job, jobname, case, ensemble): env_batch = case.get_env("batch") batch_system_type = env_batch.get_batch_system_type() batchsubmit = env_batch.get_value("batch_submit") submit_args = env_batch.get_submit_args(case, job) case_path_string = cylc_get_case_path_string(case, ensemble) return ( """ [[{jobname}<member>]] script = cd {case_path_string} ./case.submit --job {job} [[[job]]] batch system = {batch_system_type} batch submit command template = {batchsubmit} {submit_args} '%(job)s' [[[directives]]] """.format( jobname=jobname, job=job, case_path_string=case_path_string, batch_system_type=batch_system_type, batchsubmit=batchsubmit, submit_args=submit_args, ) + "{{ batchdirectives }}\n" )
[docs] def cylc_script_job_template(job, case, ensemble): case_path_string = cylc_get_case_path_string(case, ensemble) return """ [[{job}<member>]] script = cd {case_path_string} ./case.submit --job {job} """.format( job=job, case_path_string=case_path_string )
############################################################################### def _main_func(description): ############################################################################### caseroot, cycles, ensemble = parse_command_line(sys.argv, description) expect( os.path.isfile(os.path.join(caseroot, "CaseStatus")), "case.setup must be run prior to running {}".format(__file__), ) with Case(caseroot, read_only=True) as case: if cycles == 1: cycles = max(1, case.get_value("RESUBMIT")) env_batch = case.get_env("batch") env_workflow = case.get_env("workflow") jobs = env_workflow.get_jobs() casename = case.get_value("CASE") input_template = os.path.join( case.get_value("MACHDIR"), "cylc_suite.rc.template" ) overrides = {"cycles": cycles, "casename": casename} input_text = open(input_template).read() first, last = cylc_get_ensemble_first_and_last(case, ensemble) if ensemble == 1: overrides.update({"members": "{}".format(first)}) overrides.update( {"workflow_description": "case {}".format(case.get_value("CASE"))} ) else: overrides.update({"members": "{}..{}".format(first, last)}) firstcase = case.get_value("CASE") intlen = len(str(last)) lastcase = firstcase[:-intlen] + str(last) overrides.update( { "workflow_description": "ensemble from {} to {}".format( firstcase, lastcase ) } ) overrides.update( {"case_path_string": cylc_get_case_path_string(case, ensemble)} ) for job in jobs: jobname = job if job == "case.st_archive": continue if job == "case.run": jobname = "run" overrides.update(env_batch.get_job_overrides(job, case)) overrides.update({"job_id": "run." + casename}) input_text = input_text + cylc_batch_job_template( job, jobname, case, ensemble ) else: depends_on = env_workflow.get_value("dependency", subgroup=job) if depends_on.startswith("case."): depends_on = depends_on[5:] input_text = input_text.replace( " => " + depends_on, " => " + depends_on + "<member> => " + job ) overrides.update(env_batch.get_job_overrides(job, case)) overrides.update({"job_id": job + "." + casename}) if "total_tasks" in overrides and overrides["total_tasks"] > 1: input_text = input_text + cylc_batch_job_template( job, jobname, case, ensemble ) else: input_text = input_text + cylc_script_job_template( jobname, case, ensemble ) overrides.update( { "batchdirectives": env_batch.get_batch_directives( case, job, overrides=overrides, output_format="cylc" ) } ) # we need to re-transform for each job to get job size correctly input_text = transform_vars( input_text, case=case, subgroup=job, overrides=overrides ) with open("suite.rc", "w") as f: f.write(case.get_resolved_value(input_text)) if __name__ == "__main__": _main_func(__doc__)