Параллельное выполнение Python на Spark Варианты конкурентности с Pandas

Параллельное выполнение Python на Spark варианты конкуренции с Pandas

Получите преимущества Spark при работе с Pandas

Фото от Флориана Стецияка на Unsplash

В своей предыдущей работе я провел некоторое время, работая над внутренним проектом по прогнозированию будущего использования дискового пространства для наших клиентов услуг управления данными по тысячам дисков. Каждый диск обладает своими собственными паттернами использования, что означает, что нам необходима отдельная модель машинного обучения для каждого диска, которая использует исторические данные для прогнозирования будущего использования по отдельным дискам. В то время как проведение прогнозирования и выбор правильного алгоритма для этой работы является само по себе вызовом, проведение этой работы в масштабе представляет свои собственные проблемы.

Для использования более высокоразвитой инфраструктуры можно обратиться к параллельной обработке для ускорения прогнозирования. В этом блоге сравниваются Pandas UDF и модуль ‘concurrent.futures’ – два подхода к одновременной обработке, и определяются случаи использования для каждого из них.

Вызов

Pandas – это основной пакет в Python для работы с наборами данных в области аналитики. Работая с DataFrame, мы можем профилировать данные и оценивать их качество, проводить разведочный анализ данных, создавать описательную визуализацию данных и прогнозировать будущие тренды.

Хотя это действительно замечательный инструмент, однопоточная природа Python приводит к плохой масштабируемости при работе с большими наборами данных или когда необходимо выполнить ту же аналитику для нескольких подмножеств данных.

В мире больших данных мы ожидаем более сложный подход, так как мы также хотим сохранить отличную производительность и масштабируемость. Spark, среди других языков, позволяет нам использовать распределенную обработку, чтобы обрабатывать большие и сложные структуры данных.

Прежде чем продолжить с этим конкретным примером, можно обобщить некоторые случаи использования, которые обобщают потребность в параллельной обработке данных:

  • Применение однородных преобразований к нескольким файлам данных
  • Прогнозирование будущих значений для нескольких подмножеств данных
  • Настройка гиперпараметров для модели машинного обучения и выбор наиболее эффективной конфигурации

Увеличивая требования, чтобы выполнять такие рабочие нагрузки, как предложенные выше и в нашем случае, самым прямолинейным подходом в Python и Pandas является последовательная обработка данных. Для нашего примера это означает, что мы будем запускать вышеуказанный поток для каждого диска отдельно.

Данные

В нашем примере у нас есть данные для тысяч дисков, которые показывают свободное место, записанное со временем, и мы хотим прогнозировать будущие значения свободного места для каждого из дисков.

Чтобы немного прояснить ситуацию, я предоставляю файл csv, содержащий 1000 дисков, каждый из которых имеет месяц исторических данных о свободном месте, измеренных в гигабайтах. Размер файла достаточен для того, чтобы мы смогли увидеть влияние разных подходов к предсказанию в масштабе.

Изображение автора: пример DataFrame

В случае задачи анализа временных рядов мы используем исторические данные для прогнозирования будущих трендов, и мы хотим понять, какой алгоритм машинного обучения будет наиболее подходящим для каждого диска. Инструменты, такие как AutoML, отлично подходят для этого при поиске подходящей модели для одного набора данных, но у нас здесь 1000 наборов данных – поэтому это избыточно для нашего сравнения.

В этом случае мы ограничимся двумя алгоритмами, которые мы хотим сравнить, и увидим, какая модель наиболее подходящая, для каждого диска, используя среднеквадратическую ошибку (RMSE) как метрику валидации. Более подробную информацию о RMSE можно найти здесь. Эти алгоритмы:

  • Линейная регрессия
  • Fbprophet (обучение данных более сложной линии)
  • Модель прогнозирования временных рядов Facebook.
  • Создан для более сложных предсказаний с гиперпараметрами для сезонности.

У нас есть все компоненты готовы теперь, если мы хотим прогнозировать будущее свободное пространство для одного диска. Ниже приведен набор действий:

Изображение автора: Жизненный цикл данных

Теперь мы хотим масштабировать это, выполняя этот поток для нескольких дисков, 1 000 в нашем примере.

В рамках нашего обзора мы сравним производительность вычисления значений RMSE для разных алгоритмов при разных масштабах. Для этого я создал подмножество первых 100 дисков, чтобы имитировать это.

Это должно дать некоторые интересные идеи о производительности на наборах данных разного размера, выполняя операции различной сложности.

Введение параллельной работы

Python известен своей однопоточностью и, следовательно, не использует все доступные вычислительные ресурсы в определенный момент времени.

В результате, у меня были три варианта:

  1. Реализовать цикл for для последовательного вычисления прогнозов, используя однопоточный подход.
  2. Использовать модуль futures Python для одновременного запуска нескольких процессов.
  3. Использовать пользовательские функции Pandas UDF (user-defined functions) для использования распределенных вычислений в PySpark с сохранением нашего синтаксиса Pandas и совместимых пакетов.

Я хотел провести довольно глубокое сравнение при разных условиях окружения, поэтому использовал одноузловой кластер Databricks и другой кластер Databricks с 4 рабочими узлами, чтобы использовать Spark для нашего подхода с использованием Pandas UDF.

Мы будем следовать следующему подходу для оценки пригодности моделей линейной регрессии и fbprophet для каждого диска:

  • Разделение данных на тренировочный и тестовый наборы
  • Использование тренировочного набора в качестве входных данных и прогнозирование дат тестового набора
  • Сравнение предсказанных значений с фактическими значениями в тестовом наборе для получения значения среднеквадратической ошибки (RMSE)

Мы собираемся возвращать две вещи в наших выходных данных: измененный DataFrame с прогнозами, что дает нам дополнительную возможность построения графиков и сравнения прогнозируемых и фактических значений, и DataFrame, содержащий значения RMSE для каждого диска и алгоритма.

Функции для этого выглядят следующим образом:

Мы собираемся сравнить три вышеописанных подхода. У нас есть несколько разных сценариев, поэтому мы можем заполнить таблицу для сбора результатов по следующим пунктам:

С использованием следующих комбинаций:

Метод

  • Последовательный
  • futures
  • Pandas UDFs

Алгоритм

  • Линейная регрессия
  • Fbprophet
  • Комбинированный (оба алгоритма для каждого диска) – наиболее эффективный способ сравнения.

Режим кластера

  • Одноузловой кластер
  • Стандартный кластер с 4 рабочими узлами

Количество дисков

  • 100
  • 1000

Результаты представлены в таком формате в приложении к этому блогу, если вы хотите более подробно ознакомиться.

Методы

Метод 1: Последовательный

Метод 2: concurrent.futures

Есть два варианта использования этого модуля: параллелизация операций с высоким потреблением памяти (используя ThreadPoolExecutor) или операций с высокой потребностью в процессоре (ProcessPoolExecutor). Подробное объяснение находится на следующем блоге. Поскольку мы собираемся работать с задачей, требующей большого потребления процессора, ProcessPoolExecutor подходит для достижения нашей цели.

Метод 3: Pandas UDFs

Теперь мы собираемся изменить маршрут и использовать Spark и распределенные вычисления для помощи в повышении эффективности. Поскольку мы используем Databricks, большая часть настройки Spark выполняется автоматически, но есть некоторые настройки обработки данных в целом.

Сначала импортируйте данные в PySpark DataFrame:

Мы собираемся использовать групповое отображение Pandas UDF (PandasUDFType.GROUPED_MAP), поскольку мы хотим передать DataFrame и вернуть DataFrame. Начиная с Apache Spark 3.0, нам уже не нужно явно объявлять этот декоратор!

Нам нужно разделить наши функции fbprophet, регрессии и RMSE для Pandas UDF из-за структурирования DataFrame в PySpark, но не требуется масштабное перестроение кода для достижения этого.

Затем мы можем использовать функцию applyInPandas для получения наших результатов.

Примечание: приведенные выше примеры демонстрируют процесс использования линейной регрессии для удобочитаемости. Пожалуйста, обратитесь к полной записной книжке для полного демонстрации этого.

Интерпретация результатов

Мы создали графики для разных методов и разных настроек окружения, а затем сгруппировали данные по алгоритму и количеству дисков для удобного сравнения.

Пожалуйста, обратите внимание, что таблицы результатов находятся в приложении к этому посту.

Я подвел итоги этих выводов ниже:

  • Как ожидалось, предсказание 1000 дисков по сравнению с 100 дисками (в целом) является более времязатратным процессом.
  • Последовательный подход в целом является самым медленным, неспособным эффективно использовать ресурсы.
  • Pandas UDF неэффективны при выполнении маленьких, более простых задач. Перевод данных требует больше ресурсов, параллелизация помогает компенсировать это.
  • Как последовательные, так и подходы с использованием concurrent.futures не учитывают доступную кластеризацию в Databricks — упускают дополнительные вычислительные возможности.

Заключительные соображения

Контекст играет большую роль в выборе наиболее успешного подхода, но учитывая, что Databricks и Spark часто используются для проблем Big Data, мы можем видеть пользу от использования Pandas UDF с такими большими, более сложными наборами данных, которые мы видели здесь сегодня.

Использование среды Spark для маленьких наборов данных можно выполнять с такой же эффективностью на более маленькой (и менее дорогой!) конфигурации вычислительных ресурсов, как это показано при использовании модуля concurrent.futures, поэтому имейте это в виду, разрабатывая ваше решение.

Если вы знакомы с Python и Pandas, то ни один из подходов не будет вызывать сложностей при переходе от последовательного подхода с использованием цикла for, как показано в учебниках для начинающих.

Мы не исследовали это в этом посте, так как я обнаружил несоответствия и несовместимости с текущей версией, но недавний модуль pyspark.pandas наверняка станет более распространенным в будущем, и стоит обратить на него внимание. Это API (вместе с Koalas, разработанным командой Databricks, но теперь не поддерживается) использует привычку работы с Pandas с преимуществами Spark.

Чтобы продемонстрировать эффект, который мы пытаемся достичь, мы ограничились только просмотром значений RMSE, полученных для каждого диска, а не фактическим прогнозом будущего временного ряда. Построенную здесь структуру можно применять аналогично и для этого с логикой определения пригодности метрики оценки (а также другой логикой, такой как физические ограничения диска) и прогнозирования будущих значений с использованием определенного алгоритма, если это возможно.

Как всегда, записную книжку можно найти на моем GitHub.

Приложение

Исходно опубликовано на https://blog.coeo.com, адаптировано для этой публикации.