Как написать полученный RDD в файл csv в Spark python

У меня есть результирующие labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) . Это выводится в этом формате:

 [(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....] 

Я хочу создать CSV-файл с одним столбцом для labels (первая часть кортежа в выводе выше) и один для predictions (вторая часть вывода кортежа). Но я не знаю, как писать в CSV-файл в Spark с помощью Python.

Как создать файл CSV с указанным выше выходом?

3 Solutions collect form web for “Как написать полученный RDD в файл csv в Spark python”

Просто map строки RDD ( labelsAndPredictions ) в строках (строки CSV), затем используйте rdd.saveAsTextFile() .

 def toCSVLine(data): return ','.join(str(d) for d in data) lines = labelsAndPredictions.map(toCSVLine) lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv') 

Я знаю, что это старый пост. Но чтобы помочь кому-то найти то же самое, вот как я пишу RDD из двух столбцов в один файл CSV в PySpark 1.6.2

RDD:

 >>> rdd.take(5) [(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')] 

Теперь код:

 # First I convert the RDD to dataframe from pyspark import SparkContext df = sqlContext.createDataFrame(rdd, ['count', 'word']) 

DF:

 >>> df.show() +-----+-----------+ |count| word| +-----+-----------+ |73342| cells| |62861| cell| |61714| studies| |61377| aim| |60168| clinical| |59275| 2| |59221| 1| |58274| data| |58087|development| |56579| cancer| |50243| disease| |49817| provided| |49216| specific| |48857| health| |48536| study| |47827| project| |45573|description| |45455| applicant| |44739| program| |44522| patients| +-----+-----------+ only showing top 20 rows при >>> df.show() +-----+-----------+ |count| word| +-----+-----------+ |73342| cells| |62861| cell| |61714| studies| |61377| aim| |60168| clinical| |59275| 2| |59221| 1| |58274| data| |58087|development| |56579| cancer| |50243| disease| |49817| provided| |49216| specific| |48857| health| |48536| study| |47827| project| |45573|description| |45455| applicant| |44739| program| |44522| patients| +-----+-----------+ only showing top 20 rows 

Теперь напишите в CSV

 # Write CSV (I have HDFS storage) df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out') 

PS: Я просто начинаю учиться на пост в Stackoverflow. Поэтому я не знаю, лучший ли это. Но это сработало для меня, и я надеюсь, что это поможет кому-то!

Нехорошо присоединяться запятыми, потому что, если поля содержат запятые, они не будут правильно цитироваться, например, ','.join(['a', 'b', '1,2,3', 'c']) дает вам a,b,1,2,3,c когда вы хотите a,b,"1,2,3",c . Вместо этого вы должны использовать модуль csv Python для преобразования каждого списка в RDD в правильно отформатированную строку csv:

 # python 3 import csv, io def list_to_csv_str(x): """Given a list of strings, returns a properly-csv-formatted string.""" output = io.StringIO("") csv.writer(output).writerow(x) return output.getvalue().strip() # remove extra newline # ... do stuff with your rdd ... rdd = rdd.map(list_to_csv_str) rdd.saveAsTextFile("output_directory") 

Поскольку модуль csv записывает только в объекты файла, мы должны создать пустой «файл» с помощью io.StringIO("") и сообщить csv.writer записать в него строку csv. Затем мы используем output.getvalue() чтобы получить строку, которую мы только что записали в «файл». Чтобы этот код работал с Python 2, просто замените io на модуль StringIO.

Если вы используете API-интерфейс Spark DataFrames, вы также можете просмотреть функцию сохранения данных DataBricks , которая имеет формат csv.

  • Библиотека python от Geoip2 не работает в функции карты pySpark
  • Получить верхнюю часть n в каждой группе DataFrame в pyspark
  • Как добавить постоянный столбец в Spark DataFrame?
  • Сохранить модель Apache Spark mllib в python
  • Как преобразовать RDD с столбцом SparseVector в DataFrame со столбцом в виде вектора
  • Pyspark - получить все параметры моделей, созданных с помощью ParamGridBuilder
  • Как использовать класс Scala внутри Pyspark
  • PySpark 1.5 Как сократить временную метку до ближайшей минуты с секунд
  •  
    Interesting Posts for Van-Lav

    Обнаружение центра и угла прямоугольников в изображении с помощью Opencv

    Twisted Python + spawnProcess. Получение вывода из команды

    Pandas: создать новый столбец в df со случайными целыми числами от диапазона

    Почему str () округляет всплывающие окна?

    Эффективно умножьте плотную матрицу на разреженный вектор

    Обнаружение отложений в формате PDF

    как настроить параметры пользовательской функции ядра с помощью конвейера в scikit-learn

    Python Error TypeError: не может конкатенировать объекты 'str' и 'float'

    Ошибка тайм-аута загрузки Heroku R10

    Рекурсивное создание жестких ссылок с использованием python

    Точный размер фигуры в matplotlib с метками заголовка, оси

    savetxt Как изменить тип с float64 на int или double

    Декоратор Python как статический метод

    Добавить новый столбец на основе булевых значений в другом столбце

    Перезапись скрипта pymc для оценки параметров в динамических системах в pymc3

    Python - лучший язык программирования в мире.