# blob: c0f314f500d84517ade80acd02aac5b89e33dc9d [file] [log] [blame]
# Copyright 2011 Google Inc. All Rights Reserved.
#
"""Classes that help running commands in a subshell.
Commands can be run locally, or remotely using an SSH connection. You may log the
output of a command to a terminal or a file, or any other destination.
"""
__author__ = 'kbaclawski@google.com (Krystian Baclawski)'
import fcntl
import logging
import os
import select
import subprocess
import sys
import time

from automation.common import logger
class CommandExecuter(object):
    """Run shell commands locally or on a remote machine over SSH.

    Output produced by the child is delivered chunk by chunk to
    DataReceivedOnOutput() and DataReceivedOnError(). Subclasses override
    these hooks to redirect it; this base class forwards the chunks to
    sys.stdout and sys.stderr.
    """

    # Class-wide default: when true, instances created afterwards skip
    # command execution.
    DRY_RUN = False

    def __init__(self, dry_run=False):
        """Create an executer.

        Args:
          dry_run: when true, RunCommand() returns 0 without running
            anything. The class-wide DRY_RUN flag is OR-ed in here, at
            construction time.
        """
        self._logger = logging.getLogger(self.__class__.__name__)
        self._dry_run = dry_run or self.DRY_RUN

    @classmethod
    def Configure(cls, dry_run):
        """Set the class-wide dry-run default used by new instances."""
        cls.DRY_RUN = dry_run

    def RunCommand(self,
                   cmd,
                   machine=None,
                   username=None,
                   command_terminator=None,
                   command_timeout=None):
        """Run cmd through the shell and return its exit status.

        Args:
          cmd: command to run; it is converted with str().
          machine: if set, the command is run on this host through ssh.
          username: optional remote login name; only used with machine.
          command_terminator: object providing Terminate() and
            IsTerminated(), used to request an early stop. A fresh
            CommandTerminator is created when omitted.
          command_timeout: number of seconds after which the command is
            terminated. None or 0 disables the timeout.

        Returns:
          0 in dry-run mode, 1 if the terminator had already been triggered
          before the command was started, otherwise the child's return code.
        """
        cmd = str(cmd)

        if self._dry_run:
            return 0

        if not command_terminator:
            command_terminator = CommandTerminator()

        if command_terminator.IsTerminated():
            self._logger.warning('Command has been already terminated!')
            return 1

        # Rewrite command for remote execution.
        if machine:
            if username:
                login = '%s@%s' % (username, machine)
            else:
                login = machine

            self._logger.debug("Executing '%s' on %s.", cmd, login)

            # FIXME(asharif): Remove this after crosbug.com/33007 is fixed.
            # NOTE(review): cmd is placed between single quotes without any
            # escaping, so a command that itself contains a single quote
            # breaks the remote invocation. Left unchanged because callers
            # may already compensate for it.
            cmd = "ssh -t -t %s -- '%s'" % (login, cmd)
        else:
            self._logger.debug("Executing: '%s'.", cmd)

        child = self._SpawnProcess(cmd, command_terminator, command_timeout)

        self._logger.debug('{PID: %d} Finished with %d code.', child.pid,
                           child.returncode)

        return child.returncode

    def _Terminate(self, child, command_timeout, wait_timeout=10):
        """Gracefully shutdown the child by sending SIGTERM.

        Args:
          child: the subprocess.Popen object to stop.
          command_timeout: the timeout that triggered the termination, if
            any; only used for the log message.
          wait_timeout: how many seconds to wait for the child to exit.

        Returns:
          The child's return code (which may be 0), or None if the child is
          still running once wait_timeout has elapsed.
        """
        if command_timeout:
            self._logger.warning(
                '{PID: %d} Timeout of %s seconds reached since '
                'process started.', child.pid, command_timeout)

        self._logger.warning('{PID: %d} Terminating child.', child.pid)

        try:
            child.terminate()
        except OSError:
            # The child may already be gone; poll() below settles it.
            pass

        wait_started = time.time()

        # poll() yields None only while the child is alive. Testing its
        # truthiness instead would mistake a clean exit (status 0) for a
        # running process and wait for the whole wait_timeout.
        while child.poll() is None:
            if time.time() - wait_started >= wait_timeout:
                break
            time.sleep(0.1)

        return child.poll()

    def _Kill(self, child):
        """Kill the child with immediate result and reap it."""
        self._logger.warning('{PID: %d} Process still alive.', child.pid)
        self._logger.warning('{PID: %d} Killing child.', child.pid)
        try:
            child.kill()
        except OSError:
            # The child exited between the liveness check and the kill.
            pass
        child.wait()

    @staticmethod
    def _ToText(data):
        """Return data as str, decoding it when os.read() produced bytes.

        Undecodable bytes (including a multi-byte character split across two
        reads) are replaced rather than raising.
        """
        if isinstance(data, str):
            return data
        return data.decode('utf-8', 'replace')

    def _SpawnProcess(self, cmd, command_terminator, command_timeout):
        """Run cmd in a shell, pumping its output to the data hooks.

        The child's stdout and stderr are read without blocking and passed,
        as text, to DataReceivedOnOutput() / DataReceivedOnError(). The
        child is terminated (and killed if SIGTERM is not enough) when
        command_terminator reports termination or command_timeout expires.

        Args:
          cmd: shell command line to execute.
          command_terminator: object providing Terminate()/IsTerminated().
          command_timeout: timeout in seconds; falsy disables it.

        Returns:
          The finished subprocess.Popen object, with returncode set and its
          pipes closed.
        """
        # Create a child process executing provided command.
        child = subprocess.Popen(cmd,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 stdin=subprocess.PIPE,
                                 shell=True)

        # Close stdin so the child won't be able to block on read.
        child.stdin.close()

        started_time = time.time()

        # Watch for data on process stdout, stderr.
        pipes = [child.stdout, child.stderr]

        # Put pipes into non-blocking mode.
        for pipe in pipes:
            fd = pipe.fileno()
            fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK)

        already_terminated = False

        try:
            while pipes:
                # Maybe timeout reached?
                if (command_timeout and
                        time.time() - started_time > command_timeout):
                    command_terminator.Terminate()

                # Check if terminate request was received.
                if (command_terminator.IsTerminated() and
                        not already_terminated):
                    # None means the child survived SIGTERM. Any return
                    # code, 0 included, means it has already exited.
                    if self._Terminate(child, command_timeout) is None:
                        self._Kill(child)
                    # Don't exit the loop immediately. Firstly try to read
                    # everything that was left on stdout and stderr.
                    already_terminated = True

                # Wait for pipes to become ready.
                ready_pipes, _, _ = select.select(pipes, [], [], 0.1)

                # The child is gone and nothing is pending. Pipes that are
                # still open are held by an orphaned descendant, so waiting
                # for their EOF could block indefinitely.
                if already_terminated and not ready_pipes:
                    break

                # Handle file descriptors ready to be read.
                for pipe in ready_pipes:
                    fd = pipe.fileno()

                    data = os.read(fd, 4096)

                    # check for end-of-file
                    if not data:
                        pipes.remove(pipe)
                        continue

                    # read all data that's available
                    while data:
                        text = self._ToText(data)

                        if pipe == child.stdout:
                            self.DataReceivedOnOutput(text)
                        elif pipe == child.stderr:
                            self.DataReceivedOnError(text)

                        try:
                            data = os.read(fd, 4096)
                        except OSError:
                            # terminate loop if EWOULDBLOCK (EAGAIN) is
                            # received
                            data = ''

            if not already_terminated:
                self._logger.debug('Waiting for command to finish.')
                child.wait()
        finally:
            # Release the pipe descriptors instead of leaving them to the
            # garbage collector.
            for pipe in (child.stdout, child.stderr):
                pipe.close()

        return child

    def DataReceivedOnOutput(self, data):
        """Invoked when the child process wrote data to stdout."""
        sys.stdout.write(data)

    def DataReceivedOnError(self, data):
        """Invoked when the child process wrote data to stderr."""
        sys.stderr.write(data)
class LoggingCommandExecuter(CommandExecuter):
    """CommandExecuter that routes the command's output to a logger.

    Every line the child writes becomes one log record on a dedicated
    '<executer logger name>.Output' logger: INFO with prefix 'STDOUT' for
    stdout, WARNING with prefix 'STDERR' for stderr.

    NOTE(review): the logger is looked up by name, so instances whose
    executer logger has the same name share one output logger and its
    handlers.
    """

    def __init__(self, *args, **kwargs):
        """Create the executer; arguments are passed to CommandExecuter."""
        super(LoggingCommandExecuter, self).__init__(*args, **kwargs)

        # Create a logger for command's stdout/stderr streams.
        self._output = logging.getLogger(
            '%s.%s' % (self._logger.name, 'Output'))

    def OpenLog(self, log_path):
        """Start saving the command's output to log_path.

        The file is written through logger.CompressedFileHandler (gzip
        compressed, according to the original author's description).

        Args:
          log_path: destination path handed to the handler.
        """
        formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s',
                                      '%Y-%m-%d %H:%M:%S')

        handler = logger.CompressedFileHandler(log_path, delay=True)
        handler.setFormatter(formatter)

        self._output.addHandler(handler)

        # Set a flag to prevent log records from being propagated up the
        # logger hierarchy tree. We don't want for command output messages
        # to appear in the main log.
        self._output.propagate = False

    def CloseLog(self):
        """Remove handlers and reattach the logger to its parent."""
        # Iterate over a copy: removeHandler() mutates the handler list.
        for handler in list(self._output.handlers):
            self._output.removeHandler(handler)
            handler.flush()
            handler.close()

        self._output.propagate = True

    def _LogLines(self, data, log_method, prefix):
        """Emit one record per line of data through log_method.

        Args:
          data: chunk of child output, as str or bytes.
          log_method: bound logging method, e.g. self._output.info.
          prefix: value for the formatter's %(prefix)s field.
        """
        # A bytes chunk would otherwise be logged as its b'...' repr.
        if not isinstance(data, str):
            data = data.decode('utf-8', 'replace')

        for line in data.splitlines():
            log_method(line, extra={'prefix': prefix})

    def DataReceivedOnOutput(self, data):
        """Invoked when the child process wrote data to stdout."""
        self._LogLines(data, self._output.info, 'STDOUT')

    def DataReceivedOnError(self, data):
        """Invoked when the child process wrote data to stderr."""
        self._LogLines(data, self._output.warning, 'STDERR')
class CommandTerminator(object):
    """Flag object used to ask a running command to stop."""

    def __init__(self):
        # Public flag; False until Terminate() is called.
        self.terminated = False

    def IsTerminated(self):
        """Return the current value of the terminated flag."""
        return self.terminated

    def Terminate(self):
        """Set the terminated flag; this class never resets it."""
        self.terminated = True