Подробное руководство по Распределенному параллелизму данных (DDP)

Расширенное руководство по Распределенному параллелизму данных (DDP)

Подробное руководство о том, как ускорить обучение ваших моделей с помощью распределенного параллелизма данных (DDP)

Изображение автора

Введение

Привет всем! Я Франсуа, научный сотрудник в Meta. Добро пожаловать в этот новый учебник, входящий в серию Удивительные учебники по искусственному интеллекту.

В этом учебнике мы собираемся разобраться в такой известной технике, как DDP для обучения моделей на нескольких GPU одновременно.

Во время моего обучения в инженерном университете я использовал GPU в Google Colab для обучения. Однако в корпоративной среде все по-другому. Если вы работаете в организации, которая активно вкладывается в искусственный интеллект, особенно если вы в богатой на технологии компании, у вас, скорее всего, есть множество кластеров GPU в вашем распоряжении.

Цель этого урока – оснастить вас знаниями о том, как использовать мощность нескольких GPU для быстрого и эффективного обучения. И знаете что? Это проще, чем вы думаете! Прежде чем мы начнем, я рекомендую иметь хорошие навыки работы с PyTorch, включая его основные компоненты, такие как наборы данных, загрузчики данных, оптимизаторы, CUDA и цикл обучения.

Изначально я считал DDP сложным, почти недостижимым инструментом, думал, что для его установки понадобится большая команда специалистов. Однако я вас заверяю, DDP не только интуитивно понятен, но и краток, требуя всего нескольких строк кода для реализации. Давайте отправимся вместе в это просветительское путешествие!

Высокий уровень понимания DDP

Распределенный параллелизм данных (DDP) – это простая концепция, если мы ее разложим на составляющие. Представьте, у вас есть кластер с 4 GPU. С помощью DDP одна и та же модель загружается на каждый GPU, включая оптимизатор. Основное отличие заключается в том, как мы распределяем данные.

DDP, изображение из учебника PyTorch

Если вы знакомы с глубоким обучением, то наверняка помните о DataLoader – инструменте, который разбивает ваш набор данных на отдельные партии. Принято разбивать весь набор данных на эти партии и обновлять модель после вычислений каждой партии.

Если мы смотрим поближе, то DDP уточняет этот процесс, разделив каждую партию на так называемые “под-партии”. Практически каждый репликат модели обрабатывает сегмент основной партии, что приводит к вычислению отдельного градиента для каждого GPU.

В DDP мы разбиваем эту партию на под-партии с помощью инструмента, называемого DistributedSampler, как показано на следующем рисунке:

DDP, изображение из учебника PyTorch

После распределения каждой под-партии на отдельные GPU, каждый GPU вычисляет свой уникальный градиент.

DDP, изображение из учебника PyTorch
  • Теперь наступает магия DDP. Прежде чем обновить параметры модели, градиенты, рассчитанные на каждом GPU, необходимо объединить таким образом, чтобы каждый GPU имел средний градиент, вычисленный по всей партии данных.
  • Это достигается путем взятия градиентов со всех GPU и их усреднения. Например, если у вас есть 4 GPU, средний градиент для конкретного параметра модели будет суммой градиентов для этого параметра на каждом из 4 GPU, деленной на 4.
  • DDP использует бэкэнд NCCL или Gloo (NCCL оптимизирован для GPU от NVIDIA, Gloo более общий) для эффективной коммуникации и усреднения градиентов между GPU.
DDP, изображение из урока по PyTorch

Глоссарий терминов, узлов и рангов

Прежде чем погрузиться в код, важно понять используемую нами лексику. Давайте разъясним эти термины:

  • Узел: Представьте себе узел как мощную машину, оснащенную несколькими GPU. Когда мы говорим о кластере, это не просто скопление бросаных вместе GPU. Вместо этого они “организованы в группы или “узлы””. Например, в одном узле может быть 8 GPU.
  • Узел-мастер: В многонодовой среде обычно один узел берет на себя управление. Этот “узел-мастер” выполняет задачи, такие как синхронизация, инициирование копий модели, управление загрузкой модели и управление записями журнала. Без узла-мастера каждый GPU независимо генерировал бы журналы, что привело бы к хаосу.
  • Локальный ранг: Термин “ранг” можно сравнить с идентификатором или позицией. Локальный ранг относится к позиции или идентификатору GPU внутри его определенного узла (или машины). Он “локальный”, потому что ограничен этой конкретной машиной.
  • Глобальный ранг: Обобщая, глобальный ранг идентифицирует GPU по всем доступным узлам. Это уникальный идентификатор независимо от машины.
  • Размер мира: В основе это количество доступных GPU по всем узлам. Просто говоря, это произведение количества узлов и количества GPU в каждом узле.

Чтобы все встало на свои места, если вы работаете только на одной машине, ситуация более простая, так как локальный ранг эквивалентен глобальному рангу.

Чтобы проиллюстрировать это изображением:

Локальный ранг, изображение из учебника
Локальный ранг, изображение из учебника

Понимание ограничений DDP:

Распределенное параллельное обучение (DDP) положительно изменило многие рабочие процессы глубокого обучения, но важно понять его границы.

Суть ограничений DDP заключается в его потреблении памяти. С DDP каждый GPU загружает реплику модели, оптимизатора и своей соответствующей партии данных. Память GPU обычно варьируется от нескольких ГБ до 80 ГБ для самых продвинутых GPU.

Для небольших моделей это не проблема. Однако, когда мы занимаемся моделями большого размера, требующими больше памяти, ограничения памяти одного GPU могут быть недостаточными.

В компьютерном зрении, хотя есть множество легких моделей, возникают сложности при увеличении размеров партии, особенно в ситуациях, связанных с трехмерными изображениями или задачами обнаружения объектов.

Возникает полностью организованное параллельное обучение (FSDP). Этот метод распространяет преимущества DDP не только по распределению данных, но и по распределению состояний модели и оптимизатора по памяти GPU. Хотя это звучит привлекательно, FSDP увеличивает взаимодействие между GPU, что может замедлить обучение.

В заключение:

  • Если ваша модель и соответствующая партия данных удобно помещаются в память одного GPU, DDP – ваш лучший выбор из-за скорости.
  • Для моделей огромного размера, требующих больше памяти, FSDP более подходящий выбор. Однако будьте готовы к его недостаткам: вы жертвуете скоростью ради памяти.

Почему вы должны предпочитать DDP перед DP?

Если вы перейдете на сайт PyTorch, там есть две опции: DP и DDP. Но я упоминаю это только чтобы вы не запутались или не запутались. Просто используйте DDP, он быстрее и не ограничивается одним узлом.

Сравнение из учебника Pytorch

Прохождение по коду:

Реализация распределенного глубокого обучения проще, чем вы можете подумать. Красота заключается в том, что вам не придется заниматься настройкой графического процессора вручную или изучать сложности распределения градиента.

Вы найдете все шаблоны и скрипты на:

GitHub – FrancoisPorcher/awesome-ai-tutorials: Лучшая коллекция учебных пособий по искусственному интеллекту, которая сделает вас…

Лучшая коллекция учебных материалов по искусственному интеллекту, которая сделает вас настоящим боссом в области науки о данных! – GitHub …

github.com

Вот разбивка шагов, которые мы будем выполнять:

  1. Инициализация процесса: это включает назначение главного узла, указание порта и настройку world_size.
  2. Настройка распределенного загрузчика данных: важным этапом является разделение каждой партии данных по доступным графическим процессорам. Мы будем убеждаться, что данные равномерно распределяются без перекрытия.
  3. Обучение/тестирование модели: в сущности, этот этап остается в значительной степени неизменным по сравнению с процессом на одном графическом процессоре.

Обучение на 1 ГПУ 1 Узел (базовый)

Сначала давайте определим ванильный код, который загружает набор данных, создает модель и обучает ее до конца на одной ГПУ. Это будет нашей отправной точкой:

import torchimport torch.nn.functional as Ffrom torch.utils.data import Dataset, DataLoaderfrom sklearn.datasets import load_winefrom sklearn.model_selection import train_test_splitfrom sklearn.preprocessing import StandardScalerimport numpy as npclass WineDataset(Dataset):    def __init__(self, data, targets):        self.data = data        self.targets = targets    def __len__(self):        return len(self.data)    def __getitem__(self, idx):        return torch.tensor(self.data[idx], dtype=torch.float), torch.tensor(self.targets[idx], dtype=torch.long)class SimpleNN(torch.nn.Module):    def __init__(self):        super(SimpleNN, self).__init__()        self.fc1 = torch.nn.Linear(13, 64)        self.fc2 = torch.nn.Linear(64, 3)    def forward(self, x):        x = F.relu(self.fc1(x))        x = self.fc2(x)        return xclass Trainer():    def __init__(self, model, train_data, optimizer, gpu_id, save_every):        self.model = model        self.train_data = train_data        self.optimizer = optimizer        self.gpu_id = gpu_id        self.save_every = save_every        self.losses = []    def _run_batch(self, source, targets):        self.optimizer.zero_grad()        output = self.model(source)        loss = F.cross_entropy(output, targets)        loss.backward()        self.optimizer.step()        return loss.item()    def _run_epoch(self, epoch):        total_loss = 0.0        num_batches = len(self.train_data)        for source, targets in self.train_data:            source = source.to(self.gpu_id)            targets = targets.to(self.gpu_id)            loss = self._run_batch(source, targets)            total_loss += loss        avg_loss = total_loss / num_batches        self.losses.append(avg_loss)        print(f"Epoch {epoch}, Loss: {avg_loss:.4f}")    def _save_checkpoint(self, epoch):        checkpoint = self.model.state_dict()        PATH = f"model_{epoch}.pt"        torch.save(checkpoint, PATH)        print(f"Epoch {epoch} | Model saved to {PATH}")    def train(self, max_epochs):        self.model.train()        for epoch in range(max_epochs):            self._run_epoch(epoch)            if epoch % self.save_every == 0:                self._save_checkpoint(epoch)def load_train_objs():    wine_data = load_wine()    X = wine_data.data    y = wine_data.target    # Normalize and split    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)    scaler = StandardScaler().fit(X_train)    X_train = scaler.transform(X_train)    X_test = scaler.transform(X_test)    train_set = WineDataset(X_train, y_train)    test_set = WineDataset(X_test, y_test)    print("Sample from dataset:")    sample_data, sample_target = train_set[0]    print(f"Data: {sample_data}")    print(f"Target: {sample_target}")    model = SimpleNN()    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)    return train_set, model, optimizerdef prepare_dataloader(dataset, batch_size):    return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=True)def main(device, total_epochs, save_every, batch_size):    dataset, model, optimizer = load_train_objs()    train_data = prepare_dataloader(dataset, batch_size)    trainer = Trainer(model, train_data, optimizer, device, save_every)    trainer.train(total_epochs)main(device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), total_epochs=100, save_every=50, batch_size=32)

Обучение на нескольких графических процессорах, 1 узел

Теперь мы собираемся использовать все графические процессоры на одном узле с помощью следующих шагов:

  1. Импортировать необходимые библиотеки для распределенного обучения.
  2. Инициализировать распределенную среду: (в особенности MASTER_ADDR и MASTER_PORT
  3. Обернуть модель с помощью DDP с использованием оболочки DistributedDataParallel.
  4. Использовать распределительный сэмплер, чтобы обеспечить разделение набора данных между графическими процессорами в распределенном режиме.
  5. Измените главную функцию для spawn нескольких процессов для обучения на нескольких графических процессорах.

Для библиотек нам понадобится это:

import torch.multiprocessing as mpfrom torch.utils.data.distributed import DistributedSamplerfrom torch.nn.parallel import DistributedDataParallel as DDPfrom torch.distributed import init_process_group, destroy_process_groupimport os

Затем нам нужно настроить каждый процесс. Например, если у нас есть 8 графических процессоров на 1 узле, мы вызовем следующие функции 8 раз, один для каждого графического процессора и с правильным local_rank:

def ddp_setup(rank, world_size):    """    Настройка распределенной среды.        Args:        rank: Ранг текущего процесса. Уникальный идентификатор для каждого процесса в распределенном обучении.        world_size: Общее количество процессов, участвующих в распределенном обучении.    """        # Адрес основного узла. Поскольку мы выполняем обучение на одном узле, он установлен на localhost.    os.environ["MASTER_ADDR"] = "localhost"        # Порт, на котором ожидается, что основной узел будет слушать коммуникации от рабочих.    os.environ["MASTER_PORT"] = "12355"        # Инициализировать группу процессов.     # 'backend' указывает используемый механизм связи, "nccl" оптимизирован для обучения на GPU.    init_process_group(backend="nccl", rank=rank, world_size=world_size)        # Установить текущее устройство CUDA для указанного устройства (определяемого по рангу).    # Это гарантирует, что каждый процесс использует отдельный графический процессор в многопроцессорной настройке с несколькими графическими процессорами.    torch.cuda.set_device(rank)

Несколько пояснений к функции:

  • MASTER_ADDR – имя хоста машины, на которой запущен основной узел (или процесс с рангом 0). В данном случае – localhost
  • MASTER_PORT – указывает порт, на котором основной узел слушает подключения от рабочих или других процессов. 12355 – это произвольный номер порта. Вы можете выбрать любой неиспользуемый номер порта, при условии, что он не используется другим сервисом на вашей системе и разрешен правилами брандмауэра.
  • torch.cuda.set_device(rank): адресует использование соответствующего графического процессора каждому процессу

Затем нам нужно немного изменить класс Trainer. Просто обернем модель функцией DDP:

class Trainer():    def __init__(self, model, train_data, optimizer, gpu_id, save_every):        self.model = model.to(gpu_id)        self.train_data = train_data        self.optimizer = optimizer        self.gpu_id = gpu_id        self.save_every = save_every        self.losses = []                # Вот эти изменения        self.model = DDP(self.model, device_ids=[gpu_id])

Остальная часть класса Trainer остается без изменений, потрясающе!

Теперь мы должны изменить dataloader, потому что, помните, мы должны разделить пакет на каждый графический процессор:

def prepare_dataloader(dataset: Dataset, batch_size: int):    return DataLoader(        dataset,        batch_size=batch_size,        pin_memory=True,        shuffle=False,        sampler=DistributedSampler(dataset)    )

Теперь мы можем изменить функцию main, которая будет вызываться для каждого процесса (таким образом, 8 раз в нашем случае):

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):    """    Основная функция обучения для настройки распределенного параллелизма данных (DDP).        Args:        rank (int): Ранг текущего процесса (0 <= rank < world_size). Каждому процессу присваивается уникальный ранг.        world_size (int): Общее количество процессов, участвующих в распределенном обучении.        save_every (int): Частота сохранения контрольных точек модели, в терминах эпох.        total_epochs (int): Общее количество эпох для обучения.        batch_size (int): Количество образцов, обрабатываемых за одну итерацию (прямой и обратный проходы).    """        # Настройка распределенной среды, включая установку адреса, порта и механизма связи основного узла.    ddp_setup(rank, world_size)        # Загрузка необходимых объектов обучения - набор данных, модели и оптимизатора.    dataset, model, optimizer = load_train_objs()        # Подготовка загрузчика данных для распределенного обучения. Он разбивает набор данных между процессами и обрабатывает перемешивание.    train_data = prepare_dataloader(dataset, batch_size)        # Инициализация экземпляра тренера с загруженной моделью, данными и другими конфигурациями.    trainer = Trainer(model, train_data, optimizer, rank, save_every)        # Обучение модели на заданное количество эпох.    trainer.train(total_epochs)        # После завершения обучения очищаем распределенную среду.    destroy_process_group()

И, наконец, при выполнении скрипта нам нужно будет запустить 8 процессов. Это делается с помощью функции mp.spawn():

if __name__ == "__main__": 
    import argparse 
    parser = argparse.ArgumentParser(description='простая задача распределенного обучения') 
    parser.add_argument('total_epochs', type=int, help='Общее количество эпох для обучения модели') 
    parser.add_argument('save_every', type=int, help='Как часто сохранять снимок') 
    parser.add_argument('--batch_size', default=32, type=int, help='Размер пакета на каждом устройстве (по умолчанию: 32)') 
    args = parser.parse_args() 
    world_size = torch.cuda.device_count() 
    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)

Финальный шаг: обучение на нескольких узлах

Если вы добрались до этого момента, поздравляю! Финальным шагом будет использование всех доступных графических процессоров на разных узлах. Но если вы поняли, что мы уже сделали, это очень просто.

Основное отличие при масштабировании на несколько узлов – это переход от local_rank к global_rank. Это крайне важно, поскольку каждому процессу требуется уникальный идентификатор. Например, если вы работаете с двумя узлами, каждый с 8 графическими процессорами, то процессам 0 и 9 будет присвоен один и тот же local_rank.

Global_rank определяется очень простой формулой:

global_rank = node_rank * world_size_per_node + local_rank

Так что сначала мы модифицируем функцию ddp_setup:

def ddp_setup(local_rank, world_size_per_node, node_rank): 
    os.environ["MASTER_ADDR"] = "IP_главного_узла"  # <-- Замените на IP вашего главного узла 
    os.environ["MASTER_PORT"] = "12355" 
    global_rank = node_rank * world_size_per_node + local_rank 
    init_process_group(backend="nccl", rank=global_rank, world_size=world_size_per_node*torch.cuda.device_count()) 
    torch.cuda.set_device(local_rank)

И мы также должны изменить функцию main, которая теперь принимает в качестве аргумента wold_size_per_node:

def main(local_rank: int, world_size_per_node: int, save_every: int, total_epochs: int, batch_size: int, node_rank: int): 
    ddp_setup(local_rank, world_size_per_node, node_rank) 
    # ... (остаток функции main)

И, наконец, мы изменяем функцию mp.spawn() с аргументом world_size_per_node:

if __name__ == "__main__": 
    import argparse 
    parser = argparse.ArgumentParser(description='простая задача распределенного обучения') 
    parser.add_argument('total_epochs', type=int, help='Общее количество эпох для обучения модели') 
    parser.add_argument('save_every', type=int, help='Как часто сохранять снимок') 
    parser.add_argument('--batch_size', default=32, type=int, help='Размер пакета на каждом устройстве (по умолчанию: 32)') 
    parser.add_argument('--node_rank', default=0, type=int, help='Ранг узла в многомашинном обучении') 
    args = parser.parse_args() 
    world_size_per_node = torch.cuda.device_count() 
    mp.spawn(main, args=(world_size_per_node, args.save_every, args.total_epochs, args.batch_size, args.node_rank), nprocs=world_size_per_node)

Использование кластера (SLURM)

Теперь вы готовы отправить обучение на кластер. Это очень просто, просто вызовите необходимое количество узлов.

Вот образец скрипта SLURM:

#!/bin/bash#SBATCH --job-name=DDPTraining       # Название задачи#SBATCH --nodes=$1                   # Количество узлов, указанных пользователем#SBATCH --ntasks-per-node=1          # Убедитесь, что только одна задача выполняется на каждом узле#SBATCH --cpus-per-task=1            # Количество ядер процессора на задачу#SBATCH --gres=gpu:1                 # Количество графических процессоров на узел#SBATCH --time=01:00:00              # Лимит времени в часах:минутах:секундах (1 час в этом примере)#SBATCH --mem=4GB                    # Ограничение памяти на один графический процессор#SBATCH --output=training_%j.log     # Имя файла журнала вывода и ошибок (%j разворачивается в идентификатор задания)#SBATCH --partition=gpu              # Указание раздела или очередисполнения python3 ваш_файл_python_скрипта.py --total_epochs 10 --save_every 2 --batch_size 32 --node_rank $SLURM_NODEID

И теперь вы можете запустить обучение из терминала с помощью команды

sbatch train_net.sh 2  # для использования 2 узлов

Поздравляю, вы справились!

Спасибо за чтение! Перед тем как уйти:

Для более замечательных учебных пособий, проверьте мою компиляцию учебных пособий по искусственному интеллекту на Github

GitHub – FrancoisPorcher/awesome-ai-tutorials: Лучшая коллекция учебных пособий по искусственному интеллекту для того чтобы сделать вас…

Лучшая коллекция учебных пособий по искусственному интеллекту чтобы превратить вас в босса науки о данных! – GitHub …

github.com

Вы должны получать мои статьи в вашем почтовом ящике. Подпишитесь здесь.

Если вы хотите получить доступ к премиум-статьям на VoAGI, вам нужно только членство за $5 в месяц. Если вы зарегистрируетесь по моей ссылке, вы поддерживаете меня частью своей платы без дополнительных расходов.

Если вы считаете эту статью полезной и познавательной, пожалуйста, подпишитесь на меня и поставьте лайк для получения более глубокого контента! Ваша поддержка помогает мне продолжать создавать контент, который способствует нашему коллективному пониманию.

Ссылки