Source code for themis.utils

# -*- coding: utf-8 -*-

"""
This module defines utility functions and classes.

The classes and functions may be used by multiple other modules within this package.
This module is not intended for use by client code.
"""

from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import os
import shutil
import signal
import glob
import warnings
import csv
import functools
import shlex

from themis.versions import Sequence, csvargs


DEFAULT_RUN_DIR_NAMES = "runs{sep}{{run_id}}".format(sep=os.sep)
DEFAULT_SETUP_DIR = os.path.join(os.curdir, ".themis_setup")
URI_ENV_VAR = "THEMIS_URI"
RUNID_ENV_VAR = "THEMIS_RUNID"
SETUPDIR_ENV_VAR = "THEMIS_SETUPDIR"
EXECDIR_ENV_VAR = "THEMIS_EXECDIR"
URI_SPLITCHAR = ":"


[docs]class Step(object): """Instances of this class represent the execution of one application. Instances do not do anything themselves; they are meant to be passed into other methods and functions. Many attributes map directly to ``lrun``, ``srun``, and ``flux mini run`` arguments. :param args: the application and its arguments, either as a list of strings or as a single string, e.g. ``"/bin/bash -c 'exit 1;'"`` or ``["/bin/bash", "-c", "exit 1;"]``. If a single string is passed, it will be converted to a list via ``shlex.split(args)``. The first element of the Sequence should be the *absolute* path to the application to execute. :type args: str or Sequence of str :param tasks: total MPI tasks. Corresponds to srun, lrun, and flux "-n" options. :type tasks: int, optional :param cores_per_task: cores per task. Corresponds to srun, lrun, and flux "-c" options. :type cores_per_task: int, optional :param gpus_per_task: gpus per task. Corresponds to lrun and flux "-g" option. :type gpus_per_task: int, optional :param timeout: the max running time, in minutes, for this run; if the limit is exceeded, the run will be killed. :type timeout: int, optional :param batch_script: a boolean indicating whether the application given by ``args[0]`` is a batch script. If it is, it will be parsed for tokens and copied into ``cwd`` before execution. :type batch_script: bool :param cwd: the working directory for the application given by ``args[0]``. If the path is relative, it is treated as relative the run directory for the owning run. By default, the CWD is the run directory. :type cwd: str """ __slots__ = ( "args", "tasks", "cores_per_task", "gpus_per_task", "timeout", "batch_script", "cwd", ) def __init__( # pylint: disable=too-many-arguments self, args, tasks=1, cores_per_task=1, gpus_per_task=0, timeout=0, batch_script=True, cwd=os.curdir, ): if isinstance(args, str): self.args = shlex.split(args) else: self.args = list(args) self.args[0] = os.path.abspath(self.args[0]) self.tasks = int(tasks) self.cores_per_task = int(cores_per_task) self.gpus_per_task = int(gpus_per_task) self.timeout = int(timeout) self.batch_script = bool(batch_script) self.cwd = cwd if isinstance(cwd, str) else str(cwd) def __repr__(self): """Return string representation of constructor call for this object""" return ( "{cls}(args={args}, tasks={tasks}, cores_per_task={cores}, " "gpus_per_task={gpus}, timeout={timeout}, batch_script={bscript}, " "cwd={cwd})" ).format( cls=type(self).__name__, args=self.args, tasks=self.tasks, cores=self.cores_per_task, gpus=self.gpus_per_task, timeout=self.timeout, bscript=self.batch_script, cwd=self.cwd, ) @staticmethod def encode(obj): """JSON Encoder for Step objects.""" if isinstance(obj, Step): return {key: getattr(obj, key) for key in obj.__slots__} raise TypeError()
[docs]class CompositeRun(object): # pylint: disable=too-few-public-methods """Instances of this class represent one run of a Themis ensemble. ``CompositeRun`` instances are the most general kind of run in an ensemble, and consist of two parts: a mapping defining arbitrary key-value pairs, and a sequence of ``Step`` objects. Instances do not do anything themselves; they are meant to be passed into other methods and functions. Many attributes map directly to ``lrun``, ``srun``, and ``flux mini run`` arguments. :param sample: a mapping from sample labels to values, such as one produced by iterating through a ``Samples`` object. Generally, sample labels (the keys of the mapping) should be constant across all runs. The sample will be used for parsing :ref:`text files <text_replacement>`. :type sample: Mapping :param steps: nonempty sequence of ``themis.Step`` objects defining the execution instructions for the run. Each ``Step`` object owned by the run will be executed in order: first ``steps[0]``, then ``steps[1]``, then ``steps[2]`` and so on. :type steps: Sequence """ __slots__ = ("sample", "steps") def __init__(self, sample, steps): self.sample = sample if sample is not None else {} self.steps = steps if not isinstance(steps, Sequence): raise TypeError( "Expected sequence type for steps, got {!r}".format( type(steps).__name__ ) ) if not steps: raise ValueError("len(steps) must be > 0")
[docs]class Run(CompositeRun): # pylint: disable=too-many-instance-attributes """Represents one simple run of a Themis ensemble. This class should be named ``SimpleRun`` but is not for backwards compatibility. The ``Run`` class is a restrictive, simplifying derivation of the ``CompositeRun`` class. ``Run`` instances support only a single ``Step``, i.e. only a single application makes up the run. Other limitations include: * All ``Run`` instances in an ensemble are assumed to share the same application. As a consequence, whether or not the application is a batch script is set on an ensemble-wide basis. * The working directory for each ``Run`` in an ensemble is assumed to be specified by the ``run_dir_names`` ensemble-wide constant. Instances do not do anything themselves; they are meant to be passed into other methods and functions. Many attributes map directly to ``lrun``, ``srun``, and ``flux mini run`` arguments. :param sample: a mapping from sample labels to values, such as one produced by iterating through a ``Samples`` object. Generally, sample labels (the keys of the mapping) should be constant across all runs. The sample will be used for parsing :ref:`text files <text_replacement>`. :param args: the command-line arguments to pass to the application. :type args: str, optional :param tasks: total MPI tasks. Corresponds to srun, lrun, and flux "-n" options. :type tasks: int, optional :param cores_per_task: cores per task. Corresponds to srun, lrun, and flux "-c" options. :type cores_per_task: int, optional :param gpus_per_task: gpus per task. Corresponds to lrun and flux "-g" option. :type gpus_per_task: int, optional :param timeout: the max running time, in minutes, for this run; if the limit is exceeded, the run will be killed. :type timeout: int, optional """ __slots__ = ( "sample", "args", "tasks", "cores_per_task", "gpus_per_task", "timeout", "application", "app_is_batch_script", "_steps", ) def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, sample=None, args=None, tasks=1, cores_per_task=1, gpus_per_task=0, timeout=0, ): self.sample = sample self.args = args self.tasks = tasks self.cores_per_task = cores_per_task self.gpus_per_task = gpus_per_task self.timeout = timeout self.application = None # set by Themis instance self.app_is_batch_script = True self._steps = None def __repr__(self): """Return string representation of constructor call for this object""" return ( "{}(sample={}, args={!r}, tasks={}, cores_per_task={}, " "gpus_per_task={}, timeout={})" ).format( type(self).__name__, self.sample, self.args, self.tasks, self.cores_per_task, self.gpus_per_task, self.timeout, ) @property def steps(self): """Dynamically construct the steps making up this run. A horrible hack, this requires that the owning themis.Themis set ``self.application`` so that ``args[0]`` can be the path to the application when the ``Step`` is constructed. """ if self._steps is not None: return self._steps arguments = [self.application] if self.args is not None: arguments.extend(shlex.split(self.args)) return ( Step( arguments, self.tasks, self.cores_per_task, self.gpus_per_task, self.timeout, self.app_is_batch_script, ), ) @steps.setter def steps(self, new_steps): self._steps = new_steps
class AugmentedRun(CompositeRun): # pylint: disable=too-few-public-methods """A Run object used by Themis's backend with additional fields.""" __slots__ = CompositeRun.__slots__ + ("status", "result") def __init__(self, *args, **kwargs): self.status = kwargs.pop("status", None) self.result = kwargs.pop("result", None) super(AugmentedRun, self).__init__(*args, **kwargs) def convert_none(value, none_replacement): """If value is None, return the replacement value.""" if value is None: return none_replacement return value def validate_application(application): """Validate the application; raise a ValueError if it isn't found. If application isn't an absolute or relative path, search for it along the PATH environment variable. """ if not os.path.exists(application): found_application = which(application) if found_application is None: raise ValueError( "{} is not a valid application: it either does not exist" " or is not executable.".format(application) ) return found_application if not os.path.isfile(application) or not os.access(application, os.X_OK): raise ValueError( "{} is not a valid application: it either does not exist" " or is not executable.".format(application) ) return os.path.abspath(application) def which(program): """Search for `program` along the PATH environment variable.""" try: # works in Python 3; not in 2 return shutil.which(program) except AttributeError: pass path_var = os.getenv("PATH") for path_entry in path_var.split(os.path.pathsep): potential_match = os.path.join(path_entry, program) if os.path.isfile(potential_match) and os.access(potential_match, os.X_OK): return potential_match return None def existing_file(file_path): """Check that `file_path` exists; raise a ValueError if it doesn't.""" if not os.path.isfile(file_path): if os.path.isdir(file_path): raise ValueError( "{!r} is a directory, but a file is required".format(file_path) ) raise ValueError("{!r} is not an existing file".format(file_path)) return os.path.abspath(file_path) def match_path(path_pattern, no_dirs): """Return files matching `path_pattern`; raise a ValueError if no matches.""" if no_dirs: path_func = existing_file else: path_func = os.path.abspath pattern_matched = glob.glob(path_pattern) if not pattern_matched: raise ValueError("{!r} does not match anything.".format(path_pattern)) return [path_func(match) for match in pattern_matched] def sequence_of_path_patterns(path_patterns, no_dirs=False): """Expand an iterable of unix-style path patterns into a sequence of actual paths. :param no_dirs: if `True`, raise an error if any of the matches is a directory :raises ValueError: if any of the path patterns match no files. """ if path_patterns is None: path_patterns = () elif isinstance(path_patterns, str): path_patterns = (path_patterns,) expanded_paths = set() for path in path_patterns: for match in match_path(path, no_dirs): expanded_paths.add(match) return tuple(expanded_paths) def validate_run_files(input_decks, run_copy, run_symlink): """Validate the 3 sets of files/paths.""" input_decks = set(sequence_of_path_patterns(input_decks, True)) run_copy = set(sequence_of_path_patterns(run_copy)) run_symlink = set(sequence_of_path_patterns(run_symlink)) remove_overlap( input_decks, "run_parse", (run_copy, "run_copy"), (run_symlink, "run_symlink") ) remove_overlap(run_copy, "run_copy", (run_symlink, "run_symlink")) return (list(input_decks), list(run_copy), list(run_symlink)) def remove_overlap(high_priority_set, high_priority_name, *lower_priority_tuples): """Remove the overlap (intersection) between two or more sets.""" for low_priority_set, low_priority_name in lower_priority_tuples: if high_priority_set & low_priority_set: warnings.warn( "\n{high_priority} and {low_priority} should be distinct, but:\n" "{overlap}\nare common to both. " "Giving priority to {high_priority}...".format( high_priority=high_priority_name, low_priority=low_priority_name, overlap="\n".join(high_priority_set & low_priority_set), ), stacklevel=3, ) low_priority_set -= high_priority_set def type_check(value, *types_to_check): """Raise a ValueError if `value` isn't one of `types_to_check`""" if not isinstance(value, types_to_check): raise TypeError( "Expected a type in {}, got {}".format(types_to_check, type(value)) ) return value def range_check(value, min_val=None, max_val=None, name="value"): """Check that a variable falls within a range, then return it. :param min_val: the minimum acceptable value, or None for no minimum. :param min_val: the maximum acceptable value, or None for no maximum. :param name: the name of the variable to check. Used for error messages. """ if min_val is not None and value < min_val: raise ValueError("{} should be at least {}".format(name, min_val)) if max_val is not None and value > max_val: raise ValueError("{} should be at most {}".format(name, max_val)) return value class DirectoryManager(object): """Context manager for changing the working directory.""" def __init__(self, new_directory): """Constructor. Saves the name of the directory to change to.""" self.new_directory = new_directory self.old_directory = os.getcwd() def __enter__(self): """Change to the new working directory.""" os.chdir(self.new_directory) return self def __exit__(self, exc_type, exc_value, traceback): """Return to the original working directory. Return False, so the interpreter will re-raise any exceptions in the managed block. """ os.chdir(self.old_directory) return False def print_progress_bar( iteration, total, suffix="", decimals=1, length=100, ): """ Create a terminal progress bar :param iteration: current iteration (int) :param total: total iterations (int) :param suffix: suffix for the progress bar :param decimals: positive number of decimals in percent complete (int) :param length: character length of bar (int) """ percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) filled_length = int(length * iteration // total) for fill_char in ("█", "#"): # use an ascii character for ascii-only consoles progress_bar = fill_char * filled_length + "-" * (length - filled_length) try: print( "|{}| {}% {} ({}/{})".format( progress_bar, percent, suffix, iteration, total ) ) except UnicodeEncodeError: pass else: break def import_app_interface(app_interface_path): """Import the user's application interface module and return it. Don't suppress any exceptions raised by the import. """ if app_interface_path is not None: module_name = ( "app_interface_" + os.path.splitext(os.path.basename(app_interface_path))[0] ) try: import importlib.util except ImportError: import imp app_interface = imp.load_source(module_name, app_interface_path) else: module_spec = importlib.util.spec_from_file_location( module_name, app_interface_path ) app_interface = importlib.util.module_from_spec(module_spec) module_spec.loader.exec_module(app_interface) else: app_interface = None return app_interface def get_run_dir(run_id, run_dir_names, sample): """Return the run directory for the run given by run_id. The default run directory is generated by grouping runs into slots of 10,000 and then naming the run directory by its run_id. :param run_dir_names: A format string that, when formatted by sample, should yield the run directory, or None to use the default run directory. :param setup_dir: a directory path, which the run directory is relative to :param sample: the sample used to format run_dir_names, if run_dir_names is not None """ run_min = (run_id // 10000) * 10000 run_max = ((run_id // 10000) + 1) * 10000 - 1 return run_dir_names.format( run_id=run_id, run_id_min=run_min, run_id_max=run_max, **sample ) def _validate_run_dirs(samples, run_dir_names): """Check that the samples will produce unique run directories.""" set_of_run_dir_names = set() for sham_run_id, sample in enumerate(samples): try: set_of_run_dir_names.add(get_run_dir(sham_run_id, run_dir_names, sample)) except KeyError: raise ValueError( "The samples do not have keys matching the run_dir_names format string" ) if len(samples) > len(set_of_run_dir_names): raise ValueError( "That combination of samples and run_dir_names will " "not produce unique run directories." ) def validate_runs(runs, run_dir_names=None, application=None, batch_script=True): """Validate the type and value of runs and return standardized form.""" run_dir_names = DEFAULT_RUN_DIR_NAMES if run_dir_names is None else run_dir_names runs = list(runs) if not isinstance(runs, Sequence) else runs for run in runs: # add missing data to the outdated ``Run`` class if isinstance(run, Run): run.application = application run.app_is_batch_script = batch_script # validate a reasonable number of run dirs _validate_run_dirs([run.sample for run in runs[:5000]], run_dir_names) return runs def validate_samples(samples, run_dir_names=None, check_nonempty=False): """Validate the type and value of samples and return standardized form.""" run_dir_names = DEFAULT_RUN_DIR_NAMES if run_dir_names is None else run_dir_names samples = list(samples) if not isinstance(samples, Sequence) else samples if not samples and check_nonempty: raise ValueError("There are no samples") _validate_run_dirs(samples[:5000], run_dir_names) return samples def get_application(application, run_dir, app_is_batch_script): """Get the path to the application to the application to execute. Usually nothing needs to be done. However, if the application is a batch script, it was parsed and copied into the run directory. """ if app_is_batch_script: return os.path.join(run_dir, os.path.basename(application)) return application class CleanDirectory(object): """Context manager for changing the working directory to a new, clean one.""" def __init__(self, new_directory): """Constructor. Saves the name of the directory to change to.""" self.new_directory = new_directory self.old_directory = os.getcwd() def go_to_new(self): """Create or clear the new directory, and change into it.""" if os.path.exists(self.new_directory): shutil.rmtree(self.new_directory, ignore_errors=True) if not os.path.exists(self.new_directory): os.mkdir(self.new_directory) os.chdir(self.new_directory) return self.old_directory def go_to_old(self): """Return to the old working directory.""" os.chdir(self.old_directory) def __enter__(self): """Change to the new working directory and empty it. Return the path of the original working directory. """ return self.go_to_new() def __exit__(self, exc_type, exc_value, traceback): """Return to the original working directory. Return False, so the interpreter will re-raise any exceptions in the managed block. """ self.go_to_old() return False def clean_directory_decorator(new_directory=None): """Wrap a function in a CleanDirectory context. If the name of a new directory is not given, use the name of the passed function. """ def decorator(func): dir_context = new_directory or func.__name__ @functools.wraps(func) def nested(*args, **kwargs): with CleanDirectory(dir_context): return func(*args, **kwargs) return nested return decorator def _alarm_exception(signum, frame): """Signal handler function.""" raise OSError("Alarm handler called") def timeout_decorator(timeout): """Raise a TimeoutError if a function takes more than `timeout` seconds.""" def decorator(func): """Wrap a function in calls to SIGALRM.""" @functools.wraps(func) def nested(*args, **kwargs): """Request an alarm, call a function, disable the alarm.""" signal.signal(signal.SIGALRM, _alarm_exception) signal.alarm(timeout) try: return_val = func(*args, **kwargs) finally: signal.alarm(0) # disable the alarm, the function finished return return_val return nested return decorator def add_parameterfile_arg(parser, nargs=None): """Add a parameterfile argument to an `argparse.ArgumentParser`.""" parser.add_argument( "parameterfile", type=str, help=( "Path to the file defining the parameters for each run." " The file is assumed to be a CSV with a header row." ), nargs=nargs, ) def add_common_creation_args(parser): """Add argparse.ArgumentParser arguments common to __main__.py and laf.py""" parser.add_argument( "-r", "--run-dir-names", type=str, help=( "The Python format string yielding the run directories. " "Default behavior is to put each run into `runs/####`." ), metavar="FORMATSTRING", ) parser.add_argument( "--parse", nargs="+", type=str, help=( "Paths to text files to parse and copy into each run directory. " "The files are parsed for variables declared with '%%%%'. " "For example, '%%%%my_sample_1%%%%' will be replaced with the value " "of 'my_sample_1' for each run." ), metavar="FILE", dest="run_parse", ) parser.add_argument( "--symlink", nargs="+", type=str, help=( "Paths to files/directories to symlink into each run directory. " "E.g. `--symlink *` will symlink everything in the current working " "directory. Default behavior is not to symlink anything. " ), metavar="PATH", dest="run_symlink", ) parser.add_argument( "--copy", nargs="+", type=str, help=( "Paths to files/directories to hard-copy into each run directory. " "E.g. `--copy *` will copy everything in the current working directory. " "Default behavior is not to copy anything. " ), metavar="PATH", dest="run_copy", ) def read_csv(path): """Read a CSV and return an iterable of dictionaries representing the rows.""" with open(path, **csvargs) as csvfile: reader = csv.DictReader(csvfile) for row in reader: yield row