Как переформатировать выход Spark Python

(u'142578', (u'The-North-side-9890', (u' 12457896', 45.0))) (u'124578', (u'The-West-side-9091', (u' 14578217', 0.0))) 

Это я получил от присоединения к двум RDD на основе идентификаторов, это похоже на (key, (value_left, value_right)) , используя этот Spark Join.

поэтому я хочу иметь выход как

 The-North-side-9890,12457896,45.0 The-West-side-9091,14578217,0.0 

для этого я попробую со следующим кодом

 from pyspark import SparkContext sc = SparkContext("local", "info") file1 = sc.textFile('/home/hduser/join/part-00000').map(lambda line: line.split(',')) result = file1.map(lambda x: (x[1]+', '+x[2],float(x[3][:-3]))).reduceByKey(lambda a,b:a+b) result = result.map(lambda x:x[0]+','+str(x[1])) result = result.map(lambda x: x.lstrip('[(').rstrip(')]')).coalesce(1).saveAsTextFile("hdfs://localhost:9000/finalop") 

но давая мне следующий результат

 (u'The-North-side-9896', (u' 12457896',0.0 (u'The-East-side-9876', (u' 47125479',0.0 

поэтому я хочу очистить это, как я могу это сделать

помогите мне достичь этого.

2 Solutions collect form web for “Как переформатировать выход Spark Python”

чтобы получить от этого:

(u'142578', (u'The-North-side-9890', (u' 12457896', 45.0)))

к этому:

The-North-side-9890,12457896,45.0

вам необходимо использовать:

 result = result.map(lambda (k, (s, (n1, n2))): ','.join([s, str(int(n1)), str(float(n2))])) 

Попробуй это

 def rdd2string(t): def rdd2StringHelper(x): s = '' if isinstance(x, collections.Iterable): for elem in x: s = s+str(rdd2StringHelper(elem)) return s else: return str(x)+',' return rdd2StringHelper(t)[:-1] yourRDD.map(lambda x: rdd2string(x)).saveAsTextFile(...) 

Эта функция работает для всех типов кортежей, которые могут быть сформированы с помощью любой комбинации кортежей (tuple2, tuple3, tuple21 и т. Д.) И списков (списки списков, списков кортежей, список int и т. Д.) И выводит плоское представление как строка в формате CSV.

Он также отвечает на ваш вопрос: « Как удалить ненужный материал вроде (), [], одинарные кавычки из вывода PyPpark [дубликат]

РЕДАКТИРОВАТЬ

Не забудьте добавить эти import collections

  • Сортировка по нескольким полям в Apache Spark
  • используя foreachRDD и foreach для итерации по rdd в pyspark
  • Спарковый декартовочный продукт
  • PySpark DataFrame - динамически соединяется с несколькими столбцами
  • Переполнение стека при обработке нескольких столбцов с помощью UDF
  • Библиотека python от Geoip2 не работает в функции карты pySpark
  • (PySpark) Вложенные списки после reduceByKey
  • Как оптимизировать этот код на искру?
  • Python - лучший язык программирования в мире.