"""
API for checking input for testcase
"""
from CIME.XML.standard_module_setup import *
from CIME.utils import SharedArea, find_files, safe_copy, expect
from CIME.XML.inputdata import Inputdata
import CIME.Servers
import glob, hashlib, shutil
logger = logging.getLogger(__name__)
# The inputdata_checksum.dat file will be read into this hash if it's available
chksum_hash = dict()
local_chksum_file = 'inputdata_checksum.dat'
def _download_checksum_file(rundir):
"""
Download the checksum files from each server and merge them into rundir.
"""
inputdata = Inputdata()
protocol = "svn"
# download and merge all available chksum files.
while protocol is not None:
protocol, address, user, passwd, chksum_file = inputdata.get_next_server()
if protocol not in vars(CIME.Servers):
logger.warning("Client protocol {} not enabled".format(protocol))
continue
logger.info("Using protocol {} with user {} and passwd {}".format(protocol, user, passwd))
if protocol == "svn":
server = CIME.Servers.SVN(address, user, passwd)
elif protocol == "gftp":
server = CIME.Servers.GridFTP(address, user, passwd)
elif protocol == "ftp":
server = CIME.Servers.FTP.ftp_login(address, user, passwd)
elif protocol == "wget":
server = CIME.Servers.WGET.wget_login(address, user, passwd)
else:
expect(False, "Unsupported inputdata protocol: {}".format(protocol))
if server is None:
continue
success = False
rel_path = chksum_file
full_path = os.path.join(rundir, local_chksum_file)
new_file = full_path + '.raw'
protocol = type(server).__name__
logging.info("Trying to download file: '{}' to path '{}' using {} protocol.".format(rel_path, new_file, protocol))
tmpfile = None
if os.path.isfile(full_path):
tmpfile = full_path+".tmp"
os.rename(full_path, tmpfile)
# Use umask to make sure files are group read/writable. As long as parent directories
# have +s, then everything should work.
with SharedArea():
success = server.getfile(rel_path, new_file)
if success:
_reformat_chksum_file(full_path, new_file)
if tmpfile:
_merge_chksum_files(full_path, tmpfile)
chksum_hash.clear()
else:
if tmpfile and os.path.isfile(tmpfile):
os.rename(tmpfile, full_path)
logger.warning("Could not automatically download file "+full_path+
" Restoring existing version.")
else:
logger.warning("Could not automatically download file {}".
format(full_path))
def _reformat_chksum_file(chksum_file, server_file):
"""
The checksum file on the server has 8 space seperated columns, I need the first and last ones.
This function gets the first and last column of server_file and saves it to chksum_file
"""
with open(server_file) as fd, open(chksum_file,"w") as fout:
lines = fd.readlines()
for line in lines:
lsplit = line.split()
if len(lsplit) < 8 or ' DIR ' in line:
continue
# remove the first directory ('inputdata/') from the filename
chksum = lsplit[0]
fname = (lsplit[7]).split('/',1)[1]
fout.write(" ".join((chksum, fname))+"\n")
os.remove(server_file)
def _merge_chksum_files(new_file, old_file):
"""
If more than one server checksum file is available, this merges the files and removes
any duplicate lines
"""
with open(old_file) as fin:
lines = fin.readlines()
with open(new_file) as fin:
lines += fin.readlines()
lines = set(lines)
with open(new_file, "w") as fout:
fout.write("".join(lines))
os.remove(old_file)
def _download_if_in_repo(server, input_data_root, rel_path, isdirectory=False):
"""
Return True if successfully downloaded
server is an object handle of type CIME.Servers
input_data_root is the local path to inputdata (DIN_LOC_ROOT)
rel_path is the path to the file or directory relative to input_data_root
user is the user name of the person running the script
isdirectory indicates that this is a directory download rather than a single file
"""
if not (rel_path or server.fileexists(rel_path)):
return False
full_path = os.path.join(input_data_root, rel_path)
logging.info("Trying to download file: '{}' to path '{}' using {} protocol.".format(rel_path, full_path, type(server).__name__))
# Make sure local path exists, create if it does not
if isdirectory or full_path.endswith(os.sep):
if not os.path.exists(full_path):
logger.info("Creating directory {}".format(full_path))
os.makedirs(full_path+".tmp")
isdirectory = True
elif not os.path.exists(os.path.dirname(full_path)):
os.makedirs(os.path.dirname(full_path))
# Use umask to make sure files are group read/writable. As long as parent directories
# have +s, then everything should work.
with SharedArea():
if isdirectory:
success = server.getdirectory(rel_path, full_path+".tmp")
# this is intended to prevent a race condition in which
# one case attempts to use a refdir before another one has
# completed the download
if success:
os.rename(full_path+".tmp",full_path)
else:
shutil.rmtree(full_path+".tmp")
else:
success = server.getfile(rel_path, full_path)
return success
def _downloadfromserver(case, input_data_root, data_list_dir):
"""
Download files
"""
success = False
protocol = 'svn'
inputdata = Inputdata()
if not input_data_root:
input_data_root = case.get_value('DIN_LOC_ROOT')
while not success and protocol is not None:
protocol, address, user, passwd, _ = inputdata.get_next_server()
logger.info("Checking server {} with protocol {}".format(address, protocol))
success = case.check_input_data(protocol=protocol, address=address, download=True,
input_data_root=input_data_root,
data_list_dir=data_list_dir,
user=user, passwd=passwd)
return success
[docs]def stage_refcase(self, input_data_root=None, data_list_dir=None):
"""
Get a REFCASE for a hybrid or branch run
This is the only case in which we are downloading an entire directory instead of
a single file at a time.
"""
get_refcase = self.get_value("GET_REFCASE")
run_type = self.get_value("RUN_TYPE")
continue_run = self.get_value("CONTINUE_RUN")
# We do not fully populate the inputdata directory on every
# machine and do not expect every user to download the 3TB+ of
# data in our inputdata repository. This code checks for the
# existence of inputdata in the local inputdata directory and
# attempts to download data from the server if it's needed and
# missing.
if get_refcase and run_type != "startup" and not continue_run:
din_loc_root = self.get_value("DIN_LOC_ROOT")
run_refdate = self.get_value("RUN_REFDATE")
run_refcase = self.get_value("RUN_REFCASE")
run_refdir = self.get_value("RUN_REFDIR")
rundir = self.get_value("RUNDIR")
if os.path.isabs(run_refdir):
refdir = run_refdir
expect(os.path.isdir(refdir), "Reference case directory {} does not exist or is not readable".format(refdir))
else:
refdir = os.path.join(din_loc_root, run_refdir, run_refcase, run_refdate)
if not os.path.isdir(refdir):
logger.warning("Refcase not found in {}, will attempt to download from inputdata".format(refdir))
with open(os.path.join("Buildconf","refcase.input_data_list"),"w") as fd:
fd.write("refdir = {}{}".format(refdir, os.sep))
if input_data_root is None:
input_data_root = din_loc_root
if data_list_dir is None:
data_list_dir = "Buildconf"
success = _downloadfromserver(self, input_data_root=input_data_root, data_list_dir=data_list_dir)
expect(success, "Could not download refcase from any server")
logger.info(" - Prestaging REFCASE ({}) to {}".format(refdir, rundir))
# prestage the reference case's files.
if (not os.path.exists(rundir)):
logger.debug("Creating run directory: {}".format(rundir))
os.makedirs(rundir)
rpointerfile = None
# copy the refcases' rpointer files to the run directory
for rpointerfile in glob.iglob(os.path.join("{}","*rpointer*").format(refdir)):
logger.info("Copy rpointer {}".format(rpointerfile))
safe_copy(rpointerfile, rundir)
expect(rpointerfile,"Reference case directory {} does not contain any rpointer files".format(refdir))
# link everything else
for rcfile in glob.iglob(os.path.join(refdir,"*")):
rcbaseline = os.path.basename(rcfile)
if not os.path.exists("{}/{}".format(rundir, rcbaseline)):
logger.info("Staging file {}".format(rcfile))
os.symlink(rcfile, "{}/{}".format(rundir, rcbaseline))
# Backward compatibility, some old refcases have cam2 in the name
# link to local cam file.
for cam2file in glob.iglob(os.path.join("{}","*.cam2.*").format(rundir)):
camfile = cam2file.replace("cam2", "cam")
os.symlink(cam2file, camfile)
elif not get_refcase and run_type != "startup":
logger.info("GET_REFCASE is false, the user is expected to stage the refcase to the run directory.")
if os.path.exists(os.path.join("Buildconf","refcase.input_data_list")):
os.remove(os.path.join("Buildconf","refcase.input_data_list"))
return True
[docs]def verify_chksum(input_data_root, rundir, filename, isdirectory):
"""
For file in filename perform a chksum and compare the result to that stored in
the local checksumfile, if isdirectory chksum all files in the directory of form *.*
"""
hashfile = os.path.join(rundir, local_chksum_file)
if not chksum_hash:
if not os.path.isfile(hashfile):
logger.warning("Failed to find or download file {}".format(hashfile))
return
with open(hashfile) as fd:
lines = fd.readlines()
for line in lines:
fchksum, fname = line.split()
if fname in chksum_hash:
expect(chksum_hash[fname] == fchksum, " Inconsistent hashes in chksum for file {}".format(fname))
else:
chksum_hash[fname] = fchksum
if isdirectory:
filenames = glob.glob(os.path.join(filename,"*.*"))
else:
filenames = [filename]
for fname in filenames:
if not os.sep in fname:
continue
chksum = md5(os.path.join(input_data_root, fname))
if chksum_hash:
if not fname in chksum_hash:
logger.warning("Did not find hash for file {} in chksum file {}".format(filename, hashfile))
else:
expect(chksum == chksum_hash[fname],
"chksum mismatch for file {} expected {} found {}".
format(os.path.join(input_data_root,fname),chksum, chksum_hash[fname]))
[docs]def md5(fname):
"""
performs an md5 sum one chunk at a time to avoid memory issues with large files.
"""
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()