Как читать веб-камеру в отдельном процессе на OSX?

Я читаю веб-камеру на OSX, которая отлично работает с этим простым скриптом:

import cv2 camera = cv2.VideoCapture(0) while True: try: (grabbed, frame) = camera.read() # grab the current frame frame = cv2.resize(frame, (640, 480)) # resize the frame cv2.imshow("Frame", frame) # show the frame to our screen cv2.waitKey(1) # Display it at least one ms before going to the next frame except KeyboardInterrupt: # cleanup the camera and close any open windows camera.release() cv2.destroyAllWindows() print "\n\nBye bye\n" break 

Теперь я хочу прочитать видео в отдельном процессе, для которого у меня есть сценарий, который намного длиннее и который правильно считывает видео в отдельном процессе в Linux:

 import numpy as np import time import ctypes import argparse from multiprocessing import Array, Value, Process import cv2 class VideoCapture: """ Class that handles video capture from device or video file """ def __init__(self, device=0, delay=0.): """ :param device: device index or video filename :param delay: delay between frame captures in seconds(floating point is allowed) """ self._cap = cv2.VideoCapture(device) self._delay = delay def _proper_frame(self, delay=None): """ :param delay: delay between frames capture(in seconds) :param finished: synchronized wrapper for int(see multiprocessing.Value) :return: frame """ snapshot = None correct_img = False fail_counter = -1 while not correct_img: # Capture the frame correct_img, snapshot = self._cap.read() fail_counter += 1 # Raise exception if there's no output from the device if fail_counter > 10: raise Exception("Capture: exceeded number of tries to capture the frame.") # Delay before we get a new frame time.sleep(delay) return snapshot def get_size(self): """ :return: size of the captured image """ return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))), int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3) def get_stream_function(self): """ Returns stream_function object function """ def stream_function(image, finished): """ Function keeps capturing frames until finished = 1 :param image: shared numpy array for multiprocessing(see multiprocessing.Array) :param finished: synchronized wrapper for int(see multiprocessing.Value) :return: nothing """ # Incorrect input array if image.shape != self.get_size(): raise Exception("Capture: improper size of the input image") print("Capture: start streaming") # Capture frame until we get finished flag set to True while not finished.value: image[:, :, :] = self._proper_frame(self._delay) # Release the device self.release() return stream_function def release(self): self._cap.release() def main(): # Add program arguments parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('-output', dest="output", default="output.avi", help='name of the output video file') parser.add_argument('-log', dest="log", default="frames.log", help='name of the log file') parser.add_argument('-fps', dest="fps", default=25., help='frames per second value') # Read the arguments if any result = parser.parse_args() fps = float(result.fps) output = result.output log = result.log # Initialize VideoCapture object and auxilary objects cap = VideoCapture() shape = cap.get_size() stream = cap.get_stream_function() # Define shared variables(which are synchronised so race condition is excluded) shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2]) frame = np.ctypeslib.as_array(shared_array_base.get_obj()) frame = frame.reshape(shape[0], shape[1], shape[2]) finished = Value('i', 0) # Start processes which run in parallel video_process = Process(target=stream, args=(frame, finished)) video_process.start() # Launch capture process # Sleep for some time to allow videocapture start working first time.sleep(2) # Termination function def terminate(): print("Main: termination") finished.value = True # Wait for all processes to finish time.sleep(1) # Terminate working processes video_process.terminate() # The capturing works until keyboard interrupt is pressed. while True: try: # Display the resulting frame cv2.imshow('frame', frame) cv2.waitKey(1) # Display it at least one ms before going to the next frame time.sleep(0.1) except KeyboardInterrupt: cv2.destroyAllWindows() terminate() break if __name__ == '__main__': main() 

Это отлично работает в Linux, но на OSX у меня проблемы, потому что он не может выглядеть как .read() на созданном cv2.VideoCapture(device) (сохраненном в var self._cap ).

После некоторого поиска я нашел этот ответ SO , который предлагает использовать Billiard , замену многопроцессорности pythons, которая, предположительно, имеет некоторые очень полезные улучшения. Таким образом, в верхней части файла я просто добавил импорт после моего предыдущего многопроцессорного импорта (эффективно переопределяя multiprocessing.Process ):

 from billiard import Process, forking_enable 

и непосредственно перед созданием переменной video_process я использую forking_enable следующим образом:

 forking_enable(0) # Supposedly this is all I need for billiard to do it's magic video_process = Process(target=stream, args=(frame, finished)) 

Итак, в этой версии ( здесь на pastebin ) я снова запустил файл, который дает мне эту ошибку:

pickle.PicklingError: Невозможно рассолковать: он не найден как основной .stream_function

Поиск этой ошибки привел меня к вопросу SO с длинным списком ответов, в котором мне было предложено использовать сериализацию lib для решения этой проблемы. Однако этот lib должен использоваться с многопроцессорной вилкой Pathos . Поэтому я просто попытался изменить свою линию импорта многопроцессорности

 from multiprocessing import Array, Value, Process 

в

 from pathos.multiprocessing import Array, Value, Process 

Но ни один из Array , Value and Process похоже, не существует в пакете pathos.multiprocessing .

И с этого момента я полностью потерялся. Я ищу вещи, о которых у меня едва хватает знаний, и я даже не знаю, в каком направлении мне нужно искать или отлаживать больше.

Так может ли какая-нибудь более яркая душа, чем я, помогать мне снимать видео в отдельном процессе? Все советы приветствуются!

2 Solutions collect form web for “Как читать веб-камеру в отдельном процессе на OSX?”

Ваша первая проблема заключалась в том, что вы не могли получить доступ к веб-камере в forked процессе. Возникает несколько проблем, когда внешние библиотеки используются с fork поскольку операция fork не очищает все дескрипторы файлов, открытые родительским процессом, что приводит к странному поведению. Библиотека часто более надежна для такого рода проблем в Linux, но не рекомендуется делиться объектом IO, таким как cv2.VideoCapture между двумя процессами.

Когда вы используете billard.forking_enabled и устанавливаете его в False , вы просите библиотеку не использовать fork для forkserver нового процесса, но spawn или forkserver методов, которые являются более чистыми, поскольку они закрывают все дескрипторы файлов, но также медленнее запускать. Это не должно быть проблемой в вашем случае. Если вы используете python3.4+ , вы можете сделать это, например, с помощью multiprocessing.set_start_method('forkserver') .

Когда вы используете один из этих методов, целевая функция и аргументы должны быть сериализованы для передачи дочернему процессу. Сериализация выполняется по умолчанию с использованием pickle , у которого есть несколько потоков, как вы упомянули, не в состоянии сериализовать локально определенные объекты, а также cv2.VideoCapture . Но вы можете упростить свою программу, чтобы сделать все аргументы для вашего Process разборчивыми. Ниже приведено предварительное решение вашей проблемы:

 import numpy as np import time import ctypes from multiprocessing import set_start_method from multiprocessing import Process, Array, Value import cv2 class VideoCapture: """ Class that handles video capture from device or video file """ def __init__(self, device=0, delay=0.): """ :param device: device index or video filename :param delay: delay between frame captures in seconds(float allowed) """ self._delay = delay self._device = device self._cap = cv2.VideoCapture(device) assert self._cap.isOpened() def __getstate__(self): self._cap.release() return (self._delay, self._device) def __setstate__(self, state): self._delay, self._device = state self._cap = cv2.VideoCapture(self._device) assert self._cap.grab(), "The child could not grab the video capture" def _proper_frame(self, delay=None): """ :param delay: delay between frames capture(in seconds) :param finished: synchronized wrapper for int :return: frame """ snapshot = None correct_img = False fail_counter = -1 while not correct_img: # Capture the frame correct_img, snapshot = self._cap.read() fail_counter += 1 # Raise exception if there's no output from the device if fail_counter > 10: raise Exception("Capture: exceeded number of tries to capture " "the frame.") # Delay before we get a new frame time.sleep(delay) return snapshot def get_size(self): """ :return: size of the captured image """ return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))), int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3) def release(self): self._cap.release() def stream(capturer, image, finished): """ Function keeps capturing frames until finished = 1 :param image: shared numpy array for multiprocessing :param finished: synchronized wrapper for int :return: nothing """ shape = capturer.get_size() # Define shared variables frame = np.ctypeslib.as_array(image.get_obj()) frame = frame.reshape(shape[0], shape[1], shape[2]) # Incorrect input array if frame.shape != capturer.get_size(): raise Exception("Capture: improper size of the input image") print("Capture: start streaming") # Capture frame until we get finished flag set to True while not finished.value: frame[:, :, :] = capturer._proper_frame(capturer._delay) # Release the device capturer.release() def main(): # Initialize VideoCapture object and auxilary objects cap = VideoCapture() shape = cap.get_size() # Define shared variables shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2]) frame = np.ctypeslib.as_array(shared_array_base.get_obj()) frame = frame.reshape(shape[0], shape[1], shape[2]) finished = Value('i', 0) # Start processes which run in parallel video_process = Process(target=stream, args=(cap, shared_array_base, finished)) video_process.start() # Launch capture process # Sleep for some time to allow videocapture start working first time.sleep(2) # Termination function def terminate(): print("Main: termination") finished.value = True # Wait for all processes to finish time.sleep(1) # Terminate working processes video_process.join() # The capturing works until keyboard interrupt is pressed. while True: try: # Display the resulting frame cv2.imshow('frame', frame) # Display it at least one ms before going to the next frame time.sleep(0.1) cv2.waitKey(1) except KeyboardInterrupt: cv2.destroyAllWindows() terminate() break if __name__ == '__main__': set_start_method("spawn") main() 

Я не мог проверить его на mac в данный момент, поэтому он может не работать из коробки, но не должно быть multiprocessing связанных ошибок. Некоторые примечания:

  • Я cv2.VideoCapture экземпляр объекта cv2.VideoCapture в новом дочернем cv2.VideoCapture и захватываю камеру, поскольку только один процесс должен читать с камеры.
  • Возможно, проблема в вашей первой программе с fork происходит только из-за совместного использования cv2.VideoCapture и воссоздания ее в stream функции поможет решить вашу проблему.
  • Вы не можете передать оболочку numpy дочернему mp.Array поскольку он не будет использовать буфер mp.Array (это действительно странно, и мне понадобилось некоторое время, чтобы понять). Вам необходимо явно передать Array и создать оболочку.
  • Возможно, проблема в вашей первой программе с fork происходит только из-за совместного использования cv2.VideoCapture и воссоздания ее в stream функции поможет решить вашу проблему.

  • Я предположил, что вы запускаете свой код в python3.4+ поэтому я не использовал billard но использование forking_enabled(False) вместо set_start_method должно быть похоже.

Дайте мне знать, если эта работа!

Основная задача multiprocessing – понимание модели памяти в случае разделенных адресных пространств памяти.

Python делает вещи еще более запутанными, поскольку он абстрагирует многие из этих аспектов, скрывая несколько механизмов под несколькими невинными API.

Когда вы пишете эту логику:

 # Initialize VideoCapture object and auxilary objects cap = VideoCapture() shape = cap.get_size() stream = cap.get_stream_function() ... # Start processes which run in parallel video_process = Process(target=stream, args=(frame, finished)) video_process.start() # Launch capture process 

Вы переходите к Process stream_function который относится к внутренним компонентам класса self.get_size ( self.get_size ), но, что более важно, который недоступен в качестве функции верхнего уровня .

Детский процесс не сможет перестроить требуемый объект как то, что он получает, это просто имя функции. Он пытается найти его в главном модуле, следовательно, сообщение:

pickle.PicklingError: Невозможно рассолковать: он не найден как main.stream_function

Детский процесс пытается разрешить функцию в основном модуле как main.stream_function и поиск не выполняется.

Мое первое предложение состояло в том, чтобы изменить вашу логику, чтобы вы передавали дочернему процессу метод, возвращающий stream_function .

 video_process = Process(target=cap.get_stream_function, args=(...)) 

Однако вы можете столкнуться с проблемами, поскольку вы разделяете состояние между двумя процессами.

То, что я обычно предлагаю людям, когда они подходят к многопроцессорным парадигмам в Python, – это думать о процессах, как если бы они работали в отдельных машинах. В этих случаях было бы очевидно, что ваша архитектура является проблематичной.

Я бы рекомендовал вам отделить обязанности двух процессов, чтобы один процесс (ребенок) имел дело со всем захватом видео, а другой (родительский или третий процесс) имеет дело с обработкой кадров.

Эта парадигма известна как проблема производителя и потребителя, и она очень хорошо подходит для вашей системы. Процесс захвата видео будет производителем, а другой – потребителем. Простая multiprocessing.Pipe multiprocessing.Queue multiprocessing.Pipe или multiprocessing.Queue будет гарантировать, что кадры будут переданы от производителя потребителю, как только они будут готовы.

Добавление примера в псевдокод, поскольку я не знаю API для захвата видео. Речь идет о всей логике захвата видео в процессе производителя, абстрагируя ее от потребителя. Только то, что потребитель должен знать, это то, что он получает объект кадра через трубу.

 def capture_video(writer): """This runs in the producer process.""" # The VideoCapture class wraps the video acquisition logic cap = VideoCapture() while True: frame = cap.get_next_frame() # the method returns the next frame writer.send(frame) # send the new frame to the consumer process def main(): reader, writer = multiprocessing.Pipe(False) # producer process video_process = Process(target=capture_video, args=[writer]) video_process.start() # Launch capture process while True: try: frame = reader.recv() # receive next frame from the producer process_frame(frame) except KeyboardInterrupt: video_process.terminate() break 

Обратите внимание на то, что между процессами нет общего состояния (нет необходимости использовать какой-либо массив). Связь проходит через Pipes и является однонаправленной, что делает логику очень простой. Как я сказал выше, эта логика будет работать и на разных машинах. Вам просто нужно заменить трубку розеткой.

Возможно, вам понадобится более простой подход к завершению процесса производителя. Я бы предложил вам использовать multiprocessing.Event . Просто установите его из родителя в блоке KeyboardInterrupt и проверьте его статус в дочернем while not event.is_set() на каждой итерации ( while not event.is_set() ).

  • gevent не удается установить в виртуальной среде python на OS X Capitan
  • Состояние выхода Xcode gcc 1
  • Когда модуль python (cairo) установлен успешно, но не может импортироваться в python, в чем проблема?
  • OpenCV установил, но не связал сообщение об ошибке на OS-X Mavericks
  • Как избежать повторного входа в мои аккаунты каждый раз, Selenium Python mac
  • Python Selenium chromedriver OSError: Ошибка формата Exec
  • libsvm на PyBrain на OSX для SVM
  • ImportError: Нет модуля с именем pandas. Pandas установил пип
  • Python - лучший язык программирования в мире.