Эффективный поток с камеры с использованием Python
Python для эффективного потока с камеры
Давайте поговорим о том, как использовать веб-камеры с помощью Python. У меня была простая задача – считывать кадры с камеры и запускать нейронную сеть на каждом кадре. С одной конкретной веб-камерой у меня были проблемы с настройкой целевых кадров в секунду (как я теперь понимаю – потому что камера могла работать со скоростью 30 кадров в секунду с форматом mjpeg, но не сырым), поэтому я решил изучить FFmpeg, чтобы узнать, поможет ли он.
В результате я смог заставить работать и OpenCV, и FFmpeg, но обнаружил очень интересную вещь: производительность FFmpeg была превосходной по сравнению с OpenCV в моем основном случае использования. Фактически, при использовании FFmpeg я получил ускорение в 15 раз для чтения кадра и ускорение в 32% для всего конвейера. Я не мог поверить результатам и несколько раз все перепроверил, но они оказались последовательными.
Примечание: производительность была точно такой же, когда я просто считывал кадр за кадром, но FFmpeg был быстрее, когда я выполнял что-то после чтения кадра (что занимает время). Я покажу это далее.
Теперь давайте посмотрим на код. Во-первых, класс для считывания кадров веб-камеры с помощью OpenCV:
- Журнал AI Time представляет электронную книгу Тенденции искусственного интеллекта в образовании 2023 года экспертные предсказания о будущем обучения
- Советы по PyTorch, чтобы повысить вашу производительность
- Прорыв в области искусственного интеллекта от IBM перевод COBOL на Java стал проще
class VideoStreamCV: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.cap = self._open_camera() self.wait_for_cam() def _open_camera(self): cap = cv2.VideoCapture(self.src) cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0]) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1]) fourcc = cv2.VideoWriter_fourcc(*"MJPG") cap.set(cv2.CAP_PROP_FOURCC, fourcc) cap.set(cv2.CAP_PROP_FPS, self.fps) return cap def read(self): ret, frame = self.cap.read() if not ret: return None return frame def release(self): self.cap.release() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return False
Я использую функцию wait_for_cam
, так как камеры часто требуют времени для “разогрева”. Тот же самый “разогрев” используется и с классом FFmpeg:
class VideoStreamFFmpeg: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.pipe = self._open_ffmpeg() self.frame_shape = (self.resolution[1], self.resolution[0], 3) self.frame_size = np.prod(self.frame_shape) self.wait_for_cam() def _open_ffmpeg(self): os_name = platform.system() if os_name == "Darwin": # macOS input_format = "avfoundation" video_device = f"{self.src}:none" elif os_name == "Linux": input_format = "v4l2" video_device = f"{self.src}" elif os_name == "Windows": input_format = "dshow" video_device = f"video={self.src}" else: raise ValueError("Unsupported OS") command = [ 'ffmpeg', '-f', input_format, '-r', str(self.fps), '-video_size', f'{self.resolution[0]}x{self.resolution[1]}', '-i', video_device, '-vcodec', 'mjpeg', # Входной кодек установлен на mjpeg '-an', '-vcodec', 'rawvideo', # Декодирование потока MJPEG в сырое видео '-pix_fmt', 'bgr24', '-vsync', '2', '-f', 'image2pipe', '-' ] if os_name == "Linux": command.insert(2, "-input_format") command.insert(3, "mjpeg") return subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8 ) def read(self): raw_image = self.pipe.stdout.read(self.frame_size) if len(raw_image) != self.frame_size: return None image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape) return image def release(self): self.pipe.terminate() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return False
Для замера времени выполнения функции run
я использовал декоратор:
def timeit(func): def wrapper(*args, **kwargs): t0 = time.perf_counter() result = func(*args, **kwargs) t1 = time.perf_counter() print(f"Время основной функции: {round(t1-t0, 4)}с") return result return wrapper
Вместо нейронной сети я использовал простую функцию (можно было использовать time.sleep
) для выполнения тяжелой синтетической задачи. Это очень важная часть, так как без какой-либо задачи скорость чтения будет одинаковой как для OpenCV, так и для FFmpeg:
def computation_task(): for _ in range(5000000): 9999 * 9999
Теперь функция с циклом, в которой я считываю кадр, замеряю время и выполняю computation_task
:
@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool): timer = [] for _ in range(100): t0 = time.perf_counter() cam.read() timer.append(time.perf_counter() - t0) if run_task: computation_task() cam.release() return round(np.mean(timer), 4)
И, наконец, функция main
, в которой я устанавливаю несколько параметров, инициализирую 2 видеопотока с помощью OpenCV и FFmpeg, и запускаю их с и без задачи computation_task
.
def main(): fsp = 30 resolution = (1920, 1080) for run_task in [False, True]: ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution) cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution) print(f"FFMPEG, задача {run_task}:") print(f"Среднее время чтения кадра: {run(cam=ff_cam, run_task=run_task)}с\n") print(f"CV2, задача {run_task}:") print(f"Среднее время чтения кадра: {run(cam=cv_cam, run_task=run_task)}с\n")
И вот что я получаю:
FFMPEG, задача False:Время основной функции: 3.2334сСреднее время чтения кадра: 0.0323сCV2, задача False:Время основной функции: 3.3934сСреднее время чтения кадра: 0.0332сFFMPEG, задача True:Время основной функции: 4.461сСреднее время чтения кадра: 0.0014сCV2, задача True:Время основной функции: 6.6833сСреднее время чтения кадра: 0.023с
Таким образом, без синтетической задачи время чтения кадра составляет 0.0323
, 0.0332
. Но с синтетической задачей: 0.0014
и 0.023
, то есть FFmpeg значительно быстрее. Красота заключается в том, что я получил реальное ускорение в моем приложении нейронной сети, не только с помощью синтетических тестов, поэтому я решил поделиться результатами.
Вот график, который показывает, сколько времени занимает 1 итерация: чтение кадра, обработка его с помощью модели yolov8s (на CPU) и сохранение кадров с обнаруженными объектами:
Вот полный скрипт с синтетическими тестами:
import platformimport subprocessimport timefrom typing import Tupleimport cv2import numpy as npclass VideoStreamFFmpeg: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.pipe = self._open_ffmpeg() self.frame_shape = (self.resolution[1], self.resolution[0], 3) self.frame_size = np.prod(self.frame_shape) self.wait_for_cam() def _open_ffmpeg(self): os_name = platform.system() if os_name == "Darwin": # macOS input_format = "avfoundation" video_device = f"{self.src}:none" elif os_name == "Linux": input_format = "v4l2" video_device = f"{self.src}" elif os_name == "Windows": input_format = "dshow" video_device = f"video={self.src}" else: raise ValueError("Unsupported OS") command = [ 'ffmpeg', '-f', input_format, '-r', str(self.fps), '-video_size', f'{self.resolution[0]}x{self.resolution[1]}', '-i', video_device, '-vcodec', 'mjpeg', # Input codec set to mjpeg '-an', '-vcodec', 'rawvideo', # Decode the MJPEG stream to raw video '-pix_fmt', 'bgr24', '-vsync', '2', '-f', 'image2pipe', '-' ] if os_name == "Linux": command.insert(2, "-input_format") command.insert(3, "mjpeg") return subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8 ) def read(self): raw_image = self.pipe.stdout.read(self.frame_size) if len(raw_image) != self.frame_size: return None image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape) return image def release(self): self.pipe.terminate() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return Falseclass VideoStreamCV: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.cap = self._open_camera() self.wait_for_cam() def _open_camera(self): cap = cv2.VideoCapture(self.src) cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0]) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1]) fourcc = cv2.VideoWriter_fourcc(*"MJPG") cap.set(cv2.CAP_PROP_FOURCC, fourcc) cap.set(cv2.CAP_PROP_FPS, self.fps) return cap def read(self): ret, frame = self.cap.read() if not ret: return None return frame def release(self): self.cap.release() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return Falsedef timeit(func): def wrapper(*args, **kwargs): t0 = time.perf_counter() result = func(*args, **kwargs) t1 = time.perf_counter() print(f"Время основной функции: {round(t1-t0, 4)}с") return result return wrapperdef computation_task(): for _ in range(5000000): 9999 * 9999@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool): timer = [] for _ in range(100): t0 = time.perf_counter() cam.read() timer.append(time.perf_counter() - t0) if run_task: computation_task() cam.release() return round(np.mean(timer), 4)def main(): fsp = 30 resolution = (1920, 1080) for run_task in [False, True]: ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution) cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution) print(f"FFMPEG, задача {run_task}:") print(f"Среднее время чтения кадра: {run(cam=ff
Примечание: Этот скрипт был протестирован на чипе M1 Pro от Apple. Надеюсь, это было полезно!