Kafka потоковое событийное искусственное интеллекта и автоматизации

Кафка потоковое событийное искусственного интеллекта и автоматизации

Apache Kafka выступает в роли явного лидера в корпоративной архитектуре для перехода от данных в покое (транзакций в базе данных) к потоковому анализу. Существует множество презентаций, которые объясняют, как работает Kafka и как масштабировать этот стек технологий (либо в локальной сети, либо в облаке). Следующим этапом этого проекта является создание микросервиса с использованием ChatGPT для потребления сообщений, их обогащения, преобразования и сохранения. В этом примере мы будем потреблять входные данные от устройства IoT (Raspberry Pi), которое отправляет JSON-показания температуры каждые несколько секунд.

Потребление сообщения

Как только каждое сообщение события Kafka производится (и входит в журнал), потребительский микросервис Kafka готов обработать каждое сообщение. Я попросил ChatGPT сгенерировать некоторый код на Python, и он дал мне основы для опроса и чтения из названной “темы”. Я получил довольно хороший старт для потребления темы, ключа и JSON-данных. Сгенерированный ChatGPT код позволяет сохранять эти данные в базе данных с использованием SQLAlchemy. Затем мне необходимо было преобразовать JSON-данные и использовать правила API Logic Server (ALS – открытый проект на GitHub) для распаковки JSON, валидации, вычисления и создания нового набора сообщений на основе исходной температуры вне заданного диапазона.

Примечание: ChatGPT выбрал библиотеки Confluent Kafka (и использует их контейнер Docker Kafka) – вы можете изменить свой код для использования других библиотек Kafka на Python. 

Модель SQLAlchemy

С использованием API Logic Server (ALS: платформа на языке Python с открытым исходным кодом), мы подключаемся к базе данных MySQL. ALS будет читать таблицы и создавать модель ORM SQLAlchemy, пользовательский интерфейс react-admin, safrs-JSON Open API (Swagger) и работающий REST веб-сервис для каждой конечной точки ORM. В новой таблице Temperature будут содержаться отметка времени, идентификатор устройства IoT и показания температуры. Здесь мы используем командную строку ALS для создания модели ORM:

Сгенерированный класс API Logic Server, используемый для хранения наших значений Temperature.

Изменения

Таким образом, вместо повторного сохранения JSON-сообщения потребителя Kafka в базе данных SQL (и запуска правил для выполнения работы), мы распаковываем JSON-данные (util.row_to_entity) и вставляем их в таблицу Temperature, вместо сохранения JSON-сообщения. Мы позволяем декларативным правилам обрабатывать каждое показание температуры.

Когда потребитель получает сообщение, оно будет добавлено в сеанс, что вызовет правило commit_event (ниже).

Декларативная логика: Создание сообщения

С использованием API Logic Server (фреймворк для автоматизации, построенный с использованием SQLAlchemy, Flask и правилного движка LogicBank в виде электронной таблицы: формула, сумма, подсчет, копирование, ограничение, событие и т.д.), мы добавляем декларативное правило commit_event на ORM-сущность Temperature. После сохранения каждого сообщения в таблице Temperature вызывается правило commit_event. Если показание температуры превышает MAX_TEMP или меньше MIN_TEMP, мы отправим сообщение Kafka на тему “TempRangeAlert”. Мы также добавляем ограничение, чтобы убедиться, что получаем данные в нормальном диапазоне (32132). Мы позволим другому потребителю событий обрабатывать сообщение об оповещении.

Отправляем сообщение с предупреждением только в том случае, если показание температуры больше MAX_TEMP или меньше MIN_TEMP. Ограничение проверит диапазон температуры перед вызовом события о сохранении (обратите внимание, что порядок правил всегда неопределен и может быть изменен в соответствии с изменением требований).

TDD Behave тестирование

С использованием TDD (Разработка через тестирование) мы можем написать Behave тест для прямого вставления записей в таблицу Temperature, а затем проверять возвращаемое значение KafkaMessageSent. Behave начинается с Feature/Scenario (файл .feature). Для каждого сценария мы пишем соответствующий класс на Python с использованием декораторов Behave.

Определение Feature

Класс Python для TDD

Резюме

Использование ChatGPT для генерации кода сообщения Kafka как для Consumer, так и для Producer кажется хорошей отправной точкой. Установите Confluent Docker для Kafka. Использование API Logic Server для декларативных правил логики позволяет нам добавлять формулы, ограничения и события в обычный поток транзакций в нашу SQL базу данных, а также создавать (и преобразовывать) новые сообщения Kafka, что является отличным сочетанием. ChatGPT и декларативная логика – это следующий уровень “парного программирования”.