«Как автоматизировать конвейеры PySpark на AWS EMR с помощью Airflow»
Автоматизация конвейеров PySpark на AWS EMR с Airflow
Оптимизация оркестрации рабочих процессов обработки больших данных

Введение
В динамичном мире инженерии данных и аналитики крайне важно создание масштабируемых и автоматизированных конвейеров.
Энтузиасты Spark, которые уже некоторое время работают с Airflow, могут задаться вопросом:
Как выполнить задачу Spark на удаленном кластере с использованием Airflow?
Как автоматизировать конвейеры Spark с помощью AWS EMR и Airflow?
В этом руководстве мы собираемся объединить эти две технологии, показав, как:
- Настроить и извлечь основные параметры из пользовательского интерфейса Airflow.
- Создать вспомогательные функции для автоматической генерации предпочитаемой команды
spark-submit
. - Использовать метод
EmrAddStepsOperator()
Airflow для создания задачи, которая отправляет и выполняет задачу PySpark в EMR. - Использовать метод
EmrStepSensor()
Airflow для отслеживания выполнения скрипта.
Исходный код, использованный в этом руководстве, доступен на GitHub.
Предварительные условия
- Учетная запись AWS с настроенным S3-ведром и кластером EMR, настроенным в том же регионе (в данном случае
eu-north-1
). Кластер EMR должен быть доступен и находиться в состоянииWAITING
. В нашем случае его название –emr-cluster-tutorial
:

- Некоторые фиктивные данные
balances
уже доступны в ведре S3 в папкеsrc/balances
. Данные могут быть сгенерированы и записаны по указанному местоположению с помощью скрипта-поставщика данных. - Необходимые файлы
JAR
должны уже быть загружены с Maven и быть доступными в ведре S3. - Docker установлен и работает на локальной машине с выделенной памятью 4-6 ГБ.
Архитектура
Целью является запись некоторых фиктивных данных в формате parquet
в ведро S3
, а затем создание DAG
, который:
- Извлекает необходимую конфигурацию из пользовательского интерфейса Airflow;
- Загружает скрипт
pyspark
в ту же папку ведраS3
;