Вычисление средних значений для каждого KEY в Pairwise (K, V) RDD в Spark с Python

Я хочу поделиться этим конкретным Apache Spark с решением Python, потому что документация для него довольно плохая.

Я хотел рассчитать среднее значение пар K / V (хранящихся в Pairwise RDD), KEY. Вот как выглядят образцы данных:

>>> rdd1.take(10) # Show a small sample. [(u'2013-10-09', 7.60117302052786), (u'2013-10-10', 9.322709163346612), (u'2013-10-10', 28.264462809917358), (u'2013-10-07', 9.664429530201343), (u'2013-10-07', 12.461538461538463), (u'2013-10-09', 20.76923076923077), (u'2013-10-08', 11.842105263157894), (u'2013-10-13', 32.32514177693762), (u'2013-10-13', 26.249999999999996), (u'2013-10-13', 10.693069306930692)] 

Теперь следующая последовательность кода является менее оптимальным способом сделать это, но она работает. Это то, что я делал, прежде чем я понял лучшее решение. Это не страшно, но, как вы увидите в разделе ответов, есть более сжатый, эффективный способ.

 >>> import operator >>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...} >>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (ie the SUMs). >>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (ie COUNT) >>> print(rdd1.collect()) [(u'2013-10-09', 11.235365503035176), (u'2013-10-07', 23.39500642456595), ... snip ... ] 

  • Spark dataframe преобразует несколько строк в столбец
  • Какая функция в искре используется для объединения двух RDD по ключам
  • Spark - самый быстрый способ создания RDD массивов numpy
  • Ошибка при запуске Spark: Исполнитель проиграл
  • Spark ALS predAll возвращает пустое
  • Советы по правильному использованию больших широковещательных переменных?
  • Уменьшите пару «ключ-значение» в пару из списка ключей с помощью Apache Spark
  • Вернуть RDD из takeOrdered вместо списка
  • 4 Solutions collect form web for “Вычисление средних значений для каждого KEY в Pairwise (K, V) RDD в Spark с Python”

    Теперь гораздо лучший способ сделать это – использовать метод rdd.aggregateByKey (). Поскольку этот метод настолько плохо документирован в Apache Spark с документацией Python (именно поэтому я пишу это), до недавнего времени я использовал приведенную выше последовательность кода. Но опять же, он менее эффективен, поэтому не делайте этого так, как вам нужно .

    Вот как это сделать, используя метод rdd.aggregateByKey () ( рекомендуется ) …

    КЛЮЧОМ, одновременно вычислите СУММ (числитель для среднего, который мы хотим вычислить), и COUNT (знаменатель для среднего, который мы хотим вычислить).

     >>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function. >>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1])) 

    Если верно следующее значение каждой пары «a» и «b» (просто чтобы вы могли визуализировать происходящее):

      First lambda expression for Within-Partition Reduction Step:: a: is a TUPLE that holds: (runningSum, runningCount). b: is a SCALAR that holds the next Value Second lambda expression for Cross-Partition Reduction Step:: a: is a TUPLE that holds: (runningSum, runningCount). b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount). 

    Наконец, вычислить среднее значение для каждого КЛЮЧА и собрать результаты.

     >>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect() >>> print(finalResult) [(u'2013-09-09', 11.235365503035176), (u'2013-09-01', 23.39500642456595), (u'2013-09-03', 13.53240060820617), (u'2013-09-05', 13.141148418977687), ... snip ... ] 

    Надеюсь, этот вопрос и иллюстративный ответ с помощью aggregateByKey () помогут. Если да, не забудьте также поднять вопрос. Спасибо. (◠﹏◠)

    На мой взгляд, более читаемый эквивалент aggregateByKey с двумя lambdas:

     rdd1 = rdd1 \ .mapValues(lambda v: (v, 1)) \ .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) 

    Таким образом, весь средний расчет будет следующим:

     avg_by_key = rdd1 \ .mapValues(lambda v: (v, 1)) \ .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \ .mapValues(lambda v: v[0]/v[1]) \ .collectAsMap() 

    Просто добавив заметку об интуитивном и более коротком (но плохом) решении этой проблемы. Книга Sam's Teach Yourself Apache Spark за 24 часа объяснила эту проблему в последней главе.

    Используя groupByKey можно легко решить проблему следующим образом:

     rdd = sc.parallelize([ (u'2013-10-09', 10), (u'2013-10-09', 10), (u'2013-10-09', 13), (u'2013-10-10', 40), (u'2013-10-10', 45), (u'2013-10-10', 50) ]) rdd \ .groupByKey() \ .mapValues(lambda x: sum(x) / len(x)) \ .collect() 

    Вывод:

     [('2013-10-10', 45.0), ('2013-10-09', 11.0)] 

    Это интуитивно понятный и привлекательный, но не используйте его ! groupByKey не делает никакого объединения на картографах и сводит все отдельные пары значений ключа к редуктору.

    Избегайте groupByKey как можно больше. Идите с решением reduceByKey как @ pat.

    Небольшое усиление ответа призмалитики.

    Может быть случай, когда вычисление суммы может переполняться, потому что мы суммируем огромное количество значений. Вместо этого мы могли бы сохранить средние значения и продолжать вычислять среднее значение по среднему значению и уменьшать количество двух частей.

    Если у вас есть две части, имеющие среднее значение и считанные как (a1, c1) и (a2, c2), общее среднее значение: total / counts = (total1 + total2) / (count1 + counts2) = (a1 * c1 + a2 * с2) / (С1 + с2)

    Если мы отметим R = c2 / c1, его можно переписать далее как a1 / (1 + R) + a2 * R / (1 + R). Если далее обозначить Ri как 1 / (1 + R), мы можем запишем его как a1 * Ri + a2 * R * Ri

     myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10]) sumcount_rdd = myrdd.map(lambda n : (n, 1)) def avg(A, B): R = 1.0*B[1]/A[1] Ri = 1.0/(1+R); av = A[0]*Ri + B[0]*R*Ri return (av, B[1] + A[1]); 

    (av, counts) = sumcount_rdd.reduce (avg) print (av)

    Этот подход можно преобразовать для значения ключа, просто используя mapValues ​​вместо map и reduceByKey вместо сокращения.

    Это от: https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2

    Python - лучший язык программирования в мире.