Развертывание настраиваемой модели машинного обучения в качестве конечной точки SageMaker

Внедрение настраиваемой модели машинного обучения как конечной точки в SageMaker

Фото от Ricardo Gomez Angel на Unsplash

Развёртывание конечной точки SageMaker

Быстрый и простой гид по созданию конечной точки AWS SageMaker для вашей модели

Разработка модели машинного обучения (МО) включает ключевые шаги, от сбора данных до развёртывания модели. После оптимизации алгоритмов и проверки производительности через тестирование, последний важный шаг – развёртывание. Эта фаза превращает инновацию в полезность, позволяя другим пользователем использовать способности модели для предсказаний. Развёрнутая модель МО соединяет разработку и реальный мир, предоставляя конкретные преимущества пользователям и заинтересованным сторонам.

Этот гид охватывает основные шаги, необходимые для разработки пользовательской модели МО в качестве конечной точки SageMaker. На этом этапе я предполагаю, что у вас уже есть рабочая модель и вы хотите предоставить ее остальному миру через конечную точку. В гиде вам будет предложено развернуть модель, основанную на PyTorch, которая направлена на предсказание аномалий в видеоклипах. Модель, также известная как AI VAD, основана на документе «Атрибутивные представления для точного и интерпретируемого обнаружения аномалий в видео», а ее реализацию можно найти в репозитории GitHub anomalib от OpenVINO. Чтобы узнать больше об этом интересном подходе, прочитайте приложение в конце этого блога.

На этом этапе я хочу подчеркнуть, что в этом случае нельзя использовать абстракцию PyTorchModel, специально созданную для развертывания моделей PyTorch, по двум причинам. Первая причина заключается в том, что у нас есть пакет anomalib как дополнительная зависимость, которая не включена в предварительно собранное изображение SageMaker PyTorch. Вторая причина заключается в том, что модель требует дополнительной информации, которая была получена в процессе обучения и не является частью весов модели PyTorch.

Ниже представлены шаги для достижения этой цели:

  1. Написать скрипт обслуживания модели SageMaker
  2. Загрузить модель в S3
  3. Загрузить настраиваемый образ Docker в AWS ECR
  4. Создать модель в SageMaker
  5. Создать конфигурацию конечной точки
  6. Создать конечную точку
  7. Вызвать конечную точку

Написать скрипт обслуживания модели SageMaker

Скрипт обслуживания модели SageMaker (inference.py) является важным компонентом при создании модели SageMaker. Он соединяет модели машинного обучения и данные реального мира. По сути, он обрабатывает входящие запросы, выполняет предсказания модели и возвращает результаты, влияя на процесс принятия решений приложения.

Скрипт inference.py состоит из нескольких ключевых методов, каждый из которых выполняет уникальную задачу и вместе облегчает процесс обслуживания модели. Ниже перечислены четыре основных метода.

  1. Метод model_fn отвечает за загрузку обученной модели. Он считывает сохраненные артефакты модели и возвращает объект модели, который можно использовать для предсказаний. Этот метод вызывается только один раз при запуске сервера модели SageMaker.
  2. Метод input_fn принимает данные запроса и форматирует их для выполнения предсказаний. Например, в коде ниже эта функция форматирует данные по-разному в зависимости от источника данных (байты изображения или список S3-URI) и того, должен ли список кадров считаться одним видеоклипом.
  3. Метод predict_fn принимает отформатированные данные запроса и выполняет вывод на основе загруженной модели.
  4. Наконец, используется метод output_fn. Он берет результат предсказания и форматирует его в сообщение ответа. Например, упаковывает его в JSON-объект.

Код для скрипта обслуживания модели Sagemaker можно найти ниже.

import os
import json
import joblib
import torch
from PIL import Image
import numpy as np
import io
import boto3
from enum import Enum
from urllib.parse import urlsplit
from omegaconf import OmegaConf
from anomalib.data.utils import read_image, InputNormalizationMethod, get_transforms
from anomalib.models.ai_vad.torch_model import AiVadModel

device = "cuda"

class PredictMode(Enum):
    frame = 1
    batch = 2
    clip = 3

def model_fn(model_dir):
    """
    Эта функция выполняется первой при запросе на предсказание,
    она загружает модель с диска и возвращает объект модели для дальнейшего вывода.
    """
    # Загрузка конфигурационного файла
    config = OmegaConf.load(os.path.join(model_dir, "ai_vad_config.yaml"))
    config_model = config.model
    
    # Загрузка модели
    model = AiVadModel(
        box_score_thresh=config_model.box_score_thresh,
        persons_only=config_model.persons_only,
        min_bbox_area=config_model.min_bbox_area,
        max_bbox_overlap=config_model.max_bbox_overlap,
        enable_foreground_detections=config_model.enable_foreground_detections,
        foreground_kernel_size=config_model.foreground_kernel_size,
        foreground_binary_threshold=config_model.foreground_binary_threshold,
        n_velocity_bins=config_model.n_velocity_bins,
        use_velocity_features=config_model.use_velocity_features,
        use_pose_features=config_model.use_pose_features,
        use_deep_features=config_model.use_deep_features,
        n_components_velocity=config_model.n_components_velocity,
        n_neighbors_pose=config_model.n_neighbors_pose,
        n_neighbors_deep=config_model.n_neighbors_deep
    )
    
    # Загрузка весов модели
    model.load_state_dict(torch.load(os.path.join(model_dir, "ai_vad_weights.pth"), map_location=device), strict=False)
    
    # Загрузка памяти
    velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank = joblib.load(
        os.path.join(model_dir, "ai_vad_banks.joblib")
    )
    
    # Применение памяти к модели
    if velocity_estimator_memory_bank is not None:
        model.density_estimator.velocity_estimator.memory_bank = velocity_estimator_memory_bank
    if pose_estimator_memory_bank is not None:
        model.density_estimator.pose_estimator.memory_bank = pose_estimator_memory_bank
    if appearance_estimator_memory_bank is not None:
        model.density_estimator.appearance_estimator.memory_bank = appearance_estimator_memory_bank
    
    model.density_estimator.fit()
    
    # Перенос модели на устройство
    model = model.to(device)
    
    # Получение преобразований
    transform_config = config.dataset.transform_config.eval if "transform_config" in config.dataset.keys() else None
    image_size = (config.dataset.image_size[0], config.dataset.image_size[1])
    center_crop = config.dataset.get("center_crop")
    center_crop = tuple(center_crop) if center_crop is not None else None
    normalization = InputNormalizationMethod(config.dataset.normalization)
    transform = get_transforms(config=transform_config, image_size=image_size, center_crop=center_crop, normalization=normalization)
    
    return model, transform

def input_fn(request_body, request_content_type):
    """
    request_body передается SageMaker, а тип содержимого передается запросом HTTP-заголовка клиента (или вызывающего).
    """
    print("input_fn-----------------------")
    
    if request_content_type in ("application/x-image", "image/x-image"):
        image = Image.open(io.BytesIO(request_body)).convert("RGB")
        numpy_array = np.array(image)
        print("numpy_array.shape", numpy_array.shape)
        print("input_fn-----------------------")
        return [numpy_array], PredictMode.frame
    
    elif request_content_type == "application/json":
        request_body_json = json.loads(request_body)
        s3_uris = request_body_json.get("images", [])
        
        if len(s3_uris) == 0:
            raise ValueError("Images - обязательное поле и должно содержать хотя бы один S3 URI")
        
        s3 = boto3.client("s3")
        frame_paths = []
        
        for s3_uri in s3_uris:
            parsed_url = urlsplit(s3_uri)
            bucket_name = parsed_url.netloc
            object_key = parsed_url.path.lstrip('/')
            local_frame_path = f"/tmp/{s3_uri.replace('/', '_')}"
            
            # Скачивание кадра из S3
            s3.download_file(bucket_name, object_key, local_frame_path)
            frame_paths.append(local_frame_path)
        
        frames = np.stack([torch.Tensor(read_image(frame_path)) for frame_path in frame_paths], axis=0)
                
        predict_mode = PredictMode.clip if request_body_json.get("clip", False) else PredictMode.batch
        
        print("frames.shape", frames.shape)
        print("predict_mode", predict_mode)
        print("input_fn-----------------------")
        
        return frames, predict_mode
    
    # Если тип содержимого не соответствует ожидаемому, выбрасывается исключение
    raise ValueError(f"Тип содержимого {request_content_type} не поддерживается")

def predict_fn(input_data, model):
    """
    Эта функция принимает входные данные и модель, возвращенную model_fn.
    Она выполняется после model_fn, а ее выход возвращается в качестве ответа API.
    """
    print("predict_fn-----------------------")
    
    model, transform = model
    
    frames, predict_mode = input_data
    processed_data = {}
    processed_data["image"] = [transform(image=frame)["image"] for frame in frames]
    processed_data["image"] = torch.stack(processed_data["image"])
    image = processed_data["image"].to(device)
    
    # Добавление еще одного измерения для размера пакета одного клипа
    if predict_mode == PredictMode.clip:
        image = image.unsqueeze(0)
    
    print("image.shape", image.shape)
    
    model.eval()
    with torch.no_grad():
        boxes, anomaly_scores, image_scores = model(image)
    
    print("boxes_len", [len(b) for b in boxes])
    
    processed_data["pred_boxes"] = [box.int() for box in boxes]
    processed_data["box_scores"] = [score.to(device) for score in anomaly_scores]
    processed_data["pred_scores"] = torch.Tensor(image_scores).to(device)
    
    print("predict_fn-----------------------")
    
    return processed_data

def output_fn(prediction, accept):
    """
    Функция пост-обработки предсказаний модели.
    Она выполняется после predict_fn.
    """
    print("output_fn-----------------------")
    
    # Проверка типа accept
    if accept != "application/json":
        raise ValueError(f"Тип accept {accept} не поддерживается")
    
    # Преобразование PyTorch Tensors в списки, чтобы они могли быть сериализованы в JSON
    for key in prediction:
        # Если это torch.Tensor, преобразуем его в список
        if isinstance(prediction[key], torch.Tensor):
            prediction[key] = prediction[key].tolist()
        # Если это список, преобразуем каждый тензор в списке
        elif isinstance(prediction[key], list):
            prediction[key] = [tensor.tolist() if isinstance(tensor, torch.Tensor) else tensor for tensor in prediction[key]]
    
    print("output_fn-----------------------")
    
    return json.dumps(prediction), accept

P.S. Сильно рекомендуется протестировать скрипт обслуживания модели перед переходом к следующему шагу. Это можно легко сделать, имитируя вызов конвейера, как показано в коде ниже.

import jsonfrom inference import model_fn, predict_fn, input_fn, output_fnresponse, accept = output_fn(    predict_fn(        input_fn(payload, "application/x-image"),        model_fn("../")    ),    "application/json")json.loads(response).keys()

Загрузить модель в S3

Для создания конечной точки SageMaker, в которой загружается модель AI VAD PyTorch в точно таком же состоянии, нам понадобятся следующие файлы:

  • Веса модели AI VAD PyTorch (также известные как state_dict)
  • Память банков плотности эстиматора (которая не является частью весов модели)
  • Файл конфигурации с гиперпараметрами модели PyTorch
  • Скрипт обслуживания модели в Sagemaker (inference.py)

Ниже приведен код, демонстрирующий, как организовать все необходимые файлы в одном каталоге.

P.S. Я переопределил встроенный обратный вызов PyTorch ModelCheckpoint, чтобы убедиться, что эти банки памяти сохраняются вместе с сохранением контрольной точки (реализацию можно найти здесь).

import torchimport joblibimport shutilcheckpoint = "results/ai_vad/ucsd/run/weights/lightning/model.ckpt"config_path = "results/ai_vad/ucsd/run/config.yaml"model_weights = torch.load(checkpoint)model_state_dict = model_weights["state_dict"]torch.save(model_state_dict, "../ai_vad_weights.pth")velocity_estimator_memory_bank = Nonepose_estimator_memory_bank = Noneappearance_estimator_memory_bank = Noneif "velocity_estimator_memory_bank" in model_weights:    velocity_estimator_memory_bank = model_weights["velocity_estimator_memory_bank"]if "pose_estimator_memory_bank" in model_weights:    pose_estimator_memory_bank = model_weights["pose_estimator_memory_bank"]if "appearance_estimator_memory_bank" in model_weights:    appearance_estimator_memory_bank = model_weights["appearance_estimator_memory_bank"]banks = (velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank)joblib.dump(banks, "../ai_vad_banks.joblib")shutil.copyfile(config_path, "../ai_vad_config.yaml")

Затем все четыре файла были объединены вместе для создания tar.gz с помощью следующей команды.

tar -czvf ../ai_vad_model.tar.gz -C ../ ai_vad_weights.pth ai_vad_banks.joblib ai_vad_config.yaml inference.py

Наконец, файл был загружен в S3 с помощью boto3.

import boto3from datetime import datetimecurrent_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')s3 = boto3.resource('s3')s3.meta.client.upload_file("../ai_vad_model.tar.gz", "ai-vad", f"{current_datetime}/ai_vad_model.tar.gz")

Загрузка пользовательского образа Docker в AWS ECR

Как уже упоминалось выше, поскольку у нас есть дополнительная зависимость, которая не включена в предварительно построенный образ PyTorch Sagemaker (т.е. пакет anomalib), мы создали новый образ Docker для этой цели. Перед созданием пользовательского образа Docker требуется аутентификация в хранилище Amazon ECR.

REGION=<my_aws_region>ACCOUNT=<my_aws_account> # Аутентификация Docker в реестре Amazon ECRaws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin <docker_registry_url>.dkr.ecr.$REGION.amazonaws.com# Вход в ваш частный реестр Amazon ECRaws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ACCOUNT.dkr.ecr.$REGION.amazonaws.com

Файл Dockerfile можно найти ниже, а разные пути реестра Docker можно найти здесь. Убедитесь, что выбран правильный путь реестра на основе потребностей модели (CPU/GPU, версия Python и т.д.) и вашего региона AWS. Например, если регион us-east-1, полный путь к реестру Docker должен выглядеть примерно таким образом:763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.0-gpu-py310

# Используйте образ SageMaker PyTorch в качестве базового образа
FROM <docker_registry_url>.dkr.ecr.<my_aws_region>.amazonaws.com/pytorch-inference:2.0.0-gpu-py310
# Установите дополнительную зависимость
RUN pip install "git+https://github.com/hairozen/anomalib.git@ai-vad-inference-improvements"

Теперь мы можем выполнить классическую команду Docker build для создания этого пользовательского образа.

docker build -t ai-vad-image .

Следующий шаг – создать репозиторий AWS ECR для созданного нами нового образа, пометить его и загрузить образ в репозиторий AWS ECR.

# Создание репозитория AWS ECR
aws ecr create-repository --repository-name ai-vad-image
# Пометить образ
docker tag ai-vad-image:latest $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest
# Загрузка помеченного образа в репозиторий AWS ECR
docker push $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest

Создание модели в SageMaker

Этот шаг достаточно прост. Код ниже.

import boto3
import sagemaker

sagemaker_client = boto3.client(service_name="sagemaker")
role = sagemaker.get_execution_role()

model_name = f"ai-vad-model-{current_datetime}"
primary_container = {
    "Image": f"{my_aws_account}.dkr.ecr.{my_aws_region}.amazonaws.com/ai-vad-image:latest",
    "ModelDataUrl": f"s3://ai-vad/{current_datetime}/ai_vad_model.tar.gz"}

create_model_response = sagemaker_client.create_model(
    ModelName=model_name, 
    ExecutionRoleArn=role, 
    PrimaryContainer=primary_container)

Создание конфигурации конечной точки

Следующий шаг состоит в создании конфигурации конечной точки. Ниже представлен простой пример.

endpoint_config_name = f"ai-vad-model-config-{current_datetime}"
sagemaker_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[{
        "InstanceType": "ml.g5.xlarge",
        "InitialVariantWeight": 1,
        "InitialInstanceCount": 1,
        "ModelName": model_name,
        "VariantName": "AllTraffic"}])

Создание конечной точки

Теперь мы готовы создать саму конечную точку.

endpoint_name = f"ai-vad-model-endpoint-{current_datetime}"
sagemaker_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)

Обратите внимание, что потребуется несколько минут, чтобы состояние конечной точки изменилось с “Создание” на “В работе” (“InService”). Текущее состояние можно проверить, как показано ниже.

response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
response["EndpointStatus"]

Вызов конечной точки

Пришло время проверить результаты. Теперь пришло время вызвать конечную точку, чтобы проверить, что все работает, как ожидается.

with open(file_name, "rb") as f:
    payload = f.read()

predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name)
predictor.serializer = DataSerializer(content_type="image/x-image")
predictor.predict(payload)

Так что это хорошая проверка, но стоит учесть, что функция predictor.predict не запускает полный процесс вызова из скрипта обслуживания SageMaker, который включает в себя:output_fn(predict_fn(input_fn(input_data, model_fn(model_dir)),accept)

Чтобы также протестировать это, давайте вызовем модель, используя вызов API.

with open(file_name, "rb") as f:
    payload = f.read()

sagemaker_runtime = boto3.client("runtime.sagemaker")
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="image/x-image",
    Body=payload)
response = json.loads(response["Body"].read().decode())

Используя отличную визуализацию, предоставляемую anomalib, мы можем нарисовать рамки и их метки для заданного кадра из набора данных UCSDped2.

Изображение автора. Изображение было создано с использованием пакета anomalib на основе набора данных UCSD Anomaly Detection. Зеленые прямоугольники указывают на то, что у этих пешеходов нет аномалии в их ходьбе, тогда как красный прямоугольник для велосипедиста указывает на аномалию, вероятно, связанную с особенностями скорости и позы модели AI VAD.

Вывод

Хорошо, давайте быстро подведем итоги того, что мы рассмотрели здесь. Развертывание модели SageMaker для обслуживания требует ряда шагов.

Во-первых, необходимо написать сценарий обслуживания модели SageMaker, чтобы определить функциональность и поведение модели.

Затем модель загружается в Amazon S3 для хранения и извлечения.Кроме того, настраивается пользовательский образ Docker, который загружается в реестр контейнеров AWS Elastic Container Registry (ECR) для контейниризации модели и ее зависимостей. Следующий шаг состоит в создании модели в SageMaker, которая ассоциирует артефакты модели, хранящиеся в S3, с образом Docker, хранящимся в ECR.

Затем создается конфигурация конечной точки, в которой определяется количество и тип инстансов, используемых для хостинга модели.

Наконец, создается конечная точка, чтобы установить живое соединение между развернутой моделью и клиентскими приложениями, позволяя им вызывать конечную точку и делать прогнозы в реальном времени.

Через эти шаги развертывание модели SageMaker становится упрощенным процессом, который обеспечивает эффективное и надежное обслуживание модели.

Приложение

Статья Attribute-based Representations for Accurate and Interpretable Video Anomaly Detection, опубликованная в 2023 году Рейссом и др., предлагает простой, но очень эффективный метод обнаружения аномалий в видео (VAD) с использованием атрибутных представлений.

В статье утверждается, что традиционные методы VAD, которые часто основаны на глубоком обучении, часто трудны для интерпретации, что затрудняет пользователям понять, почему система помечает определенные кадры или объекты как аномальные.

Для решения этой проблемы авторы предлагают метод, который представляет каждый объект в видео по его скорости, позе и глубине. Эти атрибуты легко понять и интерпретировать, и они могут быть использованы для вычисления оценок аномалий с использованием подхода, основанного на плотности.

Статья показывает, что такое простое представление достаточно для достижения передовых результатов на нескольких сложных наборах данных VAD, включая ShanghaiTech, самый крупный и сложный набор данных VAD.

В дополнение к точности авторы также демонстрируют, что их метод интерпретируем. Например, они могут предоставить пользователям список объектов в видео, вносящих наибольший вклад в его оценку аномалий, а также информацию о их скорости, позе и глубине. Это поможет пользователям понять, почему система помечает видео как аномальное.

В целом, эта статья является значительным вкладом в область VAD. Она предлагает простой, точный и интерпретируемый метод для VAD, который может использоваться в различных приложениях.