Kafka потоковое событийное искусственное интеллекта и автоматизации
Кафка потоковое событийное искусственного интеллекта и автоматизации
Apache Kafka выступает в роли явного лидера в корпоративной архитектуре для перехода от данных в покое (транзакций в базе данных) к потоковому анализу. Существует множество презентаций, которые объясняют, как работает Kafka и как масштабировать этот стек технологий (либо в локальной сети, либо в облаке). Следующим этапом этого проекта является создание микросервиса с использованием ChatGPT для потребления сообщений, их обогащения, преобразования и сохранения. В этом примере мы будем потреблять входные данные от устройства IoT (Raspberry Pi), которое отправляет JSON-показания температуры каждые несколько секунд.
Потребление сообщения
Как только каждое сообщение события Kafka производится (и входит в журнал), потребительский микросервис Kafka готов обработать каждое сообщение. Я попросил ChatGPT сгенерировать некоторый код на Python, и он дал мне основы для опроса и чтения из названной “темы”. Я получил довольно хороший старт для потребления темы, ключа и JSON-данных. Сгенерированный ChatGPT код позволяет сохранять эти данные в базе данных с использованием SQLAlchemy. Затем мне необходимо было преобразовать JSON-данные и использовать правила API Logic Server (ALS – открытый проект на GitHub) для распаковки JSON, валидации, вычисления и создания нового набора сообщений на основе исходной температуры вне заданного диапазона.
Примечание: ChatGPT выбрал библиотеки Confluent Kafka (и использует их контейнер Docker Kafka) – вы можете изменить свой код для использования других библиотек Kafka на Python.
Модель SQLAlchemy
С использованием API Logic Server (ALS: платформа на языке Python с открытым исходным кодом), мы подключаемся к базе данных MySQL. ALS будет читать таблицы и создавать модель ORM SQLAlchemy, пользовательский интерфейс react-admin, safrs-JSON Open API (Swagger) и работающий REST веб-сервис для каждой конечной точки ORM. В новой таблице Temperature будут содержаться отметка времени, идентификатор устройства IoT и показания температуры. Здесь мы используем командную строку ALS для создания модели ORM:
- Создайте конвейер RAG с помощью индекса LLama
- Несколько захватывающих техник быстрой разработки для повышения эффективности наших моделей LLM.
- Одна остановка для кластеризации методом K-средних
Сгенерированный класс API Logic Server, используемый для хранения наших значений Temperature
.
Изменения
Таким образом, вместо повторного сохранения JSON-сообщения потребителя Kafka в базе данных SQL (и запуска правил для выполнения работы), мы распаковываем JSON-данные (util.row_to_entity
) и вставляем их в таблицу Temperature, вместо сохранения JSON-сообщения. Мы позволяем декларативным правилам обрабатывать каждое показание температуры.
Когда потребитель получает сообщение, оно будет добавлено в сеанс, что вызовет правило commit_event
(ниже).
Декларативная логика: Создание сообщения
С использованием API Logic Server (фреймворк для автоматизации, построенный с использованием SQLAlchemy, Flask и правилного движка LogicBank в виде электронной таблицы: формула, сумма, подсчет, копирование, ограничение, событие и т.д.), мы добавляем декларативное правило commit_event
на ORM-сущность Temperature
. После сохранения каждого сообщения в таблице Temperature вызывается правило commit_event
. Если показание температуры превышает MAX_TEMP
или меньше MIN_TEMP
, мы отправим сообщение Kafka на тему “TempRangeAlert”
. Мы также добавляем ограничение, чтобы убедиться, что получаем данные в нормальном диапазоне (32
–132
). Мы позволим другому потребителю событий обрабатывать сообщение об оповещении.
Отправляем сообщение с предупреждением только в том случае, если показание температуры больше MAX_TEMP
или меньше MIN_TEMP
. Ограничение проверит диапазон температуры перед вызовом события о сохранении (обратите внимание, что порядок правил всегда неопределен и может быть изменен в соответствии с изменением требований).
TDD Behave тестирование
С использованием TDD (Разработка через тестирование) мы можем написать Behave тест для прямого вставления записей в таблицу Temperature, а затем проверять возвращаемое значение KafkaMessageSent
. Behave начинается с Feature
/Scenario
(файл .feature). Для каждого сценария мы пишем соответствующий класс на Python с использованием декораторов Behave
.
Определение Feature
Класс Python для TDD
Резюме
Использование ChatGPT для генерации кода сообщения Kafka как для Consumer, так и для Producer кажется хорошей отправной точкой. Установите Confluent Docker для Kafka. Использование API Logic Server для декларативных правил логики позволяет нам добавлять формулы, ограничения и события в обычный поток транзакций в нашу SQL базу данных, а также создавать (и преобразовывать) новые сообщения Kafka, что является отличным сочетанием. ChatGPT и декларативная логика – это следующий уровень “парного программирования”.