Как переформатировать выход Spark Python

(u'142578', (u'The-North-side-9890', (u' 12457896', 45.0))) (u'124578', (u'The-West-side-9091', (u' 14578217', 0.0))) 

Это я получил от присоединения к двум RDD на основе идентификаторов, это похоже на (key, (value_left, value_right)) , используя этот Spark Join.

поэтому я хочу иметь выход как

 The-North-side-9890,12457896,45.0 The-West-side-9091,14578217,0.0 

для этого я попробую со следующим кодом

 from pyspark import SparkContext sc = SparkContext("local", "info") file1 = sc.textFile('/home/hduser/join/part-00000').map(lambda line: line.split(',')) result = file1.map(lambda x: (x[1]+', '+x[2],float(x[3][:-3]))).reduceByKey(lambda a,b:a+b) result = result.map(lambda x:x[0]+','+str(x[1])) result = result.map(lambda x: x.lstrip('[(').rstrip(')]')).coalesce(1).saveAsTextFile("hdfs://localhost:9000/finalop") 

но давая мне следующий результат

 (u'The-North-side-9896', (u' 12457896',0.0 (u'The-East-side-9876', (u' 47125479',0.0 

поэтому я хочу очистить это, как я могу это сделать

помогите мне достичь этого.

2 Solutions collect form web for “Как переформатировать выход Spark Python”

чтобы получить от этого:

(u'142578', (u'The-North-side-9890', (u' 12457896', 45.0)))

к этому:

The-North-side-9890,12457896,45.0

вам необходимо использовать:

 result = result.map(lambda (k, (s, (n1, n2))): ','.join([s, str(int(n1)), str(float(n2))])) 

Попробуй это

 def rdd2string(t): def rdd2StringHelper(x): s = '' if isinstance(x, collections.Iterable): for elem in x: s = s+str(rdd2StringHelper(elem)) return s else: return str(x)+',' return rdd2StringHelper(t)[:-1] yourRDD.map(lambda x: rdd2string(x)).saveAsTextFile(...) 

Эта функция работает для всех типов кортежей, которые могут быть сформированы с помощью любой комбинации кортежей (tuple2, tuple3, tuple21 и т. Д.) И списков (списки списков, списков кортежей, список int и т. Д.) И выводит плоское представление как строка в формате CSV.

Он также отвечает на ваш вопрос: « Как удалить ненужный материал вроде (), [], одинарные кавычки из вывода PyPpark [дубликат]

РЕДАКТИРОВАТЬ

Не забудьте добавить эти import collections

Interesting Posts

Как рассчитать корреляцию между всеми столбцами и удалить сильно коррелированные с помощью python или pandas

Как указать имя схемы во время запуска «syncdb» в django?

Объединение словарей из массивов numpy различной длины (избегая, если возможно, ручных петель)

Поиск хорошей ссылки на нейронные сети

Сохранение связей urllib.request с HTTP-сервером

Нет IDLE для Python 3?

Получить исходное имя файла Google

Ошибка в интерактивной оболочке django на eclipse pydev

Python, Pairwise 'distance', нужен быстрый способ сделать это

указав «пропустить NA» при вычислении среднего значения столбца в фрейме данных, создаваемом Pandas

Поиск подписи под углом между векторами

Невозможно импортировать класс из локальных модулей в python

PySide QWebView и загрузка неподдерживаемого контента

Python не может импортировать библиотеку, скомпилированную с помощью boost_python

Что такое Pythonic способ найти самый длинный общий префикс списка списков?

Python - лучший язык программирования в мире.