Построение потокового конвейера данных для трансляции Формулы-1 с помощью Kafka и Risingwave

Построение конвейера для трансляции Формулы-1 с помощью Kafka и Risingwave

 

Реальные данные поступили и остаются с нами. Нет сомнений в том, что каждый день количество потоковых данных экспоненциально увеличивается, и нам необходимо найти лучший способ их извлечения, обработки и визуализации. Например, каждый автомобиль Формулы-1 производит примерно 1,5 терабайта данных в течение выходных (источник).

В этой статье мы не будем передавать данные автомобиля, но мы будем передавать, обрабатывать и визуализировать данные гонки, имитируя прямую трансляцию гонки Формулы-1. Прежде чем мы начнем, важно отметить, что эта статья не будет сосредоточена на том, что такое каждая технология, а на том, как их реализовать в потоковом процессе данных, поэтому ожидается некоторое знание Python, Kafka, SQL и визуализации данных.

 

Предварительные требования

 

  • Исходные данные F1: Данные Формулы-1, используемые в этом потоковом процессе данных, были загружены с Kaggle и могут быть найдены в разделе Чемпионат мира Формулы-1 (1950 – 2023).
  • Python: В этом процессе данных использовался Python 3.9, но должна подходить любая версия выше 3.0. Дополнительные сведения о том, как скачать и установить Python, можно найти на официальном веб-сайте Python.
  • Kafka: Kafka является одной из основных используемых технологий в этом потоковом процессе данных, поэтому важно установить ее перед началом работы. Этот потоковый процесс данных был разработан на MacOS, поэтому для установки Kafka использовался brew. Дополнительные сведения можно найти на официальном веб-сайте brew. Нам также понадобится библиотека Python для работы с Kafka. В этом процессе данных используется kafka-python. Информацию о установке можно найти на их официальном веб-сайте.
  • RisingWave (потоковая база данных): На рынке существует несколько потоковых баз данных, но одной из лучших и используемых в этой статье является RisingWave. Начать работу с RisingWave очень просто, и на это требуется всего несколько минут. Полное руководство по началу работы можно найти на их официальном веб-сайте.
  • Панель инструментов Grafana: В этом потоковом процессе данных использовалась панель инструментов Grafana для визуализации данных Формулы-1 в реальном времени. Сведения о начале работы можно найти на этом веб-сайте.

 

Трансляция исходных данных

 

Теперь, когда у нас есть все предварительные требования, пришло время начать создавать потоковой процесс данных для данных Формулы-1. Исходные данные хранятся в файле JSON, поэтому нам нужно извлечь их и отправить через тему Kafka. Для этого мы будем использовать следующий сценарий на Python.

Код автора

 

Настройка Kafka

 

Скрипт на Python для потоковой передачи данных готов начать передачу данных, но тема Kafka F1Topic еще не создана, поэтому давайте создадим ее. Сначала нам нужно инициализировать Kafka. Для этого мы должны запустить Zookeper, затем запустить Kafka и, наконец, создать тему с помощью следующих команд. Помните, что Zookeper и Kafka должны работать в отдельном терминале.

Код автора

 

 

Настройка потоковой базы данных RisingWave

 

После установки RisingWave его очень легко запустить. Сначала нам необходимо инициализировать базу данных, а затем мы должны подключиться к ней через интерактивный терминал Postgres psql. Чтобы инициализировать потоковую базу данных RisingWave, мы должны выполнить следующую команду.

Код автора

Вышеуказанная команда запускает RisingWave в режиме песочницы, где данные временно хранятся в памяти. Сервис автоматически завершается после 30 минут бездействия, и все хранящиеся данные будут удалены при завершении. Этот метод рекомендуется только для тестов, для рабочих сред использовать RisingWave Cloud.

После запуска RisingWave на новом терминале можно подключиться к нему через интерактивный терминал Postgres с помощью следующей команды.

Код от автора

   

После установления соединения, настало время начать извлекать данные из темы Kafka. Чтобы получить потоковые данные в RisingWave, нам нужно создать источник. Этот источник установит связь между темой Kafka и RisingWave, поэтому давайте выполним следующую команду.

Код от автора

   

Если команда успешно выполнена, то мы увидим сообщение “CREATE SOURCE”, и источник будет создан. Важно отметить, что после создания источника данные не будут автоматически загружены в RisingWave. Нам нужно создать материализованное представление, чтобы начать перемещение данных. Это материализованное представление также поможет нам создать панель управления Grafana на следующем шаге.

Давайте создадим материализованное представление с той же схемой, что и исходные данные, с помощью следующей команды.

Код от автора

   

Если команда успешно выполнена, то мы увидим сообщение “CREATE MATERIALIZED_VIEW”, и материализованное представление будет создано, и теперь настало время его протестировать!

Запустите скрипт на Python, чтобы начать передачу данных в реальном времени, и в терминале RisingWave запросите данные в реальном времени. RisingWave – это совместимая с PostgreSQL база данных SQL, поэтому, если вы знакомы с PostgreSQL или любой другой SQL базой данных, все будет гладко для выполнения запросов к вашим потоковым данным.

   

Как видите, потоковый конвейер теперь работает, но мы не используем все преимущества потоковой базы данных RisingWave. Мы можем добавить больше таблиц для объединения данных в реальном времени и создания полнофункционального приложения.

Давайте создадим таблицу “races”, чтобы мы могли объединить потоковые данные с таблицей “race” и получить фактическое имя гонки вместо идентификатора гонки.

Код от автора

   

Теперь давайте вставим данные для определенного идентификатора гонки, который нам нужен.

Код от автора

   

Давайте выполним ту же процедуру, но с таблицей “driver”.

Код от автора

   

И, наконец, давайте вставим данные водителей.

Код от автора

У нас есть готовые таблицы для объединения потоковых данных, но нам нужно материализованное представление, где произойдет вся магия. Давайте создадим материализованное представление, где мы сможем видеть топ-3 позиции в реальном времени, объединяя идентификатор водителя и идентификатор гонки, чтобы получить фактические имена.

Код от автора

И, наконец, давайте создадим последнее материализованное представление, чтобы увидеть, сколько раз водитель занимал первую позицию во время всей гонки.

Код от автора

И теперь пришло время создать панель управления Grafana и увидеть все объединенные данные в реальном времени благодаря материализованным представлениям.

 

Настройка панели управления Grafana

 

Последний шаг в этом потоковом конвейере данных – визуализация потоковых данных в панели управления в режиме реального времени. Прежде чем создать панель управления Grafana, нам нужно создать источник данных, чтобы установить соединение между Grafana и нашей потоковой базой данных RisingWave, следуя следующим шагам.

  • Перейдите в Configuration > Data sources.
  • Нажмите кнопку Добавить источник данных.
  • Выберите PostgreSQL из списка поддерживаемых баз данных.
  • Заполните поля подключения PostgreSQL следующим образом:

   

Прокрутите вниз и нажмите кнопку сохранить и протестировать. Соединение с базой данных теперь установлено.

   

Теперь перейдите на панель приборов в левой панели, нажмите на опцию новой панели и добавьте новую панель. Выберите визуализацию таблицы, переключитесь на вкладку кода и давайте запрашивать материализованное представление live_positions, где мы можем увидеть объединенные данные для топ-3 позиций.

Код автора

   

Добавим еще одну панель для визуализации текущего круга. Выберите визуализацию манометра и во вкладке кода запросите максимальный круг, доступный в потоковых данных. Настройка манометра зависит от вас.

Код автора

   

Наконец, добавим еще одну панель для запроса материализованного представления times_in_position_one и увидим в режиме реального времени, сколько раз водитель занимал первую позицию во время всей гонки.

Код автора

 

 

Визуализация результатов

 

Наконец, все компоненты для потокового процесса данных работают. Скрипт на языке Python был выполнен для начала передачи данных через тему Kafka, потоковая база данных RisingWave считывает, обрабатывает и объединяет данные в режиме реального времени. Материализованное представление f1_lap_times читает данные из темы Kafka, и каждая панель в панели приборов Grafana является отдельным материализованным представлением, объединяющим данные в режиме реального времени, чтобы показать подробные данные благодаря объединениям, выполненным материализованными представлениями с таблицами гонок и водителей. Панель приборов Grafana запрашивает материализованные представления, и весь процесс обработки упрощается благодаря материализованным представлениям, обработанным в потоковой базе данных RisingWave.

   

  Хавьер Гранадос – старший инженер по данным, который любит читать и писать о потоковых процессах данных. Он специализируется на потоковых процессах в облачных системах, в основном на AWS, но всегда исследует новые технологии и тренды. Вы можете найти его в VoAGI по адресу https://medium.com/@JavierGr