Как использовать источник JDBC для записи и чтения данных в (Py) Spark?

Цель этого вопроса – документировать:

С небольшими изменениями эти методы должны работать с другими поддерживаемыми языками, включая Scala и R.

  • Каковы преобразования Spark, вызывающие Shuffle?
  • Развертывание списка или карты в качестве аргументов функции в Scala
  • Интерпретация эталона в C, Clojure, Python, Ruby, Scala и других
  • Каков наилучший способ использования кода Python из Scala (или Java)?
  • Какие языки программирования можно использовать на Android Dalvik?
  • Абстрактные атрибуты в Python
  • Загрузочный веб-сервер в Scala
  • Статически типизированное метапрограммирование?
  • 2 Solutions collect form web for “Как использовать источник JDBC для записи и чтения данных в (Py) Spark?”

    Запись данных

    1. Включите применимый драйвер JDBC при отправке приложения или запуске оболочки. Вы можете использовать, например, --packages :

       bin/pyspark --packages group:name:version 

      или комбинирование driver-class-path и jars

       bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR 

      Эти свойства также могут быть заданы с использованием переменной окружения PYSPARK_SUBMIT_ARGS до PYSPARK_SUBMIT_ARGS экземпляра JVM или использования conf/spark-defaults.conf для установки spark.jars.packages или spark.jars / spark.driver.extraClassPath .

    2. Выберите нужный режим. Spark JDBC writer поддерживает следующие режимы:

      • append : Добавить содержимое этого: class: DataFrame к существующим данным.
      • overwrite : перезаписать существующие данные.
      • ignore : игнорировать эту операцию, если данные уже существуют.
      • error (случай по умолчанию): выдать исключение, если данные уже существуют.

      Усовершенствования или другие мелкозернистые модификации не поддерживаются

       mode = ... 
    3. Подготовьте URI JDBC, например:

       # You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar" 
    4. (Необязательно) Создайте словарь аргументов JDBC.

       properties = { "user": "foo", "password": "bar" } 
    5. Использовать DataFrame.write.jdbc

       df.write.jdbc(url=url, table="baz", mode=mode, properties=properties) 

      для сохранения данных (подробнее см. pyspark.sql.DataFrameWriter ).

    Известные проблемы :

    • Подходящий драйвер не может быть найден, если драйвер был включен с помощью --packages ( java.sql.SQLException: No suitable driver found for jdbc: ... )

      Предполагая, что для решения этой проблемы нет несоответствия версии драйвера, вы можете добавить класс driver к properties . Например:

       properties = { ... "driver": "org.postgresql.Driver" } 
    • используя df.write.format("jdbc").options(...).save() может привести к:

      java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource не позволяет создавать таблицу как выбранную.

      Решение неизвестно.

    • в Pyspark 1.3 вы можете попробовать напрямую вызвать Java-метод:

       df._jdf.insertIntoJDBC(url, "baz", True) 

    Чтение данных

    1. Выполните шаги 1-4 от записи данных
    2. Используйте sqlContext.read.jdbc :

       sqlContext.read.jdbc(url=url, table="baz", properties=properties) 

      или sqlContext.read.format("jdbc") :

       (sqlContext.read.format("jdbc") .options(url=url, dbtable="baz", **properties) .load()) 

    Известные проблемы и ошибки :

    • Подходящий драйвер не найден – см .: Запись данных
    • Spark SQL поддерживает перенаправление предикатов с источниками JDBC, хотя не все предикаты могут быть сдвинуты вниз. Он также не делегирует ограничения и агрегаты. Возможным обходным dbtable является замена аргумента dbtable / table действительным подзапросом. См. Например:
      • Работает ли искривление предиката с JDBC?
      • Более одного часа для выполнения pyspark.sql.DataFrame.take (4)
    • По умолчанию источники данных JDBC загружают данные последовательно с использованием одного потока исполнителей. Для обеспечения распределенной загрузки данных вы можете:

      • Предоставьте column разделения (должен быть IntegeType ), lowerBound , upperBound , numPartitions .
      • Предоставьте список взаимно исключающих предикатов predicates , по одному для каждого желаемого раздела.
    • В распределенном режиме (с разделительным столбцом или предикатами) каждый исполнитель работает в своей собственной транзакции. Если исходная база данных изменяется одновременно, нет никакой гарантии, что окончательный вид будет согласован.

    Где найти подходящих водителей:

    • Репозиторий Maven (для получения требуемых координат для --packages выберите желаемую версию и скопируйте данные со вкладки Gradle в compile-group:name:version формы compile-group:name:version подставляющая соответствующие поля) или Maven Central Repository :

      • PostgreSQL
      • MySQL

    Загрузите драйвер mysql-connector-java и сохраните в папке с искровым jar, обратите внимание на приведенный ниже код python, записывая данные в «acotr1», мы должны создать структуру таблицы acotr1 в базе данных mysql

    spark = SparkSession.builder.appName ("prasadad"). master ('local'). config ('spark.driver.extraClassPath', 'D: \ spark-2.1.0-bin-hadoop2.7 \ jars \ mysql- разъем-Java-5.1.41-bin.jar '). getOrCreate ()

    sc = spark.sparkContext

    из pyspark.sql import SQLContext

    sqlContext = SQLContext (sc)

    df = sqlContext.read.format ("jdbc") .options (url = "jdbc: mysql: // localhost: 3306 / sakila", driver = "com.mysql.jdbc.Driver", dbtable = "actor", пользователь = "корень", пароль = "Ramyam01"). нагрузка ()

    mysql_url = "JDBC: MySQL: // локальный: 3306 / Sakila пользователь = корень и пароль = Ramyam01"

    df.write.jdbc (mysql_url, стол = "actor1", режим = "добавить")

    Python - лучший язык программирования в мире.