Как переформатировать выход Spark Python

(u'142578', (u'The-North-side-9890', (u' 12457896', 45.0))) (u'124578', (u'The-West-side-9091', (u' 14578217', 0.0))) 

Это я получил от присоединения к двум RDD на основе идентификаторов, это похоже на (key, (value_left, value_right)) , используя этот Spark Join.

поэтому я хочу иметь выход как

 The-North-side-9890,12457896,45.0 The-West-side-9091,14578217,0.0 

для этого я попробую со следующим кодом

 from pyspark import SparkContext sc = SparkContext("local", "info") file1 = sc.textFile('/home/hduser/join/part-00000').map(lambda line: line.split(',')) result = file1.map(lambda x: (x[1]+', '+x[2],float(x[3][:-3]))).reduceByKey(lambda a,b:a+b) result = result.map(lambda x:x[0]+','+str(x[1])) result = result.map(lambda x: x.lstrip('[(').rstrip(')]')).coalesce(1).saveAsTextFile("hdfs://localhost:9000/finalop") 

но давая мне следующий результат

 (u'The-North-side-9896', (u' 12457896',0.0 (u'The-East-side-9876', (u' 47125479',0.0 

поэтому я хочу очистить это, как я могу это сделать

помогите мне достичь этого.

2 Solutions collect form web for “Как переформатировать выход Spark Python”

чтобы получить от этого:

(u'142578', (u'The-North-side-9890', (u' 12457896', 45.0)))

к этому:

The-North-side-9890,12457896,45.0

вам необходимо использовать:

 result = result.map(lambda (k, (s, (n1, n2))): ','.join([s, str(int(n1)), str(float(n2))])) 

Попробуй это

 def rdd2string(t): def rdd2StringHelper(x): s = '' if isinstance(x, collections.Iterable): for elem in x: s = s+str(rdd2StringHelper(elem)) return s else: return str(x)+',' return rdd2StringHelper(t)[:-1] yourRDD.map(lambda x: rdd2string(x)).saveAsTextFile(...) 

Эта функция работает для всех типов кортежей, которые могут быть сформированы с помощью любой комбинации кортежей (tuple2, tuple3, tuple21 и т. Д.) И списков (списки списков, списков кортежей, список int и т. Д.) И выводит плоское представление как строка в формате CSV.

Он также отвечает на ваш вопрос: « Как удалить ненужный материал вроде (), [], одинарные кавычки из вывода PyPpark [дубликат]

РЕДАКТИРОВАТЬ

Не забудьте добавить эти import collections

  • Чтение большого файла в выпуске Spark - python
  • PySpark dataframe.foreach () с пулом соединений HappyBase возвращает «TypeError: не может раскрыть объекты thread.lock»
  • Выбор значений из непустых столбцов в элементе данных PySpark DataFrame
  • Вызов функции Java / Scala из задачи
  • Как удалить RDD в PySpark с целью освобождения ресурсов?
  • Добавление файла jar в pyspark после контекста создается
  • Искры RDD для Python DataFrame
  • Список всех файлов, доступных в Spark-кластере, хранящихся на Hadoop HDFS с использованием Scala или Python?
  • Искры java.lang.VerifyError
  • Что установить `SPARK_HOME`?
  • Как запустить несколько заданий в одном Sparkcontext из отдельных потоков в PySpark?
  •  
    Interesting Posts for Van-Lav

    Операция подозрительного файла. Связанный путь … расположен за пределами базового компонента пути

    Django: Не удается разрешить ключевое слово '' в поле. Возможные варианты:

    Запуск команды оболочки из Python и захват вывода

    Почему этот скрипт Python работает на 4 раза медленнее на нескольких ядрах, чем на одном ядре

    Предотвращение класса от прямого экземпляра в Python

    Python: форматирование выходной строки, выравнивание по правому краю

    Как установить django для python 3.3

    Python: запускайте одну функцию до тех пор, пока не закончится другая функция

    Как `super` взаимодействует с атрибутом` __mro__` класса в множественном наследовании?

    Как найти все divs, класс которых начинается со строки в BeautifulSoup?

    Использование Google App Engine для загрузки документа в документы Google (python)

    Именование возвращенных столбцов в функции агрегации Pandas?

    Найти максимальное значение в списке кортежей в Python

    Pandas groupby суммарная сумма

    Python: файл перевода не найден для домена, используя пользовательскую папку locale

    Python - лучший язык программирования в мире.