подпроцесс python с тайм-аутом и большим выходом (> 64K)

Я хочу выполнить процесс, ограничить время выполнения некоторым тайм-аутом в секундах и захватить вывод, полученный процессом. И я хочу сделать это на windows, linux и freebsd.

Я попытался реализовать это тремя способами:

  1. cmd – без тайм-аута и subprocess.PIPE для вывода.

    ПОВЕДЕНИЕ: работает, как ожидалось, но не поддерживает тайм-аут, мне нужен тайм-аут …

  2. cmd_to – с тайм-аутом и подпроцессом.PIPE для вывода.

    BEHAVIOR: Блокирует выполнение подпроцесса при выходе> = 2 ^ 16 байт.

  3. cmd_totf – с тайм-аутом и tempfile.NamedTemporaryfile для вывода.

    BEHAVIOR: работает как ожидалось, но использует временные файлы на диске.

Они доступны ниже для более тщательного осмотра.

Как видно из приведенного ниже результата, тайм-аут блокирует выполнение подпроцесса при использовании подпроцесса. PIPE и вывод из подпроцесса> = 2 ^ 16 байт.

В документации к подпроцессу указано, что это ожидается при вызове process.wait () и использовании subprocessing.PIPE, однако при использовании process.poll () не выдаются никаких предупреждений, поэтому что здесь происходит не так?

У меня есть решение в cmd_totf, которое использует модуль tempfile, но компромисс заключается в том, что он записывает вывод на диск, что я бы ДЕЙСТВИТЕЛЬНО хотел избежать.

Поэтому мои вопросы:

  • Что я делаю неправильно в cmd_to?
  • Есть ли способ сделать то, что я хочу, и не использовать tempfiles / сохранение вывода в памяти.

Скрипт для создания кучи вывода ('exp_gen.py'):

#!/usr/bin/env python import sys output = "b"*int(sys.argv[1]) print output 

Три различные реализации (cmd, cmd_to, cmd_totf) оберток вокруг подпроцесса. Открытие:

 #!/usr/bin/env python import subprocess, time, tempfile bufsize = -1 def cmd(cmdline, timeout=60): """ Execute cmdline. Uses subprocessing and subprocess.PIPE. """ p = subprocess.Popen( cmdline, bufsize = bufsize, shell = False, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE ) out, err = p.communicate() returncode = p.returncode return (returncode, err, out) def cmd_to(cmdline, timeout=60): """ Execute cmdline, limit execution time to 'timeout' seconds. Uses subprocessing and subprocess.PIPE. """ p = subprocess.Popen( cmdline, bufsize = bufsize, shell = False, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE ) t_begin = time.time() # Monitor execution time seconds_passed = 0 while p.poll() is None and seconds_passed < timeout: seconds_passed = time.time() - t_begin time.sleep(0.1) #if seconds_passed > timeout: # # try: # p.stdout.close() # If they are not closed the fds will hang around until # p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception # p.terminate() # Important to close the fds prior to terminating the process! # # NOTE: Are there any other "non-freed" resources? # except: # pass # # raise TimeoutInterrupt out, err = p.communicate() returncode = p.returncode return (returncode, err, out) def cmd_totf(cmdline, timeout=60): """ Execute cmdline, limit execution time to 'timeout' seconds. Uses subprocessing and tempfile instead of subprocessing.PIPE. """ output = tempfile.NamedTemporaryFile(delete=False) error = tempfile.NamedTemporaryFile(delete=False) p = subprocess.Popen( cmdline, bufsize = 0, shell = False, stdin = None, stdout = output, stderr = error ) t_begin = time.time() # Monitor execution time seconds_passed = 0 while p.poll() is None and seconds_passed < timeout: seconds_passed = time.time() - t_begin time.sleep(0.1) #if seconds_passed > timeout: # # try: # p.stdout.close() # If they are not closed the fds will hang around until # p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception # p.terminate() # Important to close the fds prior to terminating the process! # # NOTE: Are there any other "non-freed" resources? # except: # pass # # raise TimeoutInterrupt p.wait() returncode = p.returncode fd = open(output.name) out = fd.read() fd.close() fd = open(error.name) err = fd.read() fd.close() error.close() output.close() return (returncode, err, out) if __name__ == "__main__": implementations = [cmd, cmd_to, cmd_totf] bytes = ['65535', '65536', str(1024*1024)] timeouts = [5] for timeout in timeouts: for size in bytes: for i in implementations: t_begin = time.time() seconds_passed = 0 rc, err, output = i(['exp_gen.py', size], timeout) seconds_passed = time.time() - t_begin filler = ' '*(8-len(i.func_name)) print "[%s%s: timeout=%d, iosize=%s, seconds=%f]" % (repr(i.func_name), filler, timeout, size, seconds_passed) 

Результат выполнения:

 ['cmd' : timeout=5, iosize=65535, seconds=0.016447] ['cmd_to' : timeout=5, iosize=65535, seconds=0.103022] ['cmd_totf': timeout=5, iosize=65535, seconds=0.107176] ['cmd' : timeout=5, iosize=65536, seconds=0.028105] ['cmd_to' : timeout=5, iosize=65536, seconds=5.116658] ['cmd_totf': timeout=5, iosize=65536, seconds=0.104905] ['cmd' : timeout=5, iosize=1048576, seconds=0.025964] ['cmd_to' : timeout=5, iosize=1048576, seconds=5.128062] ['cmd_totf': timeout=5, iosize=1048576, seconds=0.103183] 

2 Solutions collect form web for “подпроцесс python с тайм-аутом и большим выходом (> 64K)”

В отличие от всех предупреждений в документации к подпроцессу, тогда прямое чтение из process.stdout и process.stderr обеспечило лучшее решение.

К лучшему я имею в виду, что я могу читать результат процесса, который превышает 2 ^ 16 байтов, без необходимости временного хранения вывода на диске.

Код выглядит следующим образом:

 import fcntl import os import subprocess import time def nonBlockRead(output): fd = output.fileno() fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) try: return output.read() except: return '' def cmd(cmdline, timeout=60): """ Execute cmdline, limit execution time to 'timeout' seconds. Uses the subprocess module and subprocess.PIPE. Raises TimeoutInterrupt """ p = subprocess.Popen( cmdline, bufsize = bufsize, # default value of 0 (unbuffered) is best shell = False, # not really needed; it's disabled by default stdout = subprocess.PIPE, stderr = subprocess.PIPE ) t_begin = time.time() # Monitor execution time seconds_passed = 0 stdout = '' stderr = '' while p.poll() is None and seconds_passed < timeout: # Monitor process time.sleep(0.1) # Wait a little seconds_passed = time.time() - t_begin # p.std* blocks on read(), which messes up the timeout timer. # To fix this, we use a nonblocking read() # Note: Not sure if this is Windows compatible stdout += nonBlockRead(p.stdout) stderr += nonBlockRead(p.stderr) if seconds_passed >= timeout: try: p.stdout.close() # If they are not closed the fds will hang around until p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception p.terminate() # Important to close the fds prior to terminating the process! # NOTE: Are there any other "non-freed" resources? except: pass raise TimeoutInterrupt returncode = p.returncode return (returncode, stdout, stderr) 

Отказ от ответственности: этот ответ не проверяется на windows, а также freebsd. Но используемые модули должны работать над этими системами. Я считаю, что это должен быть рабочий ответ на ваш вопрос – он работает для меня.

Вот код, который я только что взломал, чтобы решить проблему на Linux. Это комбинация нескольких потоков Stackoverflow и моих собственных исследований в документах Python 3.

Основные характеристики этого кода:

  • Использует процессы не потоки для блокировки ввода-вывода, потому что они могут более надежно быть p.terminated ()
  • Реализует перезагружаемый контрольный тайм-аут, который перезапускает подсчет, когда происходит какой-то вывод
  • Внедряет долгосрочный контрольный тайм-аут для ограничения общей продолжительности выполнения
  • Может кормить в stdin (хотя мне нужно только кормить в однократных коротких строках)
  • Может захватывать stdout / stderr обычным способом Popen (только stdout кодируется и stderr перенаправляется на stdout, но может быть легко разделен)
  • Это почти в реальном времени, потому что он проверяет только каждые 0,2 секунды для вывода. Но вы можете уменьшить это или легко удалить интервал ожидания
  • Многие отладочные распечатки по-прежнему позволяют видеть, что происходит.

Единственная зависимость кода – это перечисление, как это реализовано здесь , но код можно легко изменить без работы. Он используется только для выделения двух тайм-аутов – используйте отдельные исключения, если хотите.

Вот код – как обычно – обратная связь очень приветствуется: (Edit 29-Jun-2012 – код сейчас работает)

 # Python module runcmd # Implements a class to launch shell commands which # are killed after a timeout. Timeouts can be reset # after each line of output # # Use inside other script with: # # import runcmd # (return_code, out) = runcmd.RunCmd(['ls', '-l', '/etc'], # timeout_runtime, # timeout_no_output, # stdin_string).go() # import multiprocessing import queue import subprocess import time import enum def timestamp(): return time.strftime('%Y%m%d-%H%M%S') class ErrorRunCmd(Exception): pass class ErrorRunCmdTimeOut(ErrorRunCmd): pass class Enqueue_output(multiprocessing.Process): def __init__(self, out, queue): multiprocessing.Process.__init__(self) self.out = out self.queue = queue self.daemon = True def run(self): try: for line in iter(self.out.readline, b''): #print('worker read:', line) self.queue.put(line) except ValueError: pass # Readline of closed file self.out.close() class Enqueue_input(multiprocessing.Process): def __init__(self, inp, iterable): multiprocessing.Process.__init__(self) self.inp = inp self.iterable = iterable self.daemon = True def run(self): #print("writing stdin") for line in self.iterable: self.inp.write(bytes(line,'utf-8')) self.inp.close() #print("writing stdin DONE") class RunCmd(): """RunCmd - class to launch shell commands Captures and returns stdout. Kills child after a given amount (timeout_runtime) wallclock seconds. Can also kill after timeout_retriggerable wallclock seconds. This second timer is reset whenever the child does some output (return_code, out) = RunCmd(['ls', '-l', '/etc'], timeout_runtime, timeout_no_output, stdin_string).go() """ Timeout = enum.Enum('No','Retriggerable','Runtime') def __init__(self, cmd, timeout_runtime, timeout_retriggerable, stdin=None): self.dbg = False self.cmd = cmd self.timeout_retriggerable = timeout_retriggerable self.timeout_runtime = timeout_runtime self.timeout_hit = self.Timeout.No self.stdout = '--Cmd did not yield any output--' self.stdin = stdin def read_queue(self, q): time_last_output = None try: bstr = q.get(False) # non-blocking if self.dbg: print('{} chars read'.format(len(bstr))) time_last_output = time.time() self.stdout += bstr except queue.Empty: #print('queue empty') pass return time_last_output def go(self): if self.stdin: pstdin = subprocess.PIPE else: pstdin = None p = subprocess.Popen(self.cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=pstdin) pin = None if (pstdin): pin = Enqueue_input(p.stdin, [self.stdin + '\n']) pin.start() q = multiprocessing.Queue() pout = Enqueue_output(p.stdout, q) pout.start() try: if self.dbg: print('Beginning subprocess with timeout {}/{} s on {}'.format(self.timeout_retriggerable, self.timeout_runtime, time.asctime())) time_begin = time.time() time_last_output = time_begin seconds_passed = 0 self.stdout = b'' once = True # ensure loop's executed at least once # some child cmds may exit very fast, but still produce output while once or p.poll() is None or not q.empty(): once = False if self.dbg: print('a) {} of {}/{} secs passed and overall {} chars read'.format(seconds_passed, self.timeout_retriggerable, self.timeout_runtime, len(self.stdout))) tlo = self.read_queue(q) if tlo: time_last_output = tlo now = time.time() if now - time_last_output >= self.timeout_retriggerable: self.timeout_hit = self.Timeout.Retriggerable raise ErrorRunCmdTimeOut(self) if now - time_begin >= self.timeout_runtime: self.timeout_hit = self.Timeout.Runtime raise ErrorRunCmdTimeOut(self) if q.empty(): time.sleep(0.1) # Final try to get "last-millisecond" output self.read_queue(q) finally: self._close(p, [pout, pin]) return (self.returncode, self.stdout) def _close(self, p, procs): if self.dbg: if self.timeout_hit != self.Timeout.No: print('{} A TIMEOUT occured: {}'.format(timestamp(), self.timeout_hit)) else: print('{} No timeout occured'.format(timestamp())) for process in [proc for proc in procs if proc]: try: process.terminate() except: print('{} Process termination raised trouble'.format(timestamp())) raise try: p.stdin.close() except: pass if self.dbg: print('{} _closed stdin'.format(timestamp())) try: p.stdout.close() # If they are not closed the fds will hang around until except: pass if self.dbg: print('{} _closed stdout'.format(timestamp())) #p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception try: p.terminate() # Important to close the fds prior to terminating the process! # NOTE: Are there any other "non-freed" resources? except: pass if self.dbg: print('{} _closed Popen'.format(timestamp())) try: self.stdout = self.stdout.decode('utf-8') except: pass self.returncode = p.returncode if self.dbg: print('{} _closed all'.format(timestamp())) 

Использовать с:

 import runcmd cmd = ['ls', '-l', '/etc'] worker = runcmd.RunCmd(cmd, 40, # limit runtime [wallclock seconds] 2, # limit runtime after last output [wallclk secs] '' # stdin input string ) (return_code, out) = worker.go() if worker.timeout_hit != worker.Timeout.No: print('A TIMEOUT occured: {}'.format(worker.timeout_hit)) else: print('No timeout occured') print("Running '{:s}' returned {:d} and {:d} chars of output".format(cmd, return_code, len(out))) print('Output:') print(out) 

command – первый аргумент – должен быть списком команды и ее аргументов. Он используется для вызова Popen(shell=False) а его таймауты – в секундах. В настоящее время нет кода для отключения тайм-аутов. Установите time_runtime в time_runtime чтобы эффективно отключить time_runtime . stdin_string может быть любой строкой, которая должна быть отправлена ​​на стандартный ввод команды. Установите значение « None если ваша команда не требует ввода. Если строка предоставлена, добавляется окончательный '\ n'.

Python - лучший язык программирования в мире.