Создание Slackbot в реальном времени с помощью генеративного искусственного интеллекта

Создание Slackbot в режиме реального времени с использованием генеративного искусственного интеллекта

В этой статье я покажу вам, как использовать Cloudera DataFlow, построенный на Apache NiFi, для взаимодействия в режиме реального времени с большими языковыми моделями IBM WatsonX.AI Foundation. Мы можем работать с любыми моделями Foundation, такими как Google FLAN T5 XXL или IBM Granite.

Я покажу вам, насколько легко создать поток данных в реальном времени, передавая ваши вопросы ваших приложений Slack-подобного типа и мобильным приложениям напрямую в защищенные модели WatsonX.AI, работающие в IBM Cloud. Мы будем обрабатывать всю безопасность, управление, связность и управление данными с помощью Cloudera Data Flow. В рамках принятия решений мы сможем выбирать разные модели WatsonX.AI на лету в зависимости от типа запроса. Например, если мы хотим продолжить предложение вместо ответа на вопрос, я могу выбрать разные модели. Для ответов на вопросы хорошо подходит Google FLAN T5 XXL. Если я хочу продолжить предложения, я бы использовал одну из моделей IBM Granite.

Вы заметите, насколько быстро модели WatsonX.AI возвращают нам нужные результаты. Я быстро провожу обогащение и преобразование данных, а затем отправляю их в Cloudera Apache Kafka для непрерывной аналитики и распределения на множество других приложений, систем, платформ и конечных потребителей. Мы также выводим наши ответы для исходного запроса, который может быть человеком в Slack-канале или приложении. Все это происходит в режиме реального времени, без кода, с полным управлением, связностью, управлением данными и безопасностью для любого масштаба и на любой платформе.

Сила сотрудничества IBM и Cloudera в частных, общедоступных и гибридных облачных средах для данных в реальном времени и искусственного интеллекта только начинается. Попробуйте сегодня.

Пошаговый поток в реальном времени

Сначала в Slack я пишу вопрос:

«Q: Как хорошо интегрировать Generative AI и Apache NiFi?»

Топовый поток NiFi

После ввода этого вопроса сервер Slack отправляет эти события на наш зарегистрированный сервис. Он может быть размещен где-угодно, общедоступно.

  • (Нажмите здесь для ссылки на Slack API)

Slack API

После включения ваш сервер начнет получать JSON-события для каждого сообщения в Slack. Это легко принимать и разбирать в NiFi. Cloudera DataFlow позволяет принимать безопасные HTTPS REST-вызовы в общедоступной облачной версии с легкостью, даже в режиме дизайнера.

Топовый поток NiFi 2

В первой части потока мы получаем REST JSON Post, который выглядит следующим образом.

Slackbot 1.0 (+https://api.slack.com/robots)application/jsonPOSTHTTP/1.1{  "token" : "qHvJe59yetAp1bao6wmQzH0C",  "team_id" : "T1SD6MZMF",  "context_team_id" : "T1SD6MZMF",  "context_enterprise_id" : null,  "api_app_id" : "A04U64MN9HS",  "event" : {    "type" : "message",    "subtype" : "bot_message",    "text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 20

Это очень подробный JSON-файл, который мы могли бы немедленно отправить в виде необработанных данных на Apache Iceberg Open Cloud Lakehouse, тему Kafka или хранилище объектов в виде JSON-документа (Опция дополнения). Я просто собираюсь разобрать то, что мне нужно.

EvaluateJSONPath

Мы извлекаем из него идентификатор канала и обычный текст сообщения. Я хочу получать только сообщения из канала general (“C1SD6N197”). Затем я копирую тексты в поле inputs, как это требуется для Hugging Face.

Мы проверяем наши данные: если это акции или погода (будут добавлены еще), мы не вызываем LLM.

SELECT * FROM FLOWFILEWHERE upper(inputs) like '%WEATHER%'AND not upper(inputs) like '%LLM SKIPPED%'SELECT * FROM FLOWFILEWHERE upper(inputs) like '%STOCK%'AND not upper(inputs) like '%LLM SKIPPED%'SELECT * FROM FLOWFILEWHERE (upper(inputs) like 'QUESTION:%'OR upper(inputs) like 'Q:%') and not upper(inputs) like '%WEATHER%'and not upper(inputs) like '%STOCK%'

Для обработки акций:

Чтобы разобрать, какие акции нам нужны, я использую свой процессор Open NLP.

Так что вам нужно будет скачать процессор и модели экстракции сущностей.

Затем мы передаем это название компании в HTTP REST-конечную точку от AlphaVantage, которая преобразует название компании в символы акций. В бесплатных аккаунтах у вас есть всего несколько вызовов в день, поэтому если что-то пошло не так, мы обходим этот шаг и просто используем то, что вы передали.

Используя RouteOnContent, мы фильтруем сообщение об ошибке.

Затем мы используем процессор QueryRecord, чтобы преобразовать из CSV в JSON и отфильтровать.

SELECT name as companyName, symbol  FROM FLOWFILEORDER BY matchScore DESCLIMIT 1

Мы используем процессор SplitRecord, чтобы убедиться, что у нас есть только одна запись. Затем мы запускаем процессор EvaluateJsonPath, чтобы получить наши поля в качестве атрибутов.

В процессоре UpdateAttribute мы обрезаем символ акции на всякий случай.

${stockSymbol:trim()}

Затем мы передаем этот символ акции в Twelve Data через процессор InvokeHTTP, чтобы получить данные о наших акциях.

Затем мы получаем обратно много данных об акциях.

{  "meta" : {    "symbol" : "IBM",    "interval" : "1min",    "currency" : "USD",    "exchange_timezone" : "Америка/Нью-Йорк",    "exchange" : "NYSE",    "mic_code" : "XNYS",    "type" : "Обыкновенные акции"  },  "values" : [ {    "datetime" : "2023-11-15 10:37:00",    "open" : "152.07001",    "high" : "152.08000",    "low" : "151.99500",    "close" : "152.00999",    "volume" : "8525"  }, {    "datetime" : "2023-11-15 10:36:00",    "open" : "152.08501",    "high" : "152.12250",    "low" : "152.08000",    "close" : "152.08501",    "volume" : "15204"  } ...

Затем мы запускаем процессор EvaluateJSONPath, чтобы получить информацию об обмене.

Мы разделяем запись, чтобы получить только одну запись, так как это нужно только для передачи в Slack. Мы используем вызовы процессора UpdateRecord, чтобы дополнить данные об акциях другими значениями. Затем мы запускаем процессор QueryRecord для ограничения количества записей до 1 для отправки в Slack.

SELECT * FROM FLOWFILEORDER BY 'datetime' DESCLIMIT 1

Мы запускаем процессор EvaluateJsonPath, чтобы получить наиболее важные поля значений для отображения.

Затем мы запускаем процессор PutSlack с нашим сообщением.

LLM Пропущено. Ценовая стоимость акций для ${companyName} [${nlp_org_1}/${stockSymbol}] на ${date} составляет ${closeStockValue}. Дата акций ${stockdateTime}. Биржа акций ${exchange}

У нас также есть отдельный процесс, который разделяется по названию компании.

На первом шаге мы вызываем Yahoo Finance, чтобы получить заголовки RSS для этой акции.

https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}&region=US&lang=en-US

Мы используем процессор QueryRecord, чтобы преобразовать записи RSS/XML в JSON.

Затем мы запускаем процессор SplitJSON, чтобы разделить новостные элементы.

Мы запускаем процессор SplitRecord, чтобы ограничиться одной записью. Мы используем процессор EvaluateJSONPath, чтобы получить нужные поля для нашего сообщения в Slack.

Затем мы запускаем процессор UpdateRecord, чтобы завершить наш JSON.

Затем мы отправляем это сообщение в Slack.

LLM Пропущено. Информация о новостях об акциях для ${companyName} [${nlp_org_1}/${stockSymbol}] на ${date}${title} : ${description}.${guid} Дата статьи ${pubdate}

Для тех, кто выбрал погоду, мы следуем похожему пути (мы должны добавить кэширование с помощью Redis @ Aiven), как в случае со складами. Мы используем мой процессор OpenNLP для извлечения местоположений, о которых вы, возможно, хотели бы узнать погоду.

Следующий шаг – преобразование вывода процессора и создание значения для отправки нашему Геокодеру.

weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "Нью-Йорк")}

Если мы не можем найти действительное местоположение, я скажу “Нью-Йорк”. Мы можем использовать какой-то другой словарь. Я работаю над загрузкой всех местоположений и могу выполнять некоторые расширенные поиски в PostgreSQL по ним – или, возможно, с помощью OpenSearch или векторизованного хранилища данных.

Я передаю это местоположение Open Meteo, чтобы найти геокод через InvokeHTTP.

https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=ru&format=json

Затем мы извлекаем нужные нам значения из результатов.

{  "results" : [ {    "id" : 5128581,    "name" : "Нью-Йорк",    "latitude" : 40.71427,    "longitude" : -74.00597,    "elevation" : 10.0,    "feature_code" : "PPL",    "country_code" : "US",    "admin1_id" : 5128638,    "timezone" : "America/New_York",    "population" : 8175133,    "postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ],    "country_id" : 6252001,    "country" : "Соединенные Штаты",    "admin1" : "Нью-Йорк"  } ],  "generationtime_ms" : 0.92196465}

Затем мы разбираем результаты, чтобы вызвать другой API для получения текущей погоды для указанной широты и долготы с помощью InvokeHTTP.

https://api.weather.gov/points/${latitude:trim()},${longitude:trim()}

Результаты представляют собой гео-json.

{ "@context": [ "https://geojson.org/geojson-ld/geojson-context.jsonld", { "@version": "1.1", "wx": "https://api.weather.gov/ontology#", "s": "https://schema.org/", "geo": "http://www.opengis.net/ont/geosparql#", "unit": "http://codes.wmo.int/common/unit/", "@vocab": "https://api.weather.gov/ontology#", "geometry": { "@id": "s:GeoCoordinates", "@type": "geo:wktLiteral" }, "city": "s:addressLocality", "state": "s:addressRegion", "distance": { "@id": "s:Distance", "@type": "s:QuantitativeValue" }, "bearing": { "@type": "s:QuantitativeValue" }, "value": { "@id": "s:value" }, "unitCode": { "@id": "s:unitCode", "@type": "@id" }, "forecastOffice": { "@type": "@id" }, "forecastGridData": { "@type": "@id" }, "publicZone": { "@type": "@id" }, "county": { "@type": "@id" } } ], "id": "https://api.weather.gov/points/40.7143,-74.006", "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.006, 40.714300000000001 ] }, "properties": { "@id": "https://api.weather.gov/points/40.7143,-74.006", "@type": "wx:Point", "cwa": "OKX", "forecastOffice": "https://api.weather.gov/offices/OKX", "gridId": "OKX", "gridX": 33, "gridY": 35, "forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast", "forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly", "forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35", "observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations", "relativeLocation": { "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.0279259, 40.745251000000003 ] }, "properties": { "city": "Hoboken", "state": "NJ", "distance": { "unitCode": "wmoUnit:m", "value": 3906.1522008034999 }, "bearing": { "unitCode": "wmoUnit:degree_(angle)", "value": 151 } } }, "forecastZone": "https://api.weather.gov/zones/forecast/NYZ072", "county": "https://api.weather.gov/zones/county/NYC061", "fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212", "timeZone": "America/New_York", "radarStation": "KDIX" }}

Мы используем EvaluateJSONPath, чтобы получить URL прогноза.

Затем мы вызываем этот URL прогноза с помощью invokeHTTP.

Это дает больший вывод JSON, который мы будем разбирать для получения результатов, которые хотим вернуть в Slack.

Мы разбираем данные с помощью EvaluateJSONPath, чтобы получить основные поля для погоды.

Затем мы форматируем эти поля в PutSlack.

Прочитан прогноз на ${date} для ${weatherlocation} @ ${latitude},${longitude}Использован ${forecasturl} ${icon} Температура: ${temperature} ${temperatureunit} - ${temperaturetrend}Ветер ${winddirection} на ${windspeed}. ${detailedforecast}

Вывод Slack


Если у нас есть вопрос LLM, давайте убедимся, что это только одна запись.

Мы используем несколько различных моделей, доступных в IBM WatsonX.AI на IBM Cloud, чтобы быстро получить доступ к ним с помощью наших REST запросов.

Я сначала протестировал и создал запросы в лаборатории IBM Prompt, а затем скопировал исходное выражение curl оттуда.

Нажмите здесь, чтобы узнать о поддерживаемых основных моделях, доступных в IBM watsonx.ai.

ibm/mpt-7b-instruct2meta-llama/llama-2–70b-chatibm/granite-13b-chat-v1

Нам нужно отправить наш уникальный безопасный ключ в IBM, и они выдадут нам токен для использования в следующем вызове.

Мы разбираем вопрос и затем отправляем его в WatsonX через REST API.

Мы создаем запрос, который отправляем в IBM, следующим образом.

{  "model_id": "meta-llama/llama-2-70b-chat",  "input": "${inputs:urlEncode()}",  "parameters": {    "decoding_method": "greedy",    "max_new_tokens": 200,    "min_new_tokens": 50,    "stop_sequences": [],    "repetition_penalty": 1  },  "project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" }

Мы разбираем сгенерированный текст, который представляет собой результат работы нашей Генеративной ИИ плюс некоторую полезную метаинформацию о времени выполнения.

Результат, отправленный в Slack, выглядит следующим образом:

“You can use Apache NiFi to integrate Generative AI models in several ways:

  1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.
  2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi’s PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.
  3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.
  4. Real-time Inference: You can use NiFi’s StreamingJobs” 

После того, как Slackbot опубликовал результаты, он отправил метрики и отладочную информацию в канал chat.

Вся метаинформация размещается на другом канале Slack для мониторинга администратором.

==== NiFi to IBM WatsonX.AI LLM AnswersOn Дата: Ср, 15 ноября 2023 г. 15:43:29 GMT Создан: 2023-11-15T15:43:29.248Z  Вопрос: Q: Что такое хороший способ интеграции Генеративного ИИ и Apache NiFi?Ответ: ) Вы можете использовать Apache NiFi для интеграции моделей Генеративного ИИ несколькими способами:1. Предварительная обработка данных: Используйте NiFi для предварительной обработки данных перед подачей их в вашу модель Генеративного ИИ. Это может включать очистку данных, преобразование и инженерию признаков.2. Обучение модели: Используйте NiFi для автоматизации процесса обучения вашей модели Генеративного ИИ. Вы можете использовать процессоры PutFile и PutFile_SFTP NiFi, чтобы записать обучающие данные в файл, а затем использовать процессор, наподобие ExecuteScript, для выполнения скрипта обучения.3. Развертывание модели: После обучения вашей модели Генеративного ИИ вы можете использовать NiFi для развертывания ее. Вы можете создать поток NiFi, который принимает входные данные, передает их через модель Генеративного ИИ и затем выводит сгенерированные данные.4. Реально временный вывод: Вы можете использовать потоки StreamingJobs NiFi.”Token: 200Время запроса: 8153HTTP ID TX: 89d71099-da23-4e7e-89f9-4e8f5620c0fbСообщение от IBM: Эта модель является не IBM продуктом, управляемым лицензией сторонней организации, которая может накладывать ограничения на использование и другие обязательства. Используя эту модель, вы соглашаетесь с ее условиями, указанными по следующему URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wxIBM Сообщение ID: disclaimer_warningID модели: meta-llama/llama-2-70b-chatПричина остановки: max_tokensКоличество токенов: 38TX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756UUID: da0806cb-6133-4bf4-808e-1fbf419c09e3Corr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756Global TX ID: 20c3a9cf276c38bcdaf26e3c27d0479bService Time: 478Request ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9cFile Name: 1a3c4386-86d2-4969-805b-37649c16addbRequest Duration: 8153Request URL: https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29cf-ray: 82689bfd28e48ce2-EWR=====

 

Создайте своего собственного Slackbot

Вывод в Slack

Распределение с помощью Kafka

CREATE TABLE `ssb`.`Meetups`.`watsonairesults` (  `date` VARCHAR(2147483647),  `x_global_transaction_id` VARCHAR(2147483647),  `x_request_id` VARCHAR(2147483647),  `cf_ray` VARCHAR(2147483647),  `inputs` VARCHAR(2147483647),  `created_at` VARCHAR(2147483647),  `stop_reason` VARCHAR(2147483647),  `x_correlation_id` VARCHAR(2147483647),  `x_proxy_upstream_service_time` VARCHAR(2147483647),  `message_id` VARCHAR(2147483647),  `model_id` VARCHAR(2147483647),  `invokehttp_request_duration` VARCHAR(2147483647),  `message` VARCHAR(2147483647),  `uuid` VARCHAR(2147483647),  `generated_text` VARCHAR(2147483647),  `transaction_id` VARCHAR(2147483647),  `tokencount` VARCHAR(2147483647),  `generated_token` VARCHAR(2147483647),  `ts` VARCHAR(2147483647),  `advisoryId` VARCHAR(2147483647),  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND) WITH (  'deserialization.failure.policy' = 'ignore_and_log',  'properties.request.timeout.ms' = '120000',  'format' = 'json',  'properties.bootstrap.servers' = 'kafka:9092',  'connector' = 'kafka',  'properties.transaction.timeout.ms' = '900000',  'topic' = 'watsonxaillmanswers',  'scan.startup.mode' = 'group-offsets',  'properties.auto.offset.reset' = 'earliest',  'properties.group.id' = 'watsonxaillmconsumer')CREATE TABLE `ssb`.`Meetups`.`watsonxresults` (  `date` VARCHAR(2147483647),  `x_global_transaction_id` VARCHAR(2147483647),  `x_request_id` VARCHAR(2147483647),  `cf_ray` VARCHAR(2147483647),  `inputs` VARCHAR(2147483647),  `created_at` VARCHAR(2147483647),  `stop_reason` VARCHAR(2147483647),  `x_correlation_id` VARCHAR(2147483647),  `x_proxy_upstream_service_time` VARCHAR(2147483647),  `message_id` VARCHAR(2147483647),  `model_id` VARCHAR(2147483647),  `invokehttp_request_duration` VARCHAR(2147483647),  `message` VARCHAR(2147483647),  `uuid` VARCHAR(2147483647),  `generated_text` VARCHAR(2147483647),  `transaction_id` VARCHAR(2147483647),  `tokencount` VARCHAR(2147483647),  `generated_token` VARCHAR(2147483647),  `ts` VARCHAR(2147483647),  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND) WITH (  'deserialization.failure.policy' = 'ignore_and_log',  'properties.request.timeout.ms' = '120000',  'format' = 'json',  'properties.bootstrap.servers' = 'kafka:9092',  'connector' = 'kafka',  'properties.transaction.timeout.ms' = '900000',  'topic' = 'watsonxaillm',  'scan.startup.mode' = 'group-offsets',  'properties.auto.offset.reset' = 'earliest',  'properties.group.id' = 'allwatsonx1')

Пример запроса

{"inputs":"Пожалуйста, ответьте на следующий вопрос. Какая столица Соединенных Штатов?"}

IBM DB2 SQL

alter table  "DB2INST1"."TRAVELADVISORY"add column "summary" VARCHAR(2048);-- DB2INST1.TRAVELADVISORY definitionCREATE TABLE "DB2INST1"."TRAVELADVISORY"  (    "TITLE" VARCHAR(250 OCTETS) ,     "PUBDATE" VARCHAR(250 OCTETS) ,     "LINK" VARCHAR(250 OCTETS) ,     "GUID" VARCHAR(250 OCTETS) ,     "ADVISORYID" VARCHAR(250 OCTETS) ,     "DOMAIN" VARCHAR(250 OCTETS) ,     "CATEGORY" VARCHAR(4096 OCTETS) ,     "DESCRIPTION" VARCHAR(4096 OCTETS) ,     "UUID" VARCHAR(250 OCTETS) NOT NULL ,     "TS" BIGINT NOT NULL ,     "summary" VARCHAR(2048 OCTETS) )      IN "IBMDB2SAMPLEREL"     ORGANIZE BY ROW;ALTER TABLE "DB2INST1"."TRAVELADVISORY"  ADD PRIMARY KEY  ("UUID") ENFORCED;GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1";GRANT CONTROL ON INDEX "SYSIBM  "."SQL230620142604860" TO USER "DB2INST1";SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE  FROM DB2INST1.TRAVELADVISORY t WHERE "summary" IS NOT NULL ORDER BY ts DESC 

Пример электронной почты вывода

Видео

Исходный код