Эффективное балансирование нагрузки с использованием Ray на Amazon SageMaker

Balancing Workload Efficiently with Ray on Amazon SageMaker

Метод повышения эффективности обучения DNN и снижения затрат на обучение

Фото от Fineas Anton на Unsplash

В предыдущих постах (например, здесь) мы подробно рассмотрели важность профилирования и оптимизации производительности ваших рабочих нагрузок обучения DNN. Обучение моделей глубокого обучения, особенно больших, может быть дорогостоящим процессом. Ваша способность максимизировать использование ваших ресурсов для обучения таким образом, чтобы ускорить сходимость модели и минимизировать затраты на обучение, может стать решающим фактором в успехе вашего проекта. Оптимизация производительности — это итеративный процесс, в котором мы выявляем и решаем узкие места производительности в нашем приложении, то есть части приложения, которые мешают нам увеличивать использование ресурсов и/или ускорять время выполнения.

Этот пост является третьим в серии постов, посвященных одному из наиболее распространенных узких мест производительности, с которым мы сталкиваемся при обучении моделей глубокого обучения — узкому месту предварительной обработки данных. Узкое место предварительной обработки данных возникает, когда наш GPU (или альтернативный ускоритель) — обычно самый дорогой ресурс в настройке обучения — оказывается бездействующим, ожидая ввода данных от перегруженных ресурсов ЦП.

Изображение из вкладки профайлера TensorBoard, демонстрирующее типичный след бутылочного горлышка в процессе ввода данных. Мы четко видим длинные периоды простоя GPU на каждом седьмом шаге обучения. (Автор)

В нашем первом посте по этой теме мы обсудили и продемонстрировали различные способы решения этого типа узкого места, включая:

  1. Выбор экземпляра обучения с соотношением вычислений ЦП и GPU, более подходящим для вашей рабочей нагрузки,
  2. Улучшение баланса нагрузки между ЦП и GPU путем перемещения некоторых операций ЦП на GPU и
  3. Перенос некоторых вычислений ЦП на вспомогательные устройства ЦП.

Мы продемонстрировали третий вариант, используя API TensorFlow Data Service, решение, специфичное для TensorFlow, в котором часть обработки входных данных может быть перенесена на другие устройства с использованием gRPC в качестве протокола связи.

В нашем втором посте мы предложили более общее решение на основе gRPC для использования вспомогательных устройств ЦП и продемонстрировали его на игрушечной модели PyTorch. Хотя для этого требовалось немного больше кодирования и настройки, чем для API TensorFlow Data Service, решение обеспечило гораздо большую надежность и позволило достичь той же оптимизации производительности обучения.

Балансировка нагрузки с помощью Ray

В этом посте мы продемонстрируем дополнительный метод использования вспомогательных устройств ЦП, который сочетает в себе надежность общего решения и простоту использования API, специфичного для TensorFlow. Метод, который мы продемонстрируем, будет использовать наборы данных Ray из библиотеки Ray Data. Используя полную мощность систем управления ресурсами и распределенного планирования Ray, Ray Data способен выполнять нашу конвейерную обработку данных обучения таким образом, что она становится масштабируемой и распределенной. В частности, мы настроим наш набор данных Ray таким образом, чтобы библиотека автоматически обнаруживала и использовала все доступные ЦП-ресурсы для предварительной обработки обучающих данных. Кроме того, мы обернем наш цикл обучения модели в Ray AIR Trainer, чтобы обеспечить безпроблемное масштабирование для многократного использования GPU.

Развертывание кластера Ray на Amazon SageMaker

Для использования фреймворка Ray и предлагаемых им утилит в многонодовой среде требуется развертывание кластера Ray. В целом, проектирование, развертывание, управление и поддержка такого вычислительного кластера могут быть непростой задачей и часто требуют специалиста по разработке или команды разработчиков. Это может стать непреодолимым препятствием для некоторых команд разработки. В этом посте мы продемонстрируем, как преодолеть это препятствие с помощью управляемой службы обучения AWS, Amazon SageMaker. В частности, мы создадим гетерогенный кластер SageMaker с инстансами GPU и ЦП и будем использовать его для развертывания кластера Ray при запуске. Затем мы запустим приложение обучения Ray AIR на этом кластере Ray с использованием встроенной возможности Ray для эффективной балансировки нагрузки по всем ресурсам кластера. После завершения обучения приложения кластер Ray будет автоматически уничтожен. Использование SageMaker в таком режиме позволяет развернуть и использовать кластер Ray без накладных расходов, которые обычно связаны с управлением кластером.

Ray – это мощный фреймворк, который позволяет выполнять широкий спектр задач машинного обучения. В этой статье мы продемонстрируем всего лишь несколько его возможностей и API, используя версию Ray 2.6.1. Эта статья не должна использоваться в качестве замены документации Ray. Обязательно ознакомьтесь с официальной документацией, чтобы использовать Ray utilities наиболее адекватным и актуальным образом.

Прежде чем мы начнем, особая благодарность Боруху Чалку за то, что он познакомил меня с библиотекой Ray Data и ее уникальными возможностями.

Игрушечный пример

Для облегчения нашего обсуждения мы определим и обучим простую модель классификации на основе PyTorch (2.0) Vision Transformer, которую мы обучим на синтетическом наборе данных, состоящем из случайных изображений и меток. В документации Ray AIR представлено множество примеров, демонстрирующих, как создавать различные типы рабочих нагрузок обучения с использованием Ray AIR. Скрипт, который мы создадим здесь, в основном следует описанным в примере классификатора изображений PyTorch шагам.

Определение набора данных Ray и предварительной обработки

API тренера Ray AIR различает сырой набор данных и конвейер предварительной обработки, применяемый к элементам набора данных перед их подачей в цикл обучения. Для нашего сырого набора данных Ray мы создаем простой диапазон целых чисел размером num_records. Затем мы определяем предварительную обработку, которую мы хотим применить к нашему набору данных. Наш предварительный обработчик Ray содержит два компонента: первый – это BatchMapper, который отображает сырые целые числа на случайные пары изображений и меток. Второй – это TorchVisionPreprocessor, который выполняет преобразование torchvision для наших случайных пакетов, которые преобразуют их в тензоры PyTorch и применяют серию операций GaussianBlur. Операции GaussianBlur предназначены для имитации относительно сложного конвейера предварительной обработки данных. Два предварительных обработчика объединяются с использованием Chain Preprocessor. Создание набора данных и предварительной обработки Ray показано в следующем блоке кода:

import rayfrom typing import Dict, Tupleimport numpy as npimport torchvision.transforms as transformsfrom ray.data.preprocessors import Chain, BatchMapper, TorchVisionPreprocessordef get_ds(batch_size, num_records):    # создаем сырой табличный набор данных Ray    ds = ray.data.range(num_records)    # отображаем целое число на случайную пару изображений и меток    def synthetic_ds(batch: Tuple[int]) -> Dict[str, np.ndarray]:        labels = batch['id']        batch_size = len(labels)        images = np.random.randn(batch_size, 224, 224, 3).astype(np.float32)        labels = np.array([label % 1000 for label in labels]).astype(                                                               dtype=np.int64)        return {"image": images, "label": labels}    # первый шаг предварительной обработки отображает пакеты целых чисел на    # случайные пары изображений и меток    synthetic_data = BatchMapper(synthetic_ds,                                  batch_size=batch_size,                                  batch_format="numpy")    # определяем преобразование torchvision, которое преобразует numpy-пары в     # тензоры, а затем применяет серию гауссовских размытий для имитации    # сложной предварительной обработки       transform = transforms.Compose(        [transforms.ToTensor()] + [transforms.GaussianBlur(11)]*10    )    # второй шаг предварительной обработки применяет преобразование torchvision    vision_preprocessor = TorchVisionPreprocessor(columns=["image"],                                                   transform=transform)    # комбинируем шаги предварительной обработки    preprocessor = Chain(synthetic_data, vision_preprocessor)    return ds, preprocessor

Обратите внимание, что конвейер данных Ray автоматически использует все доступные ЦП, которые есть в кластере Ray. Это включает ЦП-ресурсы на графическом процессоре, а также ЦП-ресурсы любых дополнительных вспомогательных экземпляров в кластере.

Определение цикла обучения

Следующим шагом является определение последовательности обучения, которая будет выполняться на каждом из рабочих узлов обучения (например, GPU). Сначала мы определяем модель, используя популярный пакет Python timm (0.6.13), и оборачиваем ее с помощью API train.torch.prepare_model. Затем мы извлекаем соответствующий фрагмент из набора данных и определяем итератор, который генерирует пакеты данных с требуемым размером пакета и копирует их на устройство обучения. Затем следует сам цикл обучения, состоящий из стандартного кода PyTorch. При завершении цикла мы сообщаем результат метрики потерь. Последовательность обучения для каждого рабочего узла показана в следующем блоке кода:

import timefrom ray import trainfrom ray.air import sessionimport torch.nn as nnimport torch.optim as optimfrom timm.models.vision_transformer import VisionTransformer# создаем модель ViT с использованием timmdef build_model():    return VisionTransformer()# определяем цикл обучения для каждого рабочего узлаdef train_loop_per_worker(config):    # оборачиваем модель PyTorch объектом Ray    model = train.torch.prepare_model(build_model())    criterion = nn.CrossEntropyLoss()    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)    # получаем соответствующий фрагмент набора данных    train_dataset_shard = session.get_dataset_shard("train")    # создаем итератор, возвращающий пакеты из набора данных    train_dataset_batches = train_dataset_shard.iter_torch_batches(        batch_size=config["batch_size"],        prefetch_batches=config["prefetch_batches"],        device=train.torch.get_device()    )    t0 = time.perf_counter()    for i, batch in enumerate(train_dataset_batches):        # получаем входы и метки        inputs, labels = batch["image"], batch["label"]        # обнуляем градиенты параметров        optimizer.zero_grad()        # прямой и обратный проходы + оптимизация        outputs = model(inputs)        loss = criterion(outputs, labels)        loss.backward()        optimizer.step()        # печатаем статистику        if i % 100 == 99:  # печатаем каждые 100 мини-пакетов            avg_time = (time.perf_counter()-t0)/100            print(f"Iteration {i+1}: avg time per step {avg_time:.3f}")            t0 = time.perf_counter()    metrics = dict(running_loss=loss.item())    session.report(metrics)

Определение тренера Ray Torch

После того, как мы определили нашу конвейер данных и цикл обучения, мы можем перейти к настройке тренера Ray TorchTrainer. Мы настраиваем тренера таким образом, чтобы учитывать доступные ресурсы в кластере. Конкретно, мы устанавливаем количество рабочих для обучения в соответствии с количеством GPU, и устанавливаем размер пакета в соответствии с доступной памятью на целевом GPU. Мы создаем наш набор данных с необходимым количеством записей для обучения ровно 1000 шагов.

from ray.train.torch import TorchTrainerfrom ray.air.config import ScalingConfigdef train_model():    # мы настроим количество рабочих, размер нашего    # набора данных и размер хранилища данных в соответствии с    # доступными ресурсами     num_gpus = int(ray.available_resources().get("GPU", 0))        # устанавливаем количество рабочих для обучения в соответствии с количеством GPU    num_workers = num_gpus if num_gpus > 0 else 1    # устанавливаем размер пакета на основе емкости памяти GPU    # семейства экземпляров Amazon EC2 g5    batch_size = 64    # создаем синтетический набор данных с достаточным количеством данных для обучения на 1000 шагов    num_records = batch_size * 1000 * num_workers    ds, preprocessor = get_ds(batch_size, num_records)    ds = preprocessor(ds)     trainer = TorchTrainer(        train_loop_per_worker=train_loop_per_worker,        train_loop_config={"batch_size": batch_size},        datasets={"train": ds},        scaling_config=ScalingConfig(num_workers=num_workers,                                      use_gpu=num_gpus > 0),    )    trainer.fit()

Развертывание кластера Ray и запуск последовательности обучения

Теперь мы определяем точку входа в наш скрипт обучения. Здесь мы настраиваем кластер Ray и запускаем последовательность обучения на главном узле. Мы используем класс Environment из библиотеки sagemaker-training для обнаружения экземпляров в гетерогенном кластере SageMaker, как описано в этом руководстве. Мы определяем первый узел группы экземпляров с GPU в качестве главного узла кластера Ray и запускаем соответствующую команду на всех остальных узлах для их подключения к кластеру. (См. документацию Ray для получения дополнительной информации о создании кластеров.) Мы программируем главный узел для ожидания, пока все узлы подключатся, а затем запускаем последовательность обучения. Это гарантирует, что Ray будет использовать все доступные ресурсы при определении и распределении задач Ray.

import timeimport subprocessfrom sagemaker_training import environmentif __name__ == "__main__":    # используем класс Environment() для автоматического обнаружения кластера SageMaker    env = environment.Environment()    if env.current_instance_group == 'gpu' and \             env.current_instance_group_hosts.index(env.current_host) == 0:        # главный узел запускает кластер Ray        p = subprocess.Popen('ray start --head --port=6379',                             shell=True).wait()        ray.init()        # вычисляем общее количество узлов в кластере        groups = env.instance_groups_dict.values()        cluster_size = sum(len(v['hosts']) for v in list(groups))        # ждем, пока все узлы SageMaker подключатся к кластеру Ray        connected_nodes = 1        while connected_nodes < cluster_size:            time.sleep(1)            resources = ray.available_resources().keys()            connected_nodes = sum(1 for s in list(resources) if 'node' in s)        # вызываем последовательность обучения        train_model()        # разрушаем кластер Ray        p = subprocess.Popen("ray down", shell=True).wait()    else:        # рабочие узлы подключаются к главному узлу        head = env.instance_groups_dict['gpu']['hosts'][0]        p = subprocess.Popen(            f"ray start --address='{head}:6379'",            shell=True).wait()        # вспомогательная функция для проверки, жив ли кластер        def is_alive():            from subprocess import Popen            p = Popen('ray status', shell=True)            p.communicate()[0]            return p.returncode        # поддерживаем работу узла до завершения процесса на главном узле        while is_alive() == 0:            time.sleep(10)

Обучение на гетерогенном кластере Amazon SageMaker

После завершения нашего скрипта обучения, нам предстоит развернуть его на гетерогенный кластер Amazon SageMaker. Для этого мы следуем описанным в этом руководстве шагам. Мы начинаем с создания каталога source_dir, в который помещаем наш скрипт train.py и файл requirements.txt, содержащий два пакета pip, от которых зависит наш скрипт, timm и ray[air]. Они автоматически устанавливаются на каждом из узлов кластера SageMaker. Мы определяем две группы экземпляров SageMaker, первая с одним экземпляром ml.g5.xlarge (содержащим 1 GPU и 4 vCPU), а вторая с одним экземпляром ml.c5.4xlarge (содержащим 16 vCPU). Затем мы используем оценщик SageMaker PyTorch для определения и развертывания нашей задачи обучения в облаке.

from sagemaker.pytorch import PyTorchfrom sagemaker.instance_group import InstanceGroupcpu_group = InstanceGroup("cpu", "ml.c5.4xlarge", 1)gpu_group = InstanceGroup("gpu", "ml.g5.xlarge", 1)estimator = PyTorch(    entry_point='train.py',    source_dir='./source_dir',    framework_version='2.0.0',    role='<arn роли>',    py_version='py310',    job_name='гетерогенный-кластер',    instance_groups=[gpu_group, cpu_group])estimator.fit()

Результаты

В таблице ниже мы сравниваем результаты работы нашего скрипта обучения в двух разных настройках: одном ml.g5.xlarge GPU-экземпляре и гетерогенном кластере, содержащем ml.g5.xlarge экземпляр и ml.c5.4xlarge экземпляр. Мы оцениваем использование системных ресурсов с помощью Amazon CloudWatch и оцениваем стоимость обучения с использованием цен Amazon SageMaker на момент написания этого текста ($0.816 в час для экземпляра ml.c5.4xlarge и $1.408 для экземпляра ml.g5.xlarge).

Результаты сравнительной производительности (авторские)

Относительно высокое использование ЦП при низком использовании ГПУ в эксперименте с одним экземпляром указывает на узкое место в процессе предварительной обработки данных. Эти проблемы явно решаются при переходе к гетерогенному кластеру. Увеличивается не только использование ГПУ, но и скорость обучения. В целом, эффективность стоимости обучения увеличивается на 23%.

Следует подчеркнуть, что эти игрушечные эксперименты были созданы исключительно с целью демонстрации возможностей автоматического балансирования нагрузки, предоставляемых экосистемой Ray. Возможно, настройка контрольных параметров могла привести к улучшению производительности. Также вероятно, что выбор другого решения для решения узкого места ЦП (например, выбор экземпляра из семейства EC2 g5 с большим количеством ЦП) мог привести к лучшей стоимостной эффективности.

Резюме

В этой статье мы продемонстрировали, как использовать наборы данных Ray для балансировки нагрузки тяжелого процесса предварительной обработки данных на всех доступных ЦП-рабочих в кластере. Это позволяет легко решать проблемы узкого места ЦП, просто добавляя дополнительные экземпляры ЦП в среду обучения. Поддержка гетерогенного кластера Amazon SageMaker является убедительным способом запуска задания обучения с использованием Ray в облаке, поскольку она обрабатывает все аспекты управления кластером, избегая необходимости в специализированной поддержке DevOps.

Имейте в виду, что представленное здесь решение – только один из многих способов решения проблем узкого места ЦП. Лучшее решение для вас будет сильно зависеть от деталей вашего проекта.

Как обычно, не стесняйтесь обращаться с комментариями, исправлениями и вопросами.