Быстрый шаблон FastAPI для LLM SaaS Часть 2 — Celery и Pg-vector

Быстрый шаблон FastAPI для LLM SaaS Часть 2 — Celery и Pg-vector (описание и применение)

Этот блоговый пост является частью шаблона FastAPI + Supabase для сервиса LLM SaaS, основанного на концепциях, представленных в части 1 (Auth and File Upload).

FastAPI Template для LLM SaaS Часть 1 — Auth and File Upload

Возрастающая популярность FastAPI среди разработчиков Python подчеркивается его простотой и поддержкой местного интерфейса Swagger UI…

pub.towardsai.net

Большинство примеров кода используются с ссылкой на Quivr.

Celery Worker и Очередь Сообщений для долгосрочных процессов

На иллюстрации ниже показано, как Celery выполняет работу в экосистеме FastAPI совместно с очередью сообщений. Процесс начинается с отправки задач FastAPI в назначенный брокер (в данном случае Redis). Затем Celery-рабочие извлекают и обрабатывают эти задачи в распределенной очереди задач, сохраняя результаты в бэкенде результатов (также Redis). В то же время FastAPI может контролировать состояние задачи и результаты. В примере используется один экземпляр Redis для брокера и бэкенда результатов, но если это необходимо, можно использовать отдельные экземпляры.

Источник: скетч автора

Для начала разработки необходимо запустить экземпляр Redis с помощью следующих команд Docker:

# Получение последнего образа Redisdocker pull redis:latest# Запуск экземпляра Redisdocker run --name redis -d -p 6379:6379 redis:latest

Настройте переменные среды для брокера и бэкенда результатов в вашем проекте FastAPI:

# Экземпляр брокера - RedisCELERY_BROKER_URL=redis://localhost:6379/0# Бэкенд результатов - RedisCELERY_RESULT_BACKEND=redis://localhost:6379/0

Создайте тестовую задачу в main.py:

from celery import Celeryimport time

celery = Celery(    __name__,    broker=os.getenv("CELERY_BROKER_URL"),    backend=os.getenv("CELERY_RESULT_BACKEND"),)@Celery.taskdef test():    import time    time.sleep(5)    return "Привет, мне нравится есть сельдерей!"

Вернитесь в терминал и введите команду для запуска celery (предполагается, что у вас уже установлен Celery в среде. Если нет, просто используйте pip install)

celery --app=main.celery worker --concurrency=1 --loglevel=DEBUG

Примечание: если вы тестируете скрипт на компьютере с Windows, вам может потребоваться добавить «-P solo» в команду, чтобы он работал в локальной среде. Для работы в режиме продакшн это не требуется.

Вы увидите что-то вроде этого:

-------------- celery@xxxx v5.2.7 (dawn-chorus)--- ***** ----- -- ******* ---- Windows-10-10.0.22621-SP0 2023-11-20 07:03:38- *** --- * --- - ** ---------- [config]- ** ---------- .> app:         main:0x22d23e16d70- ** ---------- .> transport:   redis://localhost:6379/0- ** ---------- .> results:     redis://localhost:6379/0- *** --- * --- .> concurrency: 4 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** -----  -------------- [queues]                .> celery           exchange=celery(direct) key=celery

Теперь вы можете открыть другой терминал на основе рабочего каталога и использовать Python REPL для быстрого тестирования.

(.venv) PS C:\Users\xxx\backend> pythonPython 3.10.11 (tags/v3.10.11:7d4cc5a, Apr  5 2023, 00:38:17) [MSC v.1929 64 bit (AMD64)] on win32Type "help", "copyright", "credits" or "license" for more information.>>> from main import app, celery, test>>> test.delay()<AsyncResult: 2ba428c1-5d82-4f37-aa89-5cef76b7a6eb>

Вернитесь на другой терминал, где запущен рабочий процесс Celery. Вы должны наблюдать выполнение задачи в консоли рабочего процесса Celery.

[2023-11-20 12:21:46,743: INFO/MainProcess] Задача main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] получена[2023-11-20 12:21:51,754: INFO/MainProcess] Задача main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] выполнена за 5.014999999999418 секунд: 'Привет, мне нравится есть сельдерей!'

Загрузка файла и векторное хранилище (плагин pg-vector)

На основе теста Celery, фактический сценарий использования включает запуск фоновой задачи Celery для встраивания PDF-документа и сохранения его в векторное хранилище. Процесс включает загрузку файла в хранилище Supabase и запуск задачи Celery для его загрузки и обработки векторного хранилища.

Источник: диаграмма автора

Весь процесс немного сложен. Сначала файл будет загружен в хранилище Supabase. Затем мы запускаем задачу Celery для загрузки этого файла и его обработки для векторного хранилища. В процессе будет использоваться загрузчик документов для преобразования из исходного формата файла в текстовый формат и разделитель текста для разделения текста на фрагменты (из-за ограничения размера отдельного вектора в векторном хранилище). Также мы добавим метаданные для каждого конкретного фрагмента текста. Затем, наконец, фрагменты текста будут встраиваться в векторы и загружаться в векторное хранилище Supabase (плагин pg-vector для postgres).

Таблицы SQL в Supabase

Прежде всего, убедитесь, что на Supabase созданы две таблицы для этой демонстрации: (дополнительные примеры SQL-скриптов вы можете найти по ссылке https://github.com/StanGirard/quivr/tree/main/scripts)

-- Создание таблицы пользователей X векторовCREATE TABLE IF NOT EXISTS user_vectors (  user_id UUID,  vector_id UUID,  PRIMARY KEY (user_id, vector_id),  FOREIGN KEY (vector_id) REFERENCES vectors (id),  FOREIGN KEY (user_id) REFERENCES auth.users (id));-- Создание расширения векторовCREATE EXTENSION IF NOT EXISTS vector;-- Создание таблицы векторовCREATE TABLE IF NOT EXISTS vectors (    id UUID DEFAULT uuid_generate_v4() PRIMARY KEY,    content TEXT,    metadata JSONB,    embedding VECTOR(1536));

Определение маршрутов и конечных точек

В файле main.py добавьте новый маршрутизатор ‘upload_router’.

from routes.upload_routes import upload_routerapp.include_router(upload_router)

Создайте новый каталог с именем ‘routes’ и создайте файл с именем ‘upload_routes.py’

from fastapi.responses import JSONResponsefrom auth import AuthBearer, get_current_userfrom celery_worker import process_filefrom celery.result import AsyncResultfrom fastapi import APIRouter, Depends, HTTPException, Request, UploadFilefrom repository.files.upload_file import upload_file_storagefrom logger import get_loggerfrom models import UserIdentitylogger = get_logger(__name__)upload_router = APIRouter()@upload_router.get("/upload/healthz", tags=["Здоровье"])async def healthz():    return {"status": "ok"}@upload_router.post("/upload", dependencies=[Depends(AuthBearer())], tags=["Загрузка"])async def upload_file(    request: Request,    uploadFile: UploadFile,    current_user: UserIdentity = Depends(get_current_user),):    file_content = await uploadFile.read()    filename_with_user_id = str(current_user.id) + "/" + str(uploadFile.filename)    logger.info(f"Имя файла: {filename_with_user_id}")    try:        fileInStorage = upload_file_storage(file_content, filename_with_user_id)        logger.info(f"Файл {fileInStorage} успешно загружен")          except Exception as e:        if "Ресурс уже существует" in str(e):            raise HTTPException(                status_code=403,                detail=f"Файл {uploadFile.filename} уже существует в хранилище.",            )        else:            raise HTTPException(                status_code=500, detail="Не удалось загрузить файл в хранилище."            )    task = process_file.delay(        file_name=filename_with_user_id,        file_original_name=uploadFile.filename,        user_id=current_user.id,    )    return JSONResponse({"task_id": task.id})@upload_router.get("/upload/{task_id}", dependencies=[Depends(AuthBearer())], tags=["Загрузка"])def get_status(task_id: str):    task_result = AsyncResult(task_id)    result = {        "task_id": task_id,        "task_status": task_result.status    }    return JSONResponse(result)

Этот скрипт определит два конечных точки в «upload_routes.py» для загрузки файлов и проверки состояния задачи.

Источник: скриншот автора

Вы увидите, что есть задача Celery с названием «process_file» в /upload. Давайте теперь создадим эту задачу в celery.

Рабочий процесс Celery и задачи

Во-первых, создайте файл в основном каталоге с названием «celery_worker.py».

import osfrom celery import Celeryimport asynciofrom utils.process_file import get_supabase_client,file_handlercelery = Celery(    __name__,    broker="redis://127.0.0.1:6379/0",    backend="redis://127.0.0.1:6379/0")@celery.task(name="process_file")def process_file(    file_name: str,    file_original_name: str,    user_id: str,):    supabase_client = get_supabase_client()    tmp_file_name = "tmp-file-"+file_name    tmp_file_name = tmp_file_name.replace("/", "_")        with open(tmp_file_name, "wb+") as file:        res = supabase_client.storage.from_("quivr").download(file_name)        file.write(res)        loop = asyncio.new_event_loop()        message = loop.run_until_complete(            file_handler(                file=tmp_file_name,                user_id=user_id,                file_original_name=file_original_name            )        )                file.close    os.remove(tmp_file_name)

Эта задача «process_file» (см. диаграмму процесса выше) загружает файл, использует file_handler для обработки файла, а затем удаляет временный файл после завершения.

Обработка файлов и встраивание

Для простоты вы можете использовать следующий скрипт file_handler. В этом скрипте есть рабочий процесс, выполняющий все встроенные элементы. Также вы можете изучить базовый код Quivr, в котором есть еще одна общая задача для назначения встроенных элементов нескольким рабочим.

# utils/process_file.py для обработки загруженного файлаimport osimport timefrom logger import get_loggerfrom repository.files.upload_file import DocumentSerializablefrom langchain.document_loaders import UnstructuredPDFLoaderfrom models.databases.supabase.supabase import SupabaseDBfrom supabase.client import Client, create_clientfrom langchain.vectorstores import SupabaseVectorStorefrom langchain.embeddings.openai import OpenAIEmbeddingsfrom langchain.text_splitter import RecursiveCharacterTextSplitterfrom dotenv import load_dotenvload_dotenv()logger = get_logger(__name__)def get_supabase_client() -> Client:    supabase_client: Client = create_client(        os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY")    )    return supabase_clientdef get_supabase_db() -> SupabaseDB:    supabase_client = get_supabase_client()    return SupabaseDB(supabase_client)def get_embeddings() -> OpenAIEmbeddings:    embeddings = OpenAIEmbeddings(        openai_api_key=os.getenv("OPENAI_API_KEY")    )  # pyright: ignore reportPrivateUsage=none    return embeddingsdef get_documents_vector_store() -> SupabaseVectorStore:    # settings = BrainSettings()  # pyright: ignore reportPrivateUsage=none    embeddings = get_embeddings()    supabase_client: Client = create_client(        os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY")    )    documents_vector_store = SupabaseVectorStore(        supabase_client, embeddings, table_name="vectors"    )    return documents_vector_storedef create_vector(doc):    documents_vector_store = get_documents_vector_store()    try:         sids = documents_vector_store.add_documents([doc])        if sids and len(sids) > 0:            return sids            except Exception as e:        logger.error(f"Error creating vector for document: {e}")        def create_user_vector(user_id, vector_id):    database = get_supabase_db()    response = (        database.db.table("user_vectors")        .insert(            {                "user_id": str(user_id),                "vector_id": str(vector_id),            }        )        .execute()    )    return response.data    def create_embedding_for_document(user_id, doc_with_metadata):    doc = DocumentSerializable.from_json(doc_with_metadata)    created_vector = create_vector(doc)    created_vector_id = created_vector[0]  # pyright: ignore reportPrivateUsage=none        create_user_vector(user_id, created_vector_id)    def compute_documents_from_pdf(file,loader):    loader = loader(file)    documents=[]    documents = loader.load()    # split the documents into chunks    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(        chunk_size=500, chunk_overlap=0    )    documents = text_splitter.split_documents(documents)    return documents        async def file_handler(    file: str,    file_original_name: str,    user_id,    loader_class=UnstructuredPDFLoader,   #the loader class from Langchain):    dateshort = time.strftime("%Y%m%d")        documents = compute_documents_from_pdf(file,loader_class)    for doc in documents:  # pyright: ignore reportPrivateUsage=none        metadata = {            "file_name": file_original_name,            "date": dateshort        }        doc_with_metadata = DocumentSerializable(            page_content=doc.page_content, metadata=metadata        )        create_embedding_for_document(            user_id, doc_with_metadata.to_json()        )    return "Планировщик обработал все файлы!"

В демонстрационных целях здесь тестируются только файлы pdf. Для более широкого спектра файловых форматов вы можете обратиться к базе кода Quivr, где используется класс File для обработки различных файловых форматов.

Тестирование от начала до конца

Чтобы протестировать это, просто включите сервер Uvicorn (для FastAPI) и сервер Celery.

uvicorn main:app --reload

celery -A celery_worker worker --loglevel=info --logfile=celery.log --concurrency=1 -P solo

— logfile (опционально): может сохранять файл журнала Celery в рабочем каталоге

— concurrency (опционально): устанавливает, сколько рабочих процессов вы хотите запустить одновременно

— P solo: мне нужно это, чтобы запустить Celery на ноутбуке с Windows. Если вы запускаете его на Mac/Docker, вам, вероятно, это не понадобится.

Вот фрагмент для тестирования точек доступа.

Источник: скриншот автора
Источник: скриншот автора