Как запустить функцию для всех рабочих Spark перед обработкой данных в PySpark?

Я запускаю задачу Spark Streaming в кластере с использованием YARN. Каждый узел кластера выполняет несколько искровых работников. Перед началом потоковой передачи я хочу выполнить функцию «setup» для всех рабочих на всех узлах кластера.

Потоковая задача классифицирует входящие сообщения как спам или не спам, но прежде чем он сможет это сделать, ему необходимо загрузить последние предварительно подготовленные модели с HDFS на локальный диск, например, этот пример псевдокода:

def fetch_models(): if hadoop.version > local.version: hadoop.download() 

Я видел здесь следующие примеры SO:

 sc.parallelize().map(fetch_models) 

Но в Spark 1.6 parallelize() требует некоторых данных, которые нужно использовать, например, эта дерьмовая работа, которую я делаю сейчас:

 sc.parallelize(range(1, 1000)).map(fetch_models) 

Просто чтобы быть уверенным, что функция запущена для ВСЕХ работников, я задал диапазон 1000. Я также точно не знаю, сколько рабочих находится в кластере при запуске.

Я читал документацию по программированию и без искажений, но я не могу найти никакого способа фактически распределить что-либо всем работникам без каких-либо данных.

После того, как эта фаза инициализации выполнена, задача потоковой передачи, как обычно, работает с входящими данными от Kafka.

То, как я использую модели, – это запустить аналогичную функцию:

 spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS) stream.union(*create_kafka_streams())\ .repartition(spark_partitions)\ .foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition))) 

Теоретически я мог проверить, обновлены ли модели в функции on_partition , хотя это было бы очень расточительно для этого в каждой партии. Я хотел бы сделать это, прежде чем Spark начнет извлекать партии из Kafka, так как загрузка с HDFS может занять пару минут …

ОБНОВИТЬ:

Чтобы быть ясным: не проблема в том, как распространять файлы или как их загружать, а о том, как запускать произвольный метод для всех работников без каких-либо данных.

Чтобы уточнить, что на самом деле загружает модели, в настоящее время:

 def on_partition(config, partition): if not MyClassifier.is_loaded(): MyClassifier.load_models(config) handle_partition(config, partition) 

Хотя MyClassifier выглядит примерно так:

 class MyClassifier: clf = None @staticmethod def is_loaded(): return MyClassifier.clf is not None @staticmethod def load_models(config): MyClassifier.clf = load_from_file(config) 

Статические методы, поскольку PySpark, похоже, не может сериализовать классы с нестационарными методами (состояние класса не имеет отношения к другому рабочему). Здесь нам нужно только один раз вызвать load_models (), а для всех будущих партий MyClassifier.clf будет установлен. Это то, что действительно не должно быть сделано для каждой партии, это одно время. То же самое с загрузкой файлов из HDFS с использованием fetch_models ().

2 Solutions collect form web for “Как запустить функцию для всех рабочих Spark перед обработкой данных в PySpark?”

Если вы хотите распространять файл между рабочими машинами, самый простой SparkFiles – использовать механизм SparkFiles :

 some_path = ... # local file, a file in DFS, an HTTP, HTTPS or FTP URI. sc.addFile(some_path) 

и получить его на рабочих, используя SparkFiles.get и стандартные инструменты ввода-вывода:

 from pyspark import SparkFiles with open(SparkFiles.get(some_path)) as fw: ... # Do something 

Если вы хотите убедиться, что на самом деле загружена модель, самым простым способом является загрузка импорта модуля. Предполагая, что config может использоваться для извлечения пути модели:

  • model.py :

     from pyspark import SparkFiles config = ... class MyClassifier: clf = None @staticmethod def is_loaded(): return MyClassifier.clf is not None @staticmethod def load_models(config): path = SparkFiles.get(config.get("model_file")) MyClassifier.clf = load_from_file(path) # Executed once per interpreter MyClassifier.load_models(config) 
  • main.py :

     from pyspark import SparkContext config = ... sc = SparkContext("local", "foo") # Executed before StreamingContext starts sc.addFile(config.get("model_file")) sc.addPyFile("model.py") import model ssc = ... stream = ... stream.map(model.MyClassifier.do_something).pprint() ssc.start() ssc.awaitTermination() 

Это типичный вариант использования широковещательных переменных Spark. Предположим, что fetch_models возвращает модели, а не сохраняет их локально, вы бы сделали что-то вроде:

 bc_models = sc.broadcast(fetch_models()) spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS) stream.union(*create_kafka_streams())\ .repartition(spark_partitions)\ .foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition, bc_models.value))) 

Это предполагает, что ваши модели соответствуют памяти, драйверу и исполнителям.

Вы можете быть обеспокоены тем, что трансляция моделей из одного драйвера во все исполнители неэффективна, но использует «эффективные широковещательные алгоритмы», которые значительно превосходят распределение по HDFS в соответствии с этим анализом

  • Запись PySpark у исполнителя
  • Python Spark Как отображать поля одного rdd на другой rdd
  • как изменить столбец Dataframe из типа String в Double type в pyspark
  • Объединить два ряда в искровом свете на основе состояния в pyspark
  • перезапись искрового выхода с использованием pyspark
  • Spark Dataframe различает столбцы с дублированным именем
  • Pandas-образное преобразование сгруппированных данных на pyspark DataFrame
  • Работает ли искривление предиката с JDBC?
  • Python - лучший язык программирования в мире.