Source code for ci.driver

# -*- coding: utf-8 -*-

"""This module provides an interface to parse and exectute commands found in
``scikit-ci.yml``."""

import errno
import json
import os
import os.path
import re
import ruamel.yaml
import shlex
import subprocess
import sys
import tempfile

from collections import MutableMapping
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO

from pyfiglet import Figlet

from . import exceptions, utils
from .constants import SCIKIT_CI_CONFIG, SERVICES, STEPS


class DriverContext(object):
    def __init__(self, driver, env_file="env.json"):
        self.driver = driver
        self.env_file = env_file

    def __enter__(self):
        self.driver.load_env(self.env_file)
        self.env_file_modified_time = 0
        if os.path.exists(self.env_file):
            self.env_file_modified_time = os.path.getmtime(self.env_file)
        return self.driver

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None and exc_value is None and traceback is None:
            # Skip saving if env_file has been modified since context was entered
            current_env_file_modified_time = 0
            if os.path.exists(self.env_file):
                current_env_file_modified_time = os.path.getmtime(self.env_file)
            if current_env_file_modified_time == self.env_file_modified_time:
                self.driver.save_env(self.driver.env, self.env_file)
        self.driver.unload_env()


[docs]class Driver(object): def __init__(self): self.env = None self._env_file = None @staticmethod def log(*s): print(" ".join(s)) sys.stdout.flush() @staticmethod def read_env(env_file="env.json"): if not os.path.exists(env_file): return {} with open(env_file) as _file: return json.load(_file) def load_env(self, env_file="env.json"): if self.env is not None: self.unload_env() self.env = {} self.env.update(os.environ) self._env_file = env_file if os.path.exists(self._env_file): self.env.update(self.read_env(self._env_file)) self.env = {str(k): str(v) for k, v in self.env.items()} @staticmethod def save_env(env, env_file="env.json"): env = {str(k): str(v) for k, v in env.items()} with open(env_file, "w") as _file: json.dump(env, _file, indent=4) def unload_env(self): self.env = None if "COMSPEC" in os.environ: class GenericCommandConfig(object): shell = "cmd.exe" subprocess_shell_mode = True shell_options = ["/E:ON", "/V:ON", "/c"] use_script = True script_suffix = ".cmd" script_pre_code = "@echo off" script_post_code = "" @staticmethod def escape_cmd(cmd): return cmd.replace("%", "%%").replace("\\\\", "\\") @staticmethod def unescape_cmd(cmd): return cmd.replace("%", "%%").replace("\\", "\\\\") else: class GenericCommandConfig(object): shell = "bash" subprocess_shell_mode = False shell_options = [] use_script = True script_suffix = ".sh" script_pre_code = "set -e" script_post_code = "" @staticmethod def escape_cmd(cmd): return cmd @staticmethod def unescape_cmd(cmd): return cmd class PythonCommandConfig(object): shell = "python" subprocess_shell_mode = True shell_options = ["-B"] use_script = True script_suffix = ".py" script_pre_code = "" script_post_code = "" @staticmethod def escape_cmd(cmd): return cmd @staticmethod def unescape_cmd(cmd): return cmd @staticmethod def get_command_config(language): if language == "python": return Driver.PythonCommandConfig() else: return Driver.GenericCommandConfig() def check_call(self, *args, **kwds): kwds["env"] = kwds.get("env", self.env) cmd_config = kwds.pop("cmd_config") kwds["shell"] = cmd_config.subprocess_shell_mode cmd = cmd_config.escape_cmd(args[0]) if cmd_config.use_script: script = cmd script_lines = script.splitlines() if len(script_lines) == 1: self.log("[scikit-ci] Executing: %s" % cmd_config.unescape_cmd( script)) else: self.log("[scikit-ci] Executing:") prefix = " " * len("[scikit-ci] ") + " " for line in utils.indent(cmd_config.unescape_cmd(script), prefix).splitlines(): self.log(line) def _write(output_stream, txt): output_stream.write(bytearray("%s\n" % txt, "utf-8")) # Because of python issue #14243, we set "delete=False" and delete # manually after process execution. try: script_file = tempfile.NamedTemporaryFile( delete=False, suffix=cmd_config.script_suffix) # Pre-code _write(script_file, cmd_config.script_pre_code) # Content provided in the yml configuration files _write(script_file, script) # Post-code _write(script_file, cmd_config.script_post_code) script_file.file.flush() # Then, compose the command to execute shell_cmd = [cmd_config.shell] shell_cmd.extend(cmd_config.shell_options) shell_cmd.append(script_file.name) if cmd_config.subprocess_shell_mode: shell_cmd = " ".join(['"%s"' % arg for arg in shell_cmd]) args = [shell_cmd] # And finally execute subprocess.check_call(*args, **kwds) finally: script_file.close() os.remove(script_file.name) else: shell_cmd = [cmd_config.shell] if cmd_config.shell else [] shell_cmd.extend(cmd_config.shell_options) shell_cmd.append(cmd) args = [" ".join(shell_cmd)] self.log("[scikit-ci] Executing: %s" % args[0]) subprocess.check_call(*args, **kwds) def env_context(self, env_file="env.json"): return DriverContext(self, env_file) @staticmethod def expand_environment_vars(text, environment, to_empty_string=False): """Return an updated ``text`` string where all occurrences of ``$<EnvironmentVarName>`` found in ``environment`` are replaced. By default, occurrences of ``$<EnvironmentVarName>`` that are NOT associated with any environment variable are not replaced. Setting ``to_empty_string`` to True will change them to empty string. """ for name, value in environment.items(): text = text.replace( "$<%s>" % name, value.replace("\\", "\\\\").replace("\"", "\\\"")) if to_empty_string: text = re.sub(Driver.ENV_VAR_REGEX, "", text) return text ENV_VAR_REGEX = re.compile(r"\$<[\w\d][\w\d_]*>", re.IGNORECASE) """Regular expression matching legal environment variable of the form ``$<EnvironmentVarName>``""" @staticmethod def recursively_expand_environment_vars(step_env, global_env=None): """This function will recursively expand all occurrences of ``$<EnvironmentVarName>`` found in ``step_env`` and ``global_env`` values. """ if global_env is None: global_env = step_env # Keep track of variables that still need to be expanded to_be_expanded = set() def _expand(names, _work_env, _global_env=None): if _global_env is None: _global_env = _work_env for env_var_name in names: # Get the value env_var_value = _work_env[env_var_name] # Attempt to expand the value _work_env[env_var_name] = Driver.expand_environment_vars( env_var_value, _global_env) # Keep track of variable names to expand if re.match(Driver.ENV_VAR_REGEX, _work_env[env_var_name]): to_be_expanded.add(env_var_name) elif env_var_name in to_be_expanded: to_be_expanded.remove(env_var_name) # Expand step env values referencing global env variables _expand(step_env.keys(), step_env, global_env) global_env.update(step_env) # Expand variables _expand(step_env.keys(), global_env) # Expand remaining variables if any to_be_expanded_count = len(to_be_expanded) while to_be_expanded: _expand(to_be_expanded, global_env) if to_be_expanded_count == len(to_be_expanded): break to_be_expanded_count = len(to_be_expanded) # Update step environment for name in step_env: step_env[name] = global_env[name]
[docs] @staticmethod def expand_command(command, environments, posix_shell=True): """Return an updated ``command`` string where all occurrences of ``$<EnvironmentVarName>`` (with a corresponding env variable set) have been replaced. If ``posix_shell`` is True, only occurrences of ``$<EnvironmentVarName>`` in string starting with double quotes will be replaced. See https://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html and https://www.gnu.org/software/bash/manual/html_node/Single-Quotes.html """ if not posix_shell: return Driver.expand_environment_vars( command, environments, to_empty_string=True) # Strip line continuation characters. There are not required to # successfully evaluate the expression and are confusing shlex. command = command.replace("\\\n", "") def _indent_size(txt): count = 0 for char in txt: if char != ' ': break count += 1 return count expanded_lines = [] # Proceed to the expansion line by line for line in command.splitlines(): if line.startswith("#") or line == "": expanded_lines.append(line) continue indent = _indent_size(line) tokenizer = shlex.shlex(line, posix=False) tokenizer.whitespace_split = True expanded_tokens = [] for token in tokenizer: expand = not (token[0] == "'" and token[-1] == "'") if expand: token = Driver.expand_environment_vars( token, environments, to_empty_string=True) expanded_tokens.append(token) expanded_lines.append(indent * " " + " ".join(expanded_tokens)) return "\n".join(expanded_lines)
@staticmethod def _raise_if_setting_ci_name(environment): # Check if reserved variable are used if "CI_NAME" in environment: raise ValueError("CI_NAME environment variable can not be set. " "It is reserved to store the name of the current " "CI service (e.g appveyor, azure, circle or travis.") @staticmethod def parse_config(config_file, stage_name, service_name, global_env): with open(config_file) as input_stream: data = ruamel.yaml.load(input_stream, ruamel.yaml.RoundTripLoader) commands = [] environment = {} if stage_name in data: stage = data[stage_name] # common to all services environment = stage.get("environment", {}) commands = stage.get("commands", []) # Sanity checks Driver._raise_if_setting_ci_name(environment) # Expand all occurrences of ``$<EnvironmentVarName>``. Driver.recursively_expand_environment_vars(environment, global_env) if service_name in stage: system = stage[service_name] # consider service offering multiple operating system support if SERVICES[service_name]: operating_system = global_env[SERVICES[service_name]] system = system.get(operating_system, {}) # if any, get service specific environment system_environment = system.get("environment", {}) # Sanity checks Driver._raise_if_setting_ci_name(environment) # Expand system environment values Driver.recursively_expand_environment_vars( system_environment, global_env) # Merge system environment variable back into environment environment.update(system_environment) # ... and append commands commands += system.get("commands", []) return environment, commands def execute_commands(self, stage_name): print(Figlet().renderText(stage_name.upper())) service_name = utils.current_service() self.env["CI_NAME"] = service_name environment, commands = self.parse_config( SCIKIT_CI_CONFIG, stage_name, service_name, self.env) # Unescape environment variables for name in environment: value = self.env[name] for old, new in [("\\\\", "\\")]: value = value.replace(old, new) self.env[name] = value for cmd in commands: language = "default" if isinstance(cmd, MutableMapping): # Prevent output of debug message. # Workaround https://bitbucket.org/ruamel/yaml/pull-requests/13 try: oldout = sys.stdout sys.stdout = StringIO() language = list(cmd.keys())[0] cmd = list(cmd.values())[0] finally: sys.stdout = oldout else: # Expand environment variables used within commands posix_shell = "COMSPEC" not in os.environ cmd = self.expand_command( cmd, self.env, posix_shell=posix_shell).strip() try: self.check_call( cmd, cmd_config=self.get_command_config(language), env=self.env ) except subprocess.CalledProcessError as exc: raise exceptions.SKCIStepExecutionError( stage_name, exc.returncode, cmd, exc.output )
def dependent_steps(step): if step not in STEPS: # pragma: no cover raise KeyError("invalid step: {}".format(step)) step_index = STEPS.index(step) if step_index == 0: return [] return STEPS[0:step_index] def execute_step( step, force=False, with_dependencies=True, clear_cached_env=False): if not os.path.exists(SCIKIT_CI_CONFIG): # pragma: no cover raise OSError(errno.ENOENT, "Couldn't find %s" % SCIKIT_CI_CONFIG) if step not in STEPS: # pragma: no cover raise KeyError("invalid step: {}".format(step)) if clear_cached_env and os.path.exists('env.json'): os.remove('env.json') depends = dependent_steps(step) # If forcing execution, remove SCIKIT_CI_<step> env. variables if force: env = Driver.read_env() steps = [step] if with_dependencies: steps += depends for _step in steps: if 'SCIKIT_CI_%s' % _step.upper() in env: del env['SCIKIT_CI_%s' % _step.upper()] Driver.save_env(env) # Skip step if it has already been executed if 'SCIKIT_CI_%s' % step.upper() in Driver.read_env(): return # Recursively execute dependent steps if with_dependencies and depends: execute_step(depends[-1], with_dependencies=with_dependencies) d = Driver() with d.env_context(): d.execute_commands(step) d.env['SCIKIT_CI_%s' % step.upper()] = '1'