Как вставить несколько экземпляров подпроцессов в Python 2.7?

У меня есть три команды, которые в противном случае легко соединялись бы в командной строке следующим образом:

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput 

Другими словами, firstCommand обрабатывает foo от стандартного ввода и secondCommand результат на secondCommand , который, в свою очередь, обрабатывает этот вход и выводит его вывод на thirdCommand , который обрабатывает и перенаправляет свой вывод в файл finalOutput .

Я пытаюсь воспроизвести это в скрипте Python, используя threading. Я бы хотел использовать Python, чтобы манипулировать firstCommand из firstCommand прежде чем передавать его во secondCommand , и снова между secondCommand и thirdCommand .

Вот фрагмент кода, который, похоже, не работает:

 first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout) first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin)) second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin)) third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin)) first_thread.start() second_thread.start() third_thread.start() first_thread.join() second_thread.join() third_thread.join() first_process.communicate() second_process.communicate() third_process.communicate() # read 1K chunks from standard input def consumeOutputFromStdin(from_stream, to_stream): chunk = from_stream.read(1024) while chunk: to_stream.write(chunk) to_stream.flush() chunk = from_stream.read(1024) def consumeOutputFromFirstCommand(from_stream, to_stream): while True: unprocessed_line = from_stream.readline() if not unprocessed_line: break processed_line = some_python_function_that_processes_line(unprocessed_line) to_stream.write(processed_line) to_stream.flush() def consumeOutputFromSecondCommand(from_stream, to_stream): while True: unprocessed_line = from_stream.readline() if not unprocessed_line: break processed_line = a_different_python_function_that_processes_line(unprocessed_line) to_stream.write(processed_line) to_stream.flush() 

Когда я запускаю это, скрипт зависает:

 $ echo foo | ./myConversionScript.py ** hangs here... ** 

Если я нажму Ctrl-C чтобы завершить скрипт, код застрял в строке third_thread.join() :

  Cc Cc Traceback (most recent call last): File "./myConversionScript.py", line 786, in <module> sys.exit(main(*sys.argv)) File "./myConversionScript.py", line 556, in main third_thread.join() File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join self.__block.wait() File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait waiter.acquire() KeyboardInterrupt 

Если я не использую third_process и third_thread , вместо этого third_thread только данные с выхода первого потока на вход второго потока, нет зависания.

Что-то о третьем потоке, кажется, заставляет вещи сломаться, но я не знаю, почему.

Я думал, что точка communicate() заключается в том, что она будет обрабатывать операции ввода-вывода для трех процессов, поэтому я не уверен, почему происходит зависание ввода-вывода.

Как получить три или более команды / процессы, работающие вместе, где один поток потребляет результат другого потока / процесса?

ОБНОВИТЬ

Хорошо, я внес некоторые изменения, которые, похоже, помогают, основываясь на некоторых комментариях здесь и на других сайтах. Процессы выполняются для wait() для завершения, и в методах потоков я close() каналы, когда поток обрабатывает все данные, которые он может. Меня беспокоит то, что использование больших объемов памяти для больших наборов данных будет очень высоким, но, по крайней мере, все работает:

 first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout) first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin)) second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin)) third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin)) first_thread.start() second_thread.start() third_thread.start() first_thread.join() second_thread.join() third_thread.join() first_process.wait() second_process.wait() third_process.wait() # read 1K chunks from standard input def consumeOutputFromStdin(from_stream, to_stream): chunk = from_stream.read(1024) while chunk: to_stream.write(chunk) to_stream.flush() chunk = from_stream.read(1024) def consumeOutputFromFirstCommand(from_stream, to_stream): while True: unprocessed_line = from_stream.readline() if not unprocessed_line: from_stream.close() to_stream.close() break processed_line = some_python_function_that_processes_line(unprocessed_line) to_stream.write(processed_line) to_stream.flush() def consumeOutputFromSecondCommand(from_stream, to_stream): while True: unprocessed_line = from_stream.readline() if not unprocessed_line: from_stream.close() to_stream.close() break processed_line = a_different_python_function_that_processes_line(unprocessed_line) to_stream.write(processed_line) to_stream.flush() 

2 Solutions collect form web for “Как вставить несколько экземпляров подпроцессов в Python 2.7?”

Подражать:

 echo foo | firstCommand - | somePythonRoutine - | secondCommand - | anotherPythonRoutine - | thirdCommand - > finalOutput 

ваш текущий подход с потоками работает:

 from subprocess import Popen, PIPE first = Popen(["firstCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1) second = Popen(["secondCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1) bind(first.stdout, second.stdin, somePythonRoutine) with open("finalOutput", "wb") as file: third = Popen(["thirdCommand", "-"], stdin=PIPE, stdout=file, bufsize=1) bind(second.stdout, third.stdin, anotherPythonRoutine) # provide input for the pipeline first.stdin.write(b"foo") first.stdin.close() # wait for it to complete pipestatus = [p.wait() for p in [first, second, third]] 

где каждый bind() запускает новый поток:

 from threading import Thread def bind(input_pipe, output_pipe, line_filter): def f(): try: for line in iter(input_pipe.readline, b''): line = line_filter(line) if line: output_pipe.write(line) # no flush unless newline present finally: try: output_pipe.close() finally: input_pipe.close() t = Thread(target=f) t.daemon = True # die if the program exits t.start() 

и somePythonRoutine , anotherPythonRoutine принимает одну строку и возвращает ее (возможно, изменен).

Точка communicate() заключается в том, что она возвращает результат процесса. Это сталкивается с настройкой вашей трубы.

Вы должны называть его только один раз на третьем процессе; все остальные подключены через каналы и знают, как общаться друг с другом – никаких других / ручных вмешательств не требуется.

  • Реализация таймера (обратного отсчета), который запускает pararell с логикой игры
  • Регрессия с переменной Date с использованием Scikit-learn
  • Отображение уведомлений в Gnome Shell
  • pip install дает ошибку: не удается найти vcvarsall.bat
  • Терминал Pycharm не изменяет версию Python, соответствующую версии Python, в Project Interpreter
  • Получение TypeError во время обучения начальной модели с нуля на пользовательском наборе данных
  • const void * указатель в ctypes
  • Преобразуйте массив numpy в строку CSV и строку CSV обратно в массив numpy
  • Python - лучший язык программирования в мире.