Быстрый шаблон FastAPI для LLM SaaS Часть 2 — Celery и Pg-vector
Быстрый шаблон FastAPI для LLM SaaS Часть 2 — Celery и Pg-vector (описание и применение)
Этот блоговый пост является частью шаблона FastAPI + Supabase для сервиса LLM SaaS, основанного на концепциях, представленных в части 1 (Auth and File Upload).
FastAPI Template для LLM SaaS Часть 1 — Auth and File Upload
Возрастающая популярность FastAPI среди разработчиков Python подчеркивается его простотой и поддержкой местного интерфейса Swagger UI…
pub.towardsai.net
Большинство примеров кода используются с ссылкой на Quivr.
- «ИИ – это шестая дисциплина искусственного интеллекта»
- DIRFA превращает аудиофрагменты в реалистичные цифровые лица
- Обучающий курс RLHF для LLM с использованием Huggingface 🤗
Celery Worker и Очередь Сообщений для долгосрочных процессов
На иллюстрации ниже показано, как Celery выполняет работу в экосистеме FastAPI совместно с очередью сообщений. Процесс начинается с отправки задач FastAPI в назначенный брокер (в данном случае Redis). Затем Celery-рабочие извлекают и обрабатывают эти задачи в распределенной очереди задач, сохраняя результаты в бэкенде результатов (также Redis). В то же время FastAPI может контролировать состояние задачи и результаты. В примере используется один экземпляр Redis для брокера и бэкенда результатов, но если это необходимо, можно использовать отдельные экземпляры.
![Источник: скетч автора](https://ai.miximages.com/miro.medium.com/v2/resize:fit:640/format:webp/0*DHowAiR1dMhpyqWi.png)
Для начала разработки необходимо запустить экземпляр Redis с помощью следующих команд Docker:
# Получение последнего образа Redisdocker pull redis:latest# Запуск экземпляра Redisdocker run --name redis -d -p 6379:6379 redis:latest
Настройте переменные среды для брокера и бэкенда результатов в вашем проекте FastAPI:
# Экземпляр брокера - RedisCELERY_BROKER_URL=redis://localhost:6379/0# Бэкенд результатов - RedisCELERY_RESULT_BACKEND=redis://localhost:6379/0
Создайте тестовую задачу в main.py
:
from celery import Celeryimport time
celery = Celery( __name__, broker=os.getenv("CELERY_BROKER_URL"), backend=os.getenv("CELERY_RESULT_BACKEND"),)@Celery.taskdef test(): import time time.sleep(5) return "Привет, мне нравится есть сельдерей!"
Вернитесь в терминал и введите команду для запуска celery (предполагается, что у вас уже установлен Celery в среде. Если нет, просто используйте pip install)
celery --app=main.celery worker --concurrency=1 --loglevel=DEBUG
Примечание: если вы тестируете скрипт на компьютере с Windows, вам может потребоваться добавить «-P solo» в команду, чтобы он работал в локальной среде. Для работы в режиме продакшн это не требуется.
Вы увидите что-то вроде этого:
-------------- celery@xxxx v5.2.7 (dawn-chorus)--- ***** ----- -- ******* ---- Windows-10-10.0.22621-SP0 2023-11-20 07:03:38- *** --- * --- - ** ---------- [config]- ** ---------- .> app: main:0x22d23e16d70- ** ---------- .> transport: redis://localhost:6379/0- ** ---------- .> results: redis://localhost:6379/0- *** --- * --- .> concurrency: 4 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
Теперь вы можете открыть другой терминал на основе рабочего каталога и использовать Python REPL для быстрого тестирования.
(.venv) PS C:\Users\xxx\backend> pythonPython 3.10.11 (tags/v3.10.11:7d4cc5a, Apr 5 2023, 00:38:17) [MSC v.1929 64 bit (AMD64)] on win32Type "help", "copyright", "credits" or "license" for more information.>>> from main import app, celery, test>>> test.delay()<AsyncResult: 2ba428c1-5d82-4f37-aa89-5cef76b7a6eb>
Вернитесь на другой терминал, где запущен рабочий процесс Celery. Вы должны наблюдать выполнение задачи в консоли рабочего процесса Celery.
[2023-11-20 12:21:46,743: INFO/MainProcess] Задача main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] получена[2023-11-20 12:21:51,754: INFO/MainProcess] Задача main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] выполнена за 5.014999999999418 секунд: 'Привет, мне нравится есть сельдерей!'
Загрузка файла и векторное хранилище (плагин pg-vector)
На основе теста Celery, фактический сценарий использования включает запуск фоновой задачи Celery для встраивания PDF-документа и сохранения его в векторное хранилище. Процесс включает загрузку файла в хранилище Supabase и запуск задачи Celery для его загрузки и обработки векторного хранилища.
![Источник: диаграмма автора](https://ai.miximages.com/miro.medium.com/v2/resize:fit:640/format:webp/0*Fr8zI9IYLDrnD3rU.png)
Весь процесс немного сложен. Сначала файл будет загружен в хранилище Supabase. Затем мы запускаем задачу Celery для загрузки этого файла и его обработки для векторного хранилища. В процессе будет использоваться загрузчик документов для преобразования из исходного формата файла в текстовый формат и разделитель текста для разделения текста на фрагменты (из-за ограничения размера отдельного вектора в векторном хранилище). Также мы добавим метаданные для каждого конкретного фрагмента текста. Затем, наконец, фрагменты текста будут встраиваться в векторы и загружаться в векторное хранилище Supabase (плагин pg-vector для postgres).
Таблицы SQL в Supabase
Прежде всего, убедитесь, что на Supabase созданы две таблицы для этой демонстрации: (дополнительные примеры SQL-скриптов вы можете найти по ссылке https://github.com/StanGirard/quivr/tree/main/scripts)
-- Создание таблицы пользователей X векторовCREATE TABLE IF NOT EXISTS user_vectors ( user_id UUID, vector_id UUID, PRIMARY KEY (user_id, vector_id), FOREIGN KEY (vector_id) REFERENCES vectors (id), FOREIGN KEY (user_id) REFERENCES auth.users (id));-- Создание расширения векторовCREATE EXTENSION IF NOT EXISTS vector;-- Создание таблицы векторовCREATE TABLE IF NOT EXISTS vectors ( id UUID DEFAULT uuid_generate_v4() PRIMARY KEY, content TEXT, metadata JSONB, embedding VECTOR(1536));
Определение маршрутов и конечных точек
В файле main.py добавьте новый маршрутизатор ‘upload_router’.
from routes.upload_routes import upload_routerapp.include_router(upload_router)
Создайте новый каталог с именем ‘routes’ и создайте файл с именем ‘upload_routes.py’
from fastapi.responses import JSONResponsefrom auth import AuthBearer, get_current_userfrom celery_worker import process_filefrom celery.result import AsyncResultfrom fastapi import APIRouter, Depends, HTTPException, Request, UploadFilefrom repository.files.upload_file import upload_file_storagefrom logger import get_loggerfrom models import UserIdentitylogger = get_logger(__name__)upload_router = APIRouter()@upload_router.get("/upload/healthz", tags=["Здоровье"])async def healthz(): return {"status": "ok"}@upload_router.post("/upload", dependencies=[Depends(AuthBearer())], tags=["Загрузка"])async def upload_file( request: Request, uploadFile: UploadFile, current_user: UserIdentity = Depends(get_current_user),): file_content = await uploadFile.read() filename_with_user_id = str(current_user.id) + "/" + str(uploadFile.filename) logger.info(f"Имя файла: {filename_with_user_id}") try: fileInStorage = upload_file_storage(file_content, filename_with_user_id) logger.info(f"Файл {fileInStorage} успешно загружен") except Exception as e: if "Ресурс уже существует" in str(e): raise HTTPException( status_code=403, detail=f"Файл {uploadFile.filename} уже существует в хранилище.", ) else: raise HTTPException( status_code=500, detail="Не удалось загрузить файл в хранилище." ) task = process_file.delay( file_name=filename_with_user_id, file_original_name=uploadFile.filename, user_id=current_user.id, ) return JSONResponse({"task_id": task.id})@upload_router.get("/upload/{task_id}", dependencies=[Depends(AuthBearer())], tags=["Загрузка"])def get_status(task_id: str): task_result = AsyncResult(task_id) result = { "task_id": task_id, "task_status": task_result.status } return JSONResponse(result)
Этот скрипт определит два конечных точки в «upload_routes.py» для загрузки файлов и проверки состояния задачи.
![Источник: скриншот автора](https://ai.miximages.com/miro.medium.com/v2/resize:fit:640/format:webp/1*n4HbKfaNo-Mm4q0v9d0U_w.png)
Вы увидите, что есть задача Celery с названием «process_file» в /upload. Давайте теперь создадим эту задачу в celery.
Рабочий процесс Celery и задачи
Во-первых, создайте файл в основном каталоге с названием «celery_worker.py».
import osfrom celery import Celeryimport asynciofrom utils.process_file import get_supabase_client,file_handlercelery = Celery( __name__, broker="redis://127.0.0.1:6379/0", backend="redis://127.0.0.1:6379/0")@celery.task(name="process_file")def process_file( file_name: str, file_original_name: str, user_id: str,): supabase_client = get_supabase_client() tmp_file_name = "tmp-file-"+file_name tmp_file_name = tmp_file_name.replace("/", "_") with open(tmp_file_name, "wb+") as file: res = supabase_client.storage.from_("quivr").download(file_name) file.write(res) loop = asyncio.new_event_loop() message = loop.run_until_complete( file_handler( file=tmp_file_name, user_id=user_id, file_original_name=file_original_name ) ) file.close os.remove(tmp_file_name)
Эта задача «process_file» (см. диаграмму процесса выше) загружает файл, использует file_handler для обработки файла, а затем удаляет временный файл после завершения.
Обработка файлов и встраивание
Для простоты вы можете использовать следующий скрипт file_handler. В этом скрипте есть рабочий процесс, выполняющий все встроенные элементы. Также вы можете изучить базовый код Quivr, в котором есть еще одна общая задача для назначения встроенных элементов нескольким рабочим.
# utils/process_file.py для обработки загруженного файлаimport osimport timefrom logger import get_loggerfrom repository.files.upload_file import DocumentSerializablefrom langchain.document_loaders import UnstructuredPDFLoaderfrom models.databases.supabase.supabase import SupabaseDBfrom supabase.client import Client, create_clientfrom langchain.vectorstores import SupabaseVectorStorefrom langchain.embeddings.openai import OpenAIEmbeddingsfrom langchain.text_splitter import RecursiveCharacterTextSplitterfrom dotenv import load_dotenvload_dotenv()logger = get_logger(__name__)def get_supabase_client() -> Client: supabase_client: Client = create_client( os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY") ) return supabase_clientdef get_supabase_db() -> SupabaseDB: supabase_client = get_supabase_client() return SupabaseDB(supabase_client)def get_embeddings() -> OpenAIEmbeddings: embeddings = OpenAIEmbeddings( openai_api_key=os.getenv("OPENAI_API_KEY") ) # pyright: ignore reportPrivateUsage=none return embeddingsdef get_documents_vector_store() -> SupabaseVectorStore: # settings = BrainSettings() # pyright: ignore reportPrivateUsage=none embeddings = get_embeddings() supabase_client: Client = create_client( os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY") ) documents_vector_store = SupabaseVectorStore( supabase_client, embeddings, table_name="vectors" ) return documents_vector_storedef create_vector(doc): documents_vector_store = get_documents_vector_store() try: sids = documents_vector_store.add_documents([doc]) if sids and len(sids) > 0: return sids except Exception as e: logger.error(f"Error creating vector for document: {e}") def create_user_vector(user_id, vector_id): database = get_supabase_db() response = ( database.db.table("user_vectors") .insert( { "user_id": str(user_id), "vector_id": str(vector_id), } ) .execute() ) return response.data def create_embedding_for_document(user_id, doc_with_metadata): doc = DocumentSerializable.from_json(doc_with_metadata) created_vector = create_vector(doc) created_vector_id = created_vector[0] # pyright: ignore reportPrivateUsage=none create_user_vector(user_id, created_vector_id) def compute_documents_from_pdf(file,loader): loader = loader(file) documents=[] documents = loader.load() # split the documents into chunks text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=500, chunk_overlap=0 ) documents = text_splitter.split_documents(documents) return documents async def file_handler( file: str, file_original_name: str, user_id, loader_class=UnstructuredPDFLoader, #the loader class from Langchain): dateshort = time.strftime("%Y%m%d") documents = compute_documents_from_pdf(file,loader_class) for doc in documents: # pyright: ignore reportPrivateUsage=none metadata = { "file_name": file_original_name, "date": dateshort } doc_with_metadata = DocumentSerializable( page_content=doc.page_content, metadata=metadata ) create_embedding_for_document( user_id, doc_with_metadata.to_json() ) return "Планировщик обработал все файлы!"
В демонстрационных целях здесь тестируются только файлы pdf. Для более широкого спектра файловых форматов вы можете обратиться к базе кода Quivr, где используется класс File для обработки различных файловых форматов.
Тестирование от начала до конца
Чтобы протестировать это, просто включите сервер Uvicorn (для FastAPI) и сервер Celery.
uvicorn main:app --reload
celery -A celery_worker worker --loglevel=info --logfile=celery.log --concurrency=1 -P solo
— logfile (опционально): может сохранять файл журнала Celery в рабочем каталоге
— concurrency (опционально): устанавливает, сколько рабочих процессов вы хотите запустить одновременно
— P solo: мне нужно это, чтобы запустить Celery на ноутбуке с Windows. Если вы запускаете его на Mac/Docker, вам, вероятно, это не понадобится.
Вот фрагмент для тестирования точек доступа.
![Источник: скриншот автора](https://ai.miximages.com/miro.medium.com/v2/resize:fit:640/format:webp/1*5Kh6vV3_b78h23iduRytIA.png)
![Источник: скриншот автора](https://ai.miximages.com/miro.medium.com/v2/resize:fit:640/format:webp/1*W6m-2ufVRG425uHGuGy59A.png)