Вызов функции Java / Scala из задачи

Задний план

Мой первоначальный вопрос: почему использование функции DecisionTreeModel.predict внутри карты вызывает исключение? и связан с тем, как генерировать кортежи (оригинальная маска, предсказанная метка) на Spark с MLlib?

Когда мы используем Scala API, рекомендуемым способом получения прогнозов для RDD[LabeledPoint] с использованием DecisionTreeModel является просто отображение над RDD :

 val labelAndPreds = testData.map { point => val prediction = model.predict(point.features) (point.label, prediction) } 

К сожалению, подобный подход в PySpark работает не так хорошо:

 labelsAndPredictions = testData.map( lambda lp: (lp.label, model.predict(lp.features)) labelsAndPredictions.first() 

Исключение: похоже, что вы пытаетесь ссылаться на SparkContext из широковещательной переменной, действия или трансформации. SparkContext можно использовать только в драйвере, а не в коде, который он запускает на рабочих. Для получения дополнительной информации см. SPARK-5063 .

Вместо этой официальной документации рекомендуется что-то вроде этого:

 predictions = model.predict(testData.map(lambda x: x.features)) labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) 

и так, что здесь происходит? Здесь нет переменной широковещания, и Scala API определяет predict следующим образом:

 /** * Predict values for a single data point using the model trained. * * @param features array representing a single data point * @return Double prediction from the trained model */ def predict(features: Vector): Double = { topNode.predict(features) } /** * Predict values for the given data set using the model trained. * * @param features RDD representing data points to be predicted * @return RDD of predictions for each of the given data points */ def predict(features: RDD[Vector]): RDD[Double] = { features.map(x => predict(x)) } 

поэтому по крайней мере на первый взгляд вызов от действия или трансформации не является проблемой, поскольку предсказание кажется локальной операцией.

объяснение

После некоторого рытья я понял, что источником проблемы является метод JavaModelWrapper.call вызываемый из DecisionTreeModel.predict . Он SparkContext доступ к SparkContext который требуется для вызова функции Java:

 callJavaFunc(self._sc, getattr(self._java_model, name), *a) 

Вопрос

В случае с DecisionTreeModel.predict существует рекомендуемое временное решение, и весь необходимый код уже является частью Scala API, но есть ли элегантный способ справиться с такой проблемой вообще?

Только решения, о которых я могу сейчас думать, – это тяжелый вес:

  • толкать все на JVM либо путем расширения классов Spark через Implicit Conversions, либо добавления каких-то оберток
  • напрямую с помощью Py4j gateway

  • Каков предпочтительный способ реализации «урожая» в Scala?
  • Статически типизированное метапрограммирование?
  • Как работает функция pyspark mapPartitions?
  • Spark: Как сопоставить Python с определенными функциями Scala или Java?
  • Spark unionВсе несколько информационных кадров
  • Эффективность искры для Scala vs Python
  • Фильтр, основанный на другом RDD в Spark
  • Каков наилучший способ использования кода Python из Scala (или Java)?
  • One Solution collect form web for “Вызов функции Java / Scala из задачи”

    Связь с использованием шлюза Py4J по умолчанию просто невозможна. Чтобы понять, почему мы должны взглянуть на следующую диаграмму из документа PySpark Internals [1]:

    введите описание изображения здесь

    Поскольку шлюз Py4J работает на драйвере, он недоступен для интерпретаторов Python, которые взаимодействуют с рабочими JVM через сокеты (см. Например, PythonRDD / rdd.py ).

    Теоретически можно было бы создать отдельный Py4J-шлюз для каждого работника, но на практике это вряд ли будет полезно. Игнорирование таких проблем, как надежность Py4J, просто не предназначено для выполнения задач, требующих интенсивного использования данных.

    Есть ли обходные пути?

    1. Использование Spark SQL Data Sources API для переноса JVM-кода.

      Плюсы : поддерживаемый, высокий уровень, не требует доступа к внутреннему API PySpark

      Минусы : Относительно подробные и не очень хорошо документированные, ограниченные в основном входными данными

    2. Работа с DataFrames с использованием Scala UDF.

      Плюсы : Легко реализовать (см. « Спарк»: как сопоставить Python с определенными функциями Scala или Java? ), Без преобразования данных между Python и Scala, если данные уже хранятся в DataFrame, минимальный доступ к Py4J

      Минусы : требуется доступ к шлюзу Py4J и внутренним методам, ограниченным Spark SQL, трудно отлаживать, не поддерживается

    3. Создание интерфейса Scala на высоком уровне аналогичным образом, как это делается в MLlib.

      Плюсы : гибкая, возможность выполнения произвольного сложного кода. Он может быть включен непосредственно в RDD (см., Например, оболочку модели MLlib ) или с помощью DataFrames (см. DataFrames Как использовать класс Scala внутри Pyspark ). Последнее решение выглядит гораздо более дружелюбным, поскольку все данные уже обрабатываются существующим API.

      Минусы : Низкий уровень, требуемое преобразование данных, то же, что и UDF, требует доступа к Py4J и внутреннему API, не поддерживается

      Некоторые основные примеры можно найти в статье «Преобразование PySpark RDD» с помощью Scala

    4. Использование внешнего инструмента управления рабочим процессом для переключения между заданиями Python и Scala / Java и передачи данных в DFS.

      Плюсы : легко реализовать, минимальные изменения самого кода

      Минусы : стоимость чтения / записи данных ( Alluxio ?)

    5. Использование общего SQLContext (см., Например, Apache Zeppelin или Livy ) для передачи данных между гостевыми языками с использованием зарегистрированных временных таблиц.

      Плюсы : хорошо подходит для интерактивного анализа

      Минусы : Не так много для пакетных заданий (Zeppelin) или может потребовать дополнительной оркестровки (Livy)


    1. Джошуа Розен. (2014, август 04) Внутренние силы PySpark . Получено с https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
    Python - лучший язык программирования в мире.