записывать искробезопасный фрейм как CSV с разделами

Я пытаюсь написать фрейм данных в искровом окне в hdfs, и я ожидаю, что, если я добавлю ноту обозначений «partitionBy», вы создадите раздел (похожий на запись в формате паркета) в виде «partition_column_name = partition_value» ( т.е. partition_date=2016-05-03 ). для этого я выполнил следующую команду:

df.write.partitionBy('partition_date').mode('overwrite').format("com.databricks.spark.csv").save('/tmp/af_organic')

но в папках разделов не было создано никаких представлений о том, что я могу сделать, чтобы искра DF автоматически создавала эти папки?

Благодаря,

2 Solutions collect form web for “записывать искробезопасный фрейм как CSV с разделами”

Spark 2.0.0+ :

Встроенный формат csv поддерживает разметку из коробки, чтобы вы могли просто использовать:

 df.write.partitionBy('partition_date').mode(mode).format("csv").save(path) 

без каких-либо дополнительных пакетов .

Искры <2.0.0 :

В этот момент (v1.4.0) spark-csv не поддерживает partitionBy (см. Databricks / spark-csv # 123 ), но вы можете настроить встроенные источники для достижения желаемого.

Вы можете попробовать два разных подхода. Предполагая, что ваши данные относительно просты (нет сложных строк и необходимости экранирования символов) и выглядит примерно так:

 df = sc.parallelize([ ("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1) ]).toDF(["k", "x1", "x2", "x3"]) 

Вы можете вручную подготовить значения для записи:

 from pyspark.sql.functions import col, concat_ws key = col("k") values = concat_ws(",", *[col(x) for x in df.columns[1:]]) kvs = df.select(key, values) 

и писать с использованием text источника

 kvs.write.partitionBy("k").text("/tmp/foo") df_foo = (sqlContext.read.format("com.databricks.spark.csv") .options(inferSchema="true") .load("/tmp/foo/k=foo")) df_foo.printSchema() ## root ## |-- C0: integer (nullable = true) ## |-- C1: double (nullable = true) ## |-- C2: double (nullable = true) 

В более сложных случаях вы можете попытаться использовать правильный синтаксический анализатор CSV для предварительной обработки значений аналогичным образом, используя UDF или сопоставление через RDD, но это будет значительно дороже.

Если формат CSV не является жестким требованием, вы также можете использовать JSON-writer, который поддерживает partitionBy из коробки:

 df.write.partitionBy("k").json("/tmp/bar") 

а также открытие разделов при чтении.

Я предлагаю использовать следующее:

 df = your dataframe object df.coalesce(n).write.csv('name_of_outputfolder',header=True) 

где n – количество разделов.

Это должно делать свое дело. Дайте мне знать, как это происходит!

  • Как преобразовать RDD с столбцом SparseVector в DataFrame со столбцом в виде вектора
  • Переименовать столбцы со специальными символами в фрейме python или Pyspark
  • Разделение сложных строк данных на простые строки в Pyspark
  • Apache Spark - присвоить результат UDF нескольким столбцам данных
  • Как вычислить «пользовательский общий итог» в области данных с искроберой 1.5
  • Apache Spark Python Косинус сходство над DataFrames
  • Spark добавляет новый столбец в dataframe со значением из предыдущей строки
  • Применение UDF в GroupedData в PySpark (с действующим примером python)
  • Python - лучший язык программирования в мире.