Эффективный поток с камеры с использованием Python

Python для эффективного потока с камеры

Фото от Rahul Chakraborty на Unsplash

Давайте поговорим о том, как использовать веб-камеры с помощью Python. У меня была простая задача – считывать кадры с камеры и запускать нейронную сеть на каждом кадре. С одной конкретной веб-камерой у меня были проблемы с настройкой целевых кадров в секунду (как я теперь понимаю – потому что камера могла работать со скоростью 30 кадров в секунду с форматом mjpeg, но не сырым), поэтому я решил изучить FFmpeg, чтобы узнать, поможет ли он.

В результате я смог заставить работать и OpenCV, и FFmpeg, но обнаружил очень интересную вещь: производительность FFmpeg была превосходной по сравнению с OpenCV в моем основном случае использования. Фактически, при использовании FFmpeg я получил ускорение в 15 раз для чтения кадра и ускорение в 32% для всего конвейера. Я не мог поверить результатам и несколько раз все перепроверил, но они оказались последовательными.

Примечание: производительность была точно такой же, когда я просто считывал кадр за кадром, но FFmpeg был быстрее, когда я выполнял что-то после чтения кадра (что занимает время). Я покажу это далее.

Теперь давайте посмотрим на код. Во-первых, класс для считывания кадров веб-камеры с помощью OpenCV:

class VideoStreamCV:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        cap.set(cv2.CAP_PROP_FOURCC, fourcc)        cap.set(cv2.CAP_PROP_FPS, self.fps)        return cap    def read(self):        ret, frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

Я использую функцию wait_for_cam, так как камеры часто требуют времени для “разогрева”. Тот же самый “разогрев” используется и с классом FFmpeg:

class VideoStreamFFmpeg:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape = (self.resolution[1], self.resolution[0], 3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name == "Darwin":  # macOS            input_format = "avfoundation"            video_device = f"{self.src}:none"        elif os_name == "Linux":            input_format = "v4l2"            video_device = f"{self.src}"        elif os_name == "Windows":            input_format = "dshow"            video_device = f"video={self.src}"        else:            raise ValueError("Unsupported OS")        command = [            'ffmpeg',            '-f', input_format,            '-r', str(self.fps),            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',            '-i', video_device,            '-vcodec', 'mjpeg',  # Входной кодек установлен на mjpeg            '-an', '-vcodec', 'rawvideo',  # Декодирование потока MJPEG в сырое видео            '-pix_fmt', 'bgr24',            '-vsync', '2',            '-f', 'image2pipe', '-'        ]        if os_name == "Linux":            command.insert(2, "-input_format")            command.insert(3, "mjpeg")        return subprocess.Popen(            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image) != self.frame_size:            return None        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

Для замера времени выполнения функции run я использовал декоратор:

def timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"Время основной функции: {round(t1-t0, 4)}с")        return result    return wrapper

Вместо нейронной сети я использовал простую функцию (можно было использовать time.sleep) для выполнения тяжелой синтетической задачи. Это очень важная часть, так как без какой-либо задачи скорость чтения будет одинаковой как для OpenCV, так и для FFmpeg:

def computation_task():    for _ in range(5000000):        9999 * 9999

Теперь функция с циклом, в которой я считываю кадр, замеряю время и выполняю computation_task:

@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)

И, наконец, функция main, в которой я устанавливаю несколько параметров, инициализирую 2 видеопотока с помощью OpenCV и FFmpeg, и запускаю их с и без задачи computation_task.

def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FFMPEG, задача {run_task}:")        print(f"Среднее время чтения кадра: {run(cam=ff_cam, run_task=run_task)}с\n")        print(f"CV2, задача {run_task}:")        print(f"Среднее время чтения кадра: {run(cam=cv_cam, run_task=run_task)}с\n")

И вот что я получаю:

FFMPEG, задача False:Время основной функции: 3.2334сСреднее время чтения кадра: 0.0323сCV2, задача False:Время основной функции: 3.3934сСреднее время чтения кадра: 0.0332сFFMPEG, задача True:Время основной функции: 4.461сСреднее время чтения кадра: 0.0014сCV2, задача True:Время основной функции: 6.6833сСреднее время чтения кадра: 0.023с

Таким образом, без синтетической задачи время чтения кадра составляет 0.0323, 0.0332. Но с синтетической задачей: 0.0014 и 0.023, то есть FFmpeg значительно быстрее. Красота заключается в том, что я получил реальное ускорение в моем приложении нейронной сети, не только с помощью синтетических тестов, поэтому я решил поделиться результатами.

Вот график, который показывает, сколько времени занимает 1 итерация: чтение кадра, обработка его с помощью модели yolov8s (на CPU) и сохранение кадров с обнаруженными объектами:

Вот полный скрипт с синтетическими тестами:

import platformimport subprocessimport timefrom typing import Tupleimport cv2import numpy as npclass VideoStreamFFmpeg:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape = (self.resolution[1], self.resolution[0], 3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name == "Darwin":  # macOS            input_format = "avfoundation"            video_device = f"{self.src}:none"        elif os_name == "Linux":            input_format = "v4l2"            video_device = f"{self.src}"        elif os_name == "Windows":            input_format = "dshow"            video_device = f"video={self.src}"        else:            raise ValueError("Unsupported OS")        command = [            'ffmpeg',            '-f', input_format,            '-r', str(self.fps),            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',            '-i', video_device,            '-vcodec', 'mjpeg',  # Input codec set to mjpeg            '-an', '-vcodec', 'rawvideo',  # Decode the MJPEG stream to raw video            '-pix_fmt', 'bgr24',            '-vsync', '2',            '-f', 'image2pipe', '-'        ]        if os_name == "Linux":            command.insert(2, "-input_format")            command.insert(3, "mjpeg")        return subprocess.Popen(            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image) != self.frame_size:            return None        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falseclass VideoStreamCV:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        cap.set(cv2.CAP_PROP_FOURCC, fourcc)        cap.set(cv2.CAP_PROP_FPS, self.fps)        return cap    def read(self):        ret, frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falsedef timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"Время основной функции: {round(t1-t0, 4)}с")        return result    return wrapperdef computation_task():    for _ in range(5000000):        9999 * 9999@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FFMPEG, задача {run_task}:")        print(f"Среднее время чтения кадра: {run(cam=ff

Примечание: Этот скрипт был протестирован на чипе M1 Pro от Apple. Надеюсь, это было полезно!