# Source code for craftr.shell

# Copyright (C) 2016  Niklas Rosenstein
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
''' This module is similar to the `subprocess.run()` interface that is
available since Python 3.5 but is a bit customized so that it works
better with Craftr. '''

from shlex import split
from subprocess import PIPE, STDOUT

import os
import re
import shlex
import subprocess
import sys


class safe(str):
  ''' Marker subclass of `str`. A value of this type is considered to be
  escaped already, `quote()` hands it back without touching it. '''


def quote(s):
  ''' Escape *s* so that it can be used as a single argument in a shell
  command line.

  This is an enhanced implementation for Windows systems: the original
  `shlex.quote()` function uses single-quotes, which can lead to problems
  on Windows. When `os.name` is `'nt'` and the path separator is a
  backslash, embedded double-quotes are backslash-escaped and the value
  is wrapped in double-quotes if it is empty or contains whitespace.
  On every other platform the call is forwarded to `shlex.quote()`.

  Args:
    s: The string to escape. Instances of `safe` are returned unchanged.
  Returns:
    The escaped string.
  '''

  if isinstance(s, safe):
    return s
  if os.name == 'nt' and os.sep == '\\':
    s = s.replace('"', '\\"')
    # An empty argument must be quoted as well, otherwise it would
    # disappear from the command line once the arguments are joined.
    if not s or re.search(r'\s', s):
      s = '"' + s + '"'
    return s
  return shlex.quote(s)


class _ProcessError(Exception):
  ''' Common base of the process related exceptions in this module. It
  wraps a process object and exposes that object's `returncode`, `cmd`,
  `stdout`, `stderr` and `output` attributes as read-only properties. '''

  def __init__(self, process):
    self.process = process

  def _forward(name):
    # Create a read-only property that reads *name* from the wrapped process.
    return property(lambda self: getattr(self.process, name))

  returncode = _forward('returncode')
  cmd = _forward('cmd')
  stdout = _forward('stdout')
  stderr = _forward('stderr')
  output = _forward('output')

  # The factory is only needed while the class body is executed.
  del _forward


class CalledProcessError(_ProcessError):
  ''' Raised by `CompletedProcess.check_returncode()` when the process
  finished with a non-zero returncode. The process information is
  available through the exception object. '''

  def __str__(self):
    message = '{0!r} exited with non-zero exit-code {1}'
    return message.format(self.cmd, self.returncode)


class TimeoutExpired(_ProcessError):
  ''' Raised when a process did not exit within a specific timeout. If
  this exception was raised, the child process has already been killed.
  The timeout in seconds is stored in the `timeout` attribute. '''

  def __init__(self, process, timeout):
    super().__init__(process)
    assert isinstance(timeout, (int, float))
    self.timeout = timeout

  def __str__(self):
    message = '{0!r} expired timeout of {1} second(s)'
    return message.format(self.cmd, self.timeout)


class CompletedProcess(object):
  ''' Record of a completed process: the command, its returncode and the
  captured `stdout` and `stderr` data. `output` is an alias for
  `stdout`. '''

  __slots__ = ['cmd', 'returncode', 'stdout', 'stderr']

  def __init__(self, cmd, returncode, stdout, stderr):
    self.cmd, self.returncode = cmd, returncode
    self.stdout, self.stderr = stdout, stderr

  def __repr__(self):
    template = '<CompletedProcess {0!r} with exit-code {1}>'
    return template.format(self.cmd, self.returncode)

  @property
  def output(self):
    return self.stdout

  @output.setter
  def output(self, value):
    self.stdout = value

  def decode(self, encoding):
    ''' Decode `stdout` and `stderr` in place with *encoding*. Streams
    that are None are skipped; nothing happens if *encoding* is None. '''

    if encoding is not None:
      for stream in ('stdout', 'stderr'):
        raw = getattr(self, stream)
        if raw is not None:
          setattr(self, stream, raw.decode(encoding))

  def check_returncode(self):
    ''' Raise a `CalledProcessError` if the returncode is not zero. '''

    if self.returncode != 0:
      raise CalledProcessError(self)


def run(cmd, *, stdin=None, input=None, stdout=None, stderr=None, shell=False,
        timeout=None, check=False, cwd=None,
        encoding=sys.getdefaultencoding()):
  ''' Run the process with the specified *cmd*. If *cmd* is a list of
  commands and *shell* is True, the list will be automatically converted
  to a properly escaped string for the shell to execute.

  Args:
    cmd: The command, either a string or a list of arguments.
    stdin, stdout, stderr: Passed on to `subprocess.Popen`.
    input: Data that is sent to the process via `Popen.communicate()`.
    shell: Passed on to `subprocess.Popen`.
    timeout: Number of seconds to wait for the process, or None.
    check: If True, `CompletedProcess.check_returncode()` is called on
      the result.
    cwd: Passed on to `subprocess.Popen`.
    encoding: Encoding used to decode the captured output. Pass None
      to keep the raw data.
  Returns:
    A `CompletedProcess` object.
  Raises:
    CalledProcessError: If *check* is True and the process exited with
      a non-zero exit-code.
    TimeoutExpired: If *timeout* was specified and the process did not
      finish before the timeout expires. The child process is killed
      before this exception is raised.
    OSError: For some OS-level error, eg. if the program could not be
      found.
  '''

  if shell and not isinstance(cmd, str):
    cmd = ' '.join(map(quote, cmd))

  try:
    popen = subprocess.Popen(
      cmd, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell, cwd=cwd)
    stdout, stderr = popen.communicate(input, timeout)
  except subprocess.TimeoutExpired as exc:
    # communicate() does not stop the child when the timeout expires.
    # Kill it and communicate once more, which reaps the process and
    # collects the output that was produced so far.
    popen.kill()
    stdout, stderr = popen.communicate()
    process = CompletedProcess(exc.cmd, None, stdout, stderr)
    process.decode(encoding)
    raise TimeoutExpired(process, timeout) from exc
  except OSError as exc:
    if not exc.filename and os.name == 'nt':
      # Windows does not include the name of the file with which
      # the error occurred in the exception message.
      try:
        argv = split(cmd) if isinstance(cmd, str) else cmd
        exc.filename = argv[0]
      except (ValueError, IndexError):
        # The program name can not be determined (empty or unparsable
        # command). Do not hide the original error behind this one.
        pass
    raise

  process = CompletedProcess(cmd, popen.returncode, stdout, stderr)
  process.decode(encoding)
  if check:
    process.check_returncode()
  return process


def pipe(*args, merge=True, **kwargs):
  ''' Same as `run()`, except that stdout and stderr are captured in a
  buffer by default rather than being written to the current standard
  output and error files. With *merge* set to True, stderr is redirected
  into stdout. Explicitly passed `stdout` and `stderr` arguments take
  precedence over these defaults. '''

  if 'stdout' not in kwargs:
    kwargs['stdout'] = PIPE
  if 'stderr' not in kwargs:
    kwargs['stderr'] = STDOUT if merge else PIPE
  return run(*args, **kwargs)


# Names that are exported by a star-import of this module.
__all__ = [
  'PIPE',
  'STDOUT',
  'split',
  'quote',
  'run',
  'pipe',
]