Организуйте рабочие процессы машинного обучения на основе Ray с использованием Amazon SageMaker

Организуйте рабочие процессы машинного обучения с помощью Ray и Amazon SageMaker

Машинное обучение (ML) становится все более сложным по мере того, как клиенты пытаются решать все более сложные задачи. Эта сложность часто приводит к необходимости распределенного МО, при котором для обучения одной модели используется несколько машин. Хотя это позволяет распараллеливать задачи на нескольких узлах, что приводит к ускорению времени обучения, улучшению масштабируемости и производительности, существуют значительные проблемы в эффективном использовании распределенного оборудования. Ученые-данных должны решать такие проблемы, как разделение данных, балансировка нагрузки, отказоустойчивость и масштабируемость. Инженеры МО должны вручную управлять параллелизацией, планированием, ошибками и повторами, требуя сложного инфраструктурного кода.

В этом посте мы обсудим преимущества использования Ray и Amazon SageMaker для распределенного МО и предоставим пошаговое руководство по использованию этих фреймворков для создания и развертывания масштабируемого рабочего процесса МО.

Ray, открытый распределенный фреймворк вычислений, предоставляет гибкую платформу для распределенного обучения и предоставления моделей МО. Он абстрагирует низкоуровневые детали распределенных систем с помощью простых, масштабируемых библиотек для общих задач МО, таких как предварительная обработка данных, распределенное обучение, настройка гиперпараметров, обучение с подкреплением и предоставление моделей.

SageMaker – это полностью управляемый сервис для создания, обучения и развертывания моделей МО. Ray без проблем интегрируется с функциями SageMaker для создания и развертывания сложных рабочих нагрузок МО, которые являются эффективными и надежными. Комбинация Ray и SageMaker обеспечивает полный цикл разработки масштабируемых рабочих процессов МО и имеет следующие особенности:

  • Распределенные акторы и конструкции параллелизма в Ray упрощают разработку распределенных приложений.
  • Рабочая среда Ray AI (AIR) устраняет препятствия на пути от разработки к продукции. С помощью Ray и AIR один и тот же код Python может масштабироваться без проблем от ноутбука до большого кластера.
  • Управляемая инфраструктура SageMaker и функции, такие как задания обработки, задания обучения и задания настройки гиперпараметров, могут использовать библиотеки Ray для распределенных вычислений.
  • Эксперименты Amazon SageMaker позволяют быстро итерироваться и отслеживать испытания.
  • Хранилище функций Amazon SageMaker обеспечивает масштабируемое хранилище для хранения, извлечения и обмена функциями МО для обучения моделей.
  • Обученные модели могут быть сохранены, версионированы и отслеживаться в реестре моделей Amazon SageMaker для обеспечения управления.
  • Amazon SageMaker Pipelines позволяет оркестрировать полный жизненный цикл МО от подготовки данных и обучения до развертывания модели в виде автоматизированных рабочих процессов.

Обзор решения

В этом посте мы фокусируемся на преимуществах совместного использования Ray и SageMaker. Мы настраиваем масштабируемый рабочий процесс МО на основе Ray, оркестрируя его с помощью SageMaker Pipelines. Рабочий процесс включает параллельную загрузку данных в хранилище функций с использованием акторов Ray, предварительную обработку данных с помощью Ray Data, обучение моделей и настройку гиперпараметров в масштабе с помощью Ray Train и заданий настройки гиперпараметров, а также оценку модели и регистрацию модели в реестре моделей.

В качестве данных мы используем синтетический набор данных о жилье, состоящий из восьми характеристик (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCH и DECK), а наша модель будет предсказывать PRICE дома.

Каждый этап рабочего процесса МО разбит на отдельные шаги с собственным скриптом, который принимает входные и выходные параметры. В следующем разделе мы выделим основные фрагменты кода из каждого шага. Полный код можно найти в репозитории aws-samples-for-ray на GitHub.

Предварительные требования

Для использования Python SDK SageMaker и запуска кода, связанного с этим постом, вам нужны следующие предварительные требования:

  • Учетная запись AWS, содержащая все ваши ресурсы AWS
  • Роль AWS Identity and Access Management (IAM) с доступом к блокнотам SageMaker Studio, хранилищу функций SageMaker, реестру моделей SageMaker и конвейерам SageMaker

Загрузка данных в хранилище функций SageMaker

Первый шаг в рабочем процессе МО – считывание исходного файла данных из Amazon Simple Storage Service (Amazon S3) в формате CSV и загрузка его в хранилище функций SageMaker. Хранилище функций SageMaker – это специализированное хранилище, которое упрощает создание, совместное использование и управление функциями МО. Оно упрощает обнаружение, повторное использование и обмен функциями, что приводит к более быстрой разработке, увеличению сотрудничества внутри команд клиента и снижению затрат.

Загрузка функций в хранилище функций включает следующие шаги:

  1. Определение группы функций и создание группы функций в хранилище функций.
  2. Подготовка исходных данных для хранилища функций путем добавления времени события и идентификатора записи для каждой строки данных.
  3. Загрузка подготовленных данных в группу функций с использованием SDK Boto3.

В этом разделе мы подчеркиваем только Шаг 3, потому что это часть, которая включает параллельную обработку задачи ввода с использованием Ray. Вы можете ознакомиться с полным кодом для этого процесса в репозитории GitHub.

Метод ingest_features определен внутри класса под названием Featurestore. Обратите внимание, что класс Featurestore декорирован с использованием @ray.remote. Это указывает на то, что экземпляр этого класса является актором Ray, состояний и параллельной вычислительной единицей внутри Ray. Это модель программирования, которая позволяет создавать распределенные объекты, которые поддерживают внутреннее состояние и к которым можно обращаться одновременно несколькими задачами, выполняющимися на разных узлах в кластере Ray. Акторы предоставляют способ управлять и инкапсулировать изменяемое состояние, что делает их ценными для создания сложных приложений с состоянием в распределенной среде. Вы также можете указать требования к ресурсам в акторах. В этом случае каждому экземпляру класса FeatureStore потребуется 0,5 ЦП. См. следующий код:

@ray.remote(num_cpus=0.5)
class Featurestore:
    def ingest_features(self,feature_group_name, df, region):
        """
        Вносит признаки в группу хранилища признаков
        Args:
            feature_group_name (str): Название группы признаков
            data_path (str): Путь к данным обучения/проверки/теста в формате CSV.
        """
        
        ...

Вы можете взаимодействовать с актором, вызывая оператор remote. В следующем коде количество желаемых акторов передается в качестве входного аргумента в скрипт. Затем данные разделяются на основе количества акторов и передаются в удаленные параллельные процессы для ввода в хранилище признаков. Вы можете вызвать get для объекта ref, чтобы заблокировать выполнение текущей задачи, пока удаленное вычисление не будет завершено и результат не будет доступен. Когда результат будет доступен, ray.get вернет результат, и выполнение текущей задачи продолжится.

import modin.pandas as pd
import ray

df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Разделить на разделы
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Запустить акторов и назначить разделы в цикле
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = []

for actor, partition in zip(actors, input_partitions):
    results.append(actor.ingest_features.remote(
                        args.feature_group_name, 
                        partition, args.region
                      )
                )

ray.get(results)

Подготовка данных для обучения, проверки и тестирования

На этом шаге мы используем набор данных Ray для эффективного разделения, преобразования и масштабирования нашего набора данных в подготовке для машинного обучения. Ray Dataset предоставляет стандартный способ загрузки распределенных данных в Ray, поддерживая различные системы хранения и форматы файлов. Он имеет API для общих операций предварительной обработки данных ML, таких как параллельные преобразования, перемешивание, группировка и агрегация. Ray Dataset также обрабатывает операции, требующие состояния установки и GPU-ускорения. Он интегрируется плавно с другими библиотеками обработки данных, такими как Spark, Pandas, NumPy и другими, а также с фреймворками ML, такими как TensorFlow и PyTorch. Это позволяет создавать конвейеры данных и рабочие процессы ML end-to-end на основе Ray. Цель состоит в том, чтобы сделать распределенную обработку данных и ML более простыми для практиков и исследователей.

Давайте рассмотрим фрагменты скриптов, выполняющих эту предварительную обработку данных. Мы начинаем с загрузки данных из хранилища признаков:

def load_dataset(feature_group_name, region):
    """
    Загружает данные как набор данных ray из локации хранилища признаков без подключения
    Args:
        feature_group_name (str): имя группы признаков
    Returns:
        ds (ray.data.dataset): Ray dataset, который содержит запрошенные данные из хранилища признаков
    """
    session = sagemaker.Session(boto3.Session(region_name=region))
    fs_group = FeatureGroup(
        name=feature_group_name, 
        sagemaker_session=session
    )

    fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri")
    
    # Удалить добавленные хранилищем признаков столбцы
    # Поскольку они не связаны с решаемой задачей ML
    cols_to_drop = ["record_id", "event_time","write_time", 
                    "api_invocation_time", "is_deleted", 
                    "year", "month", "day", "hour"]           

    ds = ray.data.read_parquet(fs_data_loc)
    ds = ds.drop_columns(cols_to_drop)
    print(f"{fs_data_loc} количество - {ds.count()}")
    return ds

Затем мы разделяем и масштабируем данные с использованием более высокоуровневых абстракций, доступных в библиотеке ray.data:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None):
    """
    Разделить набор данных на обучающий, валидационный и тестовый наборы
    Аргументы:
        dataset (ray.data.Dataset): входные данные
        train_size (float): доля данных, используемых как обучающий набор
        val_size (float): доля данных, используемых как валидационный набор
        test_size (float): доля данных, используемых как тестовый набор
        random_state (int): Передайте int для воспроизводимых результатов при многократном вызове функции.
    Возвращает:
        train_set (ray.data.Dataset): обучающий набор данных
        val_set (ray.data.Dataset): валидационный набор данных
        test_set (ray.data.Dataset): тестовый набор данных
    """
    # Перемешиваем этот набор данных с фиксированным случайным семенем.
    shuffled_ds = dataset.random_shuffle(seed=random_state)
    # Разделяем данные на обучающий, валидационный и тестовый наборы
    train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size])
    return train_set, val_set, test_set

def scale_dataset(train_set, val_set, test_set, target_col):
    """
    Применить StandardScaler к обучающему набору данных и применить его к валидационному и тестовому наборам данных
    Аргументы:
        train_set (ray.data.Dataset): обучающий набор данных
        val_set (ray.data.Dataset): валидационный набор данных
        test_set (ray.data.Dataset): тестовый набор данных
        target_col (str): целевая колонка
    Возвращает:
        train_transformed (ray.data.Dataset): масштабированные обучающие данные
        val_transformed (ray.data.Dataset): масштабированные данные валидации
        test_transformed (ray.data.Dataset): масштабированные тестовые данные
    """
    tranform_cols = dataset.columns()
    # Удаляем целевые столбцы из масштабирования
    tranform_cols.remove(target_col)
    # настраиваем стандартный масштабировщик
    standard_scaler = StandardScaler(tranform_cols)
    # подгоняем масштабировщик к обучающему набору данных
    print("Приспосабливаем масштабирование к обучающим данным и преобразовываем набор данных...")
    train_set_transformed = standard_scaler.fit_transform(train_set)
    # применяем масштабировщик к валидационному и тестовому наборам данных
    print("Преобразуем валидационный и тестовый наборы данных...")
    val_set_transformed = standard_scaler.transform(val_set)
    test_set_transformed = standard_scaler.transform(test_set)
    return train_set_transformed, val_set_transformed, test_set_transformed

Обработанные обучающий, валидационный и тестовый наборы данных хранятся в Amazon S3 и будут переданы в качестве входных параметров для последующих шагов.

Выполнение тренировки модели и оптимизации гиперпараметров

После предварительной обработки данных и их готовности для моделирования настало время обучить некоторые модели машинного обучения и настроить их гиперпараметры для максимизации предсказательной производительности. Мы используем XGBoost-Ray, распределенную версию XGBoost, построенную на Ray, которая позволяет обучать модели XGBoost на больших наборах данных с использованием нескольких узлов и графических процессоров. Она предоставляет простые замены для функций обучения и предсказания XGBoost, обрабатывая сложности управления и обучения распределенных данных внутри.

Для обеспечения распределения обучения по нескольким узлам мы используем вспомогательный класс с именем RayHelper. Как показано в следующем коде, мы используем конфигурацию ресурсов задания обучения и выбираем первый хост в качестве главного узла:

class RayHelper():
    def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"):
        ....
        self.resource_config = self.get_resource_config()
        self.head_host = self.resource_config["hosts"][0]
        self.n_hosts = len(self.resource_config["hosts"])

Мы можем использовать информацию о хосте для определения способа инициализации Ray на каждом из экземпляров задания обучения:

def start_ray(self): 
   head_ip = self._get_ip_from_host()
   # Если текущий хост является выбранным главным узлом
   # запускаем `ray start` с указанием флага --head, чтобы он стал главным узлом
    if self.resource_config["current_host"] == self.head_host:
        output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', 
        self.ray_port, '--redis-password', self.redis_pass, 
        '--include-dashboard', 'false'], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        ray.init(address="auto", include_dashboard=False)
        self._wait_for_workers()
        print("Все рабочие присутствуют и работают")
        print(ray.cluster_resources())

    else:
       # Если текущий хост не является главным узлом, 
       # запускаем `ray start` с указанием IP-адреса head_host в качестве главного узла
        time.sleep(10)
        output = subprocess.run(['ray', 'start', 
        f"--address={head_ip}:{self.ray_port}", 
        '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        sys.exit(0)

При запуске задания обучения кластер Ray можно инициализировать, вызвав метод start_ray() на экземпляре RayHelper:

if __name__ == '__main__':
    ray_helper = RayHelper()
    ray_helper.start_ray()
    args = read_parameters()
    sess = sagemaker.Session(boto3.Session(region_name=args.region))

Затем мы используем тренажер XGBoost из XGBoost-Ray для обучения:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result:
    """
    Создает тренажер XGBoost, обучает его и возвращает результат.
    Аргументы:
        ds_train (ray.data.dataset): набор данных для обучения
        ds_val (ray.data.dataset): набор данных для валидации
        params (dict): гиперпараметры
        num_workers (int): количество рабочих для распределения обучения
        target_col (str): целевая колонка
    Возвращает:
        result (ray.air.result.Result): результат задания обучения
    """
    
    train_set = RayDMatrix(ds_train, 'PRICE')
    val_set = RayDMatrix(ds_val, 'PRICE')
    
    evals_result = {}
    
    trainer = train(
        params=params,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(val_set, "validation")],
        verbose_eval=False,
        num_boost_round=100,
        ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1),
    )
    
    output_path=os.path.join(args.model_dir, 'model.xgb')
    
    trainer.save_model(output_path)
    
    valMAE = evals_result["validation"]["mae"][-1]
    valRMSE = evals_result["validation"]["rmse"][-1]
 
    print('[3] #011validation-mae:{}'.format(valMAE))
    print('[4] #011validation-rmse:{}'.format(valRMSE))
    
    local_testing = False
    try:
        load_run(sagemaker_session=sess)
    except:
        local_testing = True
    if not local_testing: # Отслеживать эксперимент при использовании SageMaker Training
        with load_run(sagemaker_session=sess) as run:
            run.log_metric('validation-mae', valMAE)
            run.log_metric('validation-rmse', valRMSE)

Обратите внимание, что при создании тренажера trainer мы передаем RayParams, который принимает число актеров и число ЦП на актера. XGBoost-Ray использует эту информацию для распределения обучения по всем узлам, подключенным к кластеру Ray.

Теперь мы создаем объект эстиматора XGBoost на основе SageMaker Python SDK и используем его для задания HPO.

Организуйте предыдущие шаги с помощью SageMaker Pipelines

Для создания масштабируемого и многоразового рабочего процесса машинного обучения нам необходимо использовать инструмент CI/CD для организации предыдущих шагов в конвейер. SageMaker Pipelines имеет прямую интеграцию с SageMaker, SageMaker Python SDK и SageMaker Studio. Эта интеграция позволяет создавать рабочие процессы машинного обучения с помощью простого в использовании Python SDK, а затем визуализировать и управлять рабочим процессом с помощью SageMaker Studio. Вы также можете отслеживать историю ваших данных в рамках выполнения конвейера и назначать шаги для кэширования.

SageMaker Pipelines создает граф направленных ациклических графов (DAG), который включает шаги, необходимые для построения рабочего процесса машинного обучения. Каждый конвейер представляет собой серию взаимосвязанных шагов, организованных с помощью зависимостей данных между шагами, и может быть параметризован, позволяя вам предоставлять входные переменные в качестве параметров для каждого запуска конвейера. SageMaker Pipelines имеет четыре типа параметров конвейера: ParameterString, ParameterInteger, ParameterFloat и ParameterBoolean. В этом разделе мы параметризуем некоторые из входных переменных и настраиваем конфигурацию кэширования шага:

processing_instance_count = ParameterInteger(
    name='ProcessingInstanceCount',
    default_value=1
)
feature_group_name = ParameterString(
    name='FeatureGroupName',
    default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString(
    name='Bucket_Prefix',
    default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0)
    train_size = ParameterString(
    name='TrainSize',
    default_value="0.6"
)
val_size = ParameterString(
    name='ValidationSize',
    default_value="0.2"
)
test_size = ParameterString(
    name='TestSize',
    default_value="0.2"
)

cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

Мы определяем два этапа обработки: один для ввода в хранилище функций SageMaker, другой – для подготовки данных. Это должно выглядеть очень похоже на предыдущие описанные ранее шаги. Единственная новая строка кода – ProcessingStep после определения этапов, что позволяет нам взять конфигурацию задания обработки и включить ее в качестве шага конвейера. Мы также указываем зависимость этапа подготовки данных от этапа ввода в хранилище функций SageMaker. См. следующий код:

feature_store_ingestion_step = ProcessingStep(
    name='FeatureStoreIngestion',
    step_args=fs_processor_args,
    cache_config=cache_config
)

preprocess_dataset_step = ProcessingStep(
    name='PreprocessData',
    step_args=processor_args,
    cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

Аналогично, для создания шага обучения и настройки модели нам необходимо добавить определение TuningStep после кода шага обучения модели, чтобы мы могли запускать настройку гиперпараметров SageMaker в качестве шага в конвейере:

tuning_step = TuningStep(
    name="HPTuning",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

После шага настройки мы выбираем зарегистрировать лучшую модель в реестре моделей SageMaker. Чтобы контролировать качество модели, мы реализуем минимальный порог качества, сравнивая целевую метрику лучшей модели (RMSE) с порогом, определенным как входной параметр конвейера rmse_threshold. Для выполнения этой оценки мы создаем еще один этап обработки для запуска скрипта оценки модели. Результат оценки модели будет сохранен в виде файла свойств. Файлы свойств особенно полезны при анализе результатов этапа обработки для принятия решения о том, как должны выполняться другие шаги. См. следующий код:

# Указываем, где мы будем сохранять результаты оценки модели, чтобы другие шаги могли получить к ним доступ
evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='evaluation',
    path='evaluation.json',
)

# Используется ProcessingStep для оценки производительности выбранной модели из шага HPO.
# В этом случае оценивается наиболее эффективная модель.
evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=tuning_step.get_top_model_s3_uri(
                top_k=0, s3_bucket=bucket, prefix=s3_prefix
            ),
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation', source='/opt/ml/processing/evaluation'
        ),
    ],
    code='./pipeline_scripts/evaluate/script.py',
    property_files=[evaluation_report],
)

Мы определяем ModelStep для регистрации лучшей модели в реестре моделей SageMaker в нашем конвейере. В случае, если лучшая модель не проходит предварительную проверку качества, мы дополнительно указываем FailStep для вывода сообщения об ошибке:

register_step = ModelStep(
    name='RegisterTrainedModel',
    step_args=model_registry_args
)

metrics_fail_step = FailStep(
    name="RMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

Затем мы используем ConditionStep, чтобы определить, какой шаг – регистрация модели или шаг ошибки – следует выполнить следующим в конвейере. В нашем случае лучшая модель будет зарегистрирована, если ее показатель RMSE ниже порога.

# Шаг условия для оценки качества модели и ветвления выполнения
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path='regression_metrics.rmse.value',
    ),
    right=rmse_threshold,
)
condition_step = ConditionStep(
    name='CheckEvaluation',
    conditions=[cond_lte],
    if_steps=[register_step],
    else_steps=[metrics_fail_step],
)

Наконец, мы организуем все определенные шаги в конвейер:

pipeline_name = 'синтетический-жилищный-тренировочный-конвейер-ray'
step_list = [
             feature_store_ingestion_step,
             preprocess_dataset_step,
             tuning_step,
             evaluation_step,
             condition_step
            ]

training_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        feature_group_name,
        train_size,
        val_size,
        test_size,
        bucket_prefix,
        rmse_threshold
    ],
    steps=step_list
)

# Примечание: Если существующий конвейер имеет то же имя, он будет перезаписан.
training_pipeline.upsert(role_arn=role_arn)

Предыдущий конвейер можно визуализировать и выполнить непосредственно в SageMaker Studio или выполнить, вызвав execution = training_pipeline.start(). Ниже показана схема работы конвейера.

Кроме того, мы можем просмотреть связь продуктов, созданных выполнением конвейера.

from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

Развертывание модели

После регистрации лучшей модели в реестре моделей SageMaker через запуск конвейера мы развертываем модель на реальный конечную точку, используя полностью управляемые возможности развертывания модели SageMaker. У SageMaker также есть другие варианты развертывания модели для удовлетворения потребностей различных случаев использования. Для получения дополнительной информации см. Развертывание моделей для вывода, выбирая подходящий вариант для вашего случая использования. Сначала давайте зарегистрируем модель в реестре моделей SageMaker:

xgb_regressor_model = ModelPackage(
    role_arn,
    model_package_arn=model_package_arn,
    name=model_name
)

Текущий статус модели – PendingApproval. Перед развертыванием необходимо установить ее статус в Approved:

sagemaker_client.update_model_package(
    ModelPackageArn=xgb_regressor_model.model_package_arn,
    ModelApprovalStatus='Approved'
)

xgb_regressor_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    endpoint_name=endpoint_name
)

Очистка

После завершения экспериментов не забудьте очистить ресурсы, чтобы избежать ненужных затрат. Для очистки удалите конечную точку в реальном времени, группу моделей, конвейер и группу объектов путем вызова API DeleteEndpoint, DeleteModelPackageGroup, DeletePipeline и DeleteFeatureGroup соответственно, а также завершите все экземпляры блокнота SageMaker Studio.

Заключение

В этом посте показана пошаговая инструкция по использованию конвейеров SageMaker для организации рабочих процессов ML, основанных на Ray. Мы также продемонстрировали возможность интеграции конвейеров SageMaker с инструментами ML сторонних разработчиков. Существует различные службы AWS, которые поддерживают нагрузку Ray в масштабируемом и безопасном режиме для обеспечения высокой производительности и операционной эффективности. Теперь ваша очередь изучить эти мощные возможности и начать оптимизировать рабочие процессы машинного обучения с помощью Amazon SageMaker Pipelines и Ray. Действуйте сегодня и раскройте полный потенциал ваших проектов ML!