import errno
import fcntl
import os
import random
import select
import signal
import subprocess
import time
[docs]class CmdResult(object):
"""A class representing the result of a system call.
"""
def __init__(self, cmdline):
self.cmdline = cmdline
self.stdout = ""
self.stderr = ""
self.exit_code = None
self.exit_status = "undefined"
self.call_time = 0.0
def __str__(self):
s = ''
s += "command: %s\n" % self.cmdline
s += "stdout:\n%s\n" % self.stdout
s += "stderr:\n%s\n" % self.stderr
return s
[docs] def pprint(self):
"""
Print the command result in a pretty and colorful way.
"""
_, tty_w = subprocess.check_output(['stty', 'size']).split()
fmt_str = ('\033[94m%%-%ds\033[93m%%-3s\033[0m%4%.3f' %
(int(tty_w) - 16))
print(fmt_str % (
self.cmdline, self.exit_status, self.call_time))
for line in self.stdout.splitlines():
print(line)
for line in self.stderr.splitlines():
print('\033[91m%s\033[0m' % line)
[docs]def weighted_choice(choices):
total = sum(choice.weight for choice in choices)
rnd_num = random.uniform(0, total)
upto = 0
for choice in choices:
if upto + choice.weight > rnd_num:
return choice
upto += choice.weight
assert False, "Shouldn't get here"
[docs]def escape(org_str):
escapes = """~()[]{}<>|&$#?'"`*; \n\t\r\\"""
new_str = ""
for char in org_str:
if char in escapes:
new_str += '\\' + char
else:
new_str += char
return new_str
[docs]def pids():
return [pid for pid in os.listdir('/proc') if pid.isdigit()]
[docs]def kernel_pids():
results = []
for pid in pids():
try:
with open(os.path.join('/proc', pid, 'cmdline'), 'rb') as fp:
if not fp.read():
results.append(pid)
except IOError:
continue
return results
[docs]def run(cmdline, timeout=10):
"""Run the command line and return the result with a CmdResult object.
:param cmdline: The command line to run.
:type cmdline: str.
:param timeout: After which the calling processing is killed.
:type timeout: float.
:returns: CmdResult -- the command result.
:raises:
"""
start = time.time()
process = subprocess.Popen(
cmdline,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
preexec_fn=os.setsid,
)
fcntl.fcntl(
process.stdout,
fcntl.F_SETFL,
fcntl.fcntl(process.stdout, fcntl.F_GETFL) | os.O_NONBLOCK,
)
fcntl.fcntl(
process.stderr,
fcntl.F_SETFL,
fcntl.fcntl(process.stderr, fcntl.F_GETFL) | os.O_NONBLOCK,
)
result = CmdResult(cmdline)
try:
while True:
exit_code = process.poll()
result.call_time = (time.time() - start)
select.select([process.stdout, process.stderr], [], [], 0.1)
try:
out_lines = process.stdout.read()
if out_lines:
result.stdout += out_lines
err_lines = process.stderr.read()
if err_lines:
result.stderr += err_lines
except IOError as detail:
if detail.errno != errno.EAGAIN:
raise detail
if exit_code is not None:
result.exit_code = exit_code
if exit_code == 0:
result.exit_status = "success"
else:
result.exit_status = "failure"
return result
if result.call_time > timeout:
return result
finally:
if result.exit_code is None:
pgid = os.getpgid(process.pid)
os.killpg(pgid, signal.SIGKILL)
result.exit_status = "timeout"