Переместить столбец в строку с помощью Spark

Я пытаюсь перенести некоторые столбцы моей таблицы в строку. Я использую Python и Spark 1.5.0. Вот моя первоначальная таблица:

+-----+-----+-----+-------+ | A |col_1|col_2|col_...| +-----+-------------------+ | 1 | 0.0| 0.6| ... | | 2 | 0.6| 0.7| ... | | 3 | 0.5| 0.9| ... | | ...| ...| ...| ... | 

Я хотел бы иметь что-то вроде этого:

 +-----+--------+-----------+ | A | col_id | col_value | +-----+--------+-----------+ | 1 | col_1| 0.0| | 1 | col_2| 0.6| | ...| ...| ...| | 2 | col_1| 0.6| | 2 | col_2| 0.7| | ...| ...| ...| | 3 | col_1| 0.5| | 3 | col_2| 0.9| | ...| ...| ...| 

Кто-нибудь знает, что я могу это сделать? Спасибо за помощь.

5 Solutions collect form web for “Переместить столбец в строку с помощью Spark”

Это относительно просто сделать с базовыми функциями Spark SQL.

питон

 from pyspark.sql.functions import array, col, explode, struct, lit df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"]) def to_long(df, by): # Filter dtypes and split into column names and type description cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by)) # Spark SQL supports only homogeneous columns assert len(set(dtypes)) == 1, "All columns have to be of the same type" # Create and explode an array of (column_name, column_value) structs kvs = explode(array([ struct(lit(c).alias("key"), col(c).alias("val")) for c in cols ])).alias("kvs") return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"]) to_long(df, ["A"]) 

Скала :

 import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.{array, col, explode, lit, struct} val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2") def toLong(df: DataFrame, by: Seq[String]): DataFrame = { val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip require(types.distinct.size == 1) val kvs = explode(array( cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _* )) val byExprs = by.map(col(_)) df .select(byExprs :+ kvs.alias("_kvs"): _*) .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*) } toLong(df, Seq("A")) 

Библиотеки локальных линейных алгебр Spark в настоящее время очень слабы: и они не включают основные операции, как указано выше.

Существует JIRA для исправления этого для Spark 2.1, но это не поможет вам сегодня .

Что-то, что следует учитывать: выполнение транспонирования, вероятно, потребует полной перетасовки данных.

Теперь вам нужно будет написать код RDD напрямую. Я написал transpose в scala – но не в python. Вот версия scala :

  def transpose(mat: DMatrix) = { val nCols = mat(0).length val matT = mat .flatten .zipWithIndex .groupBy { _._2 % nCols } .toSeq.sortBy { _._1 } .map(_._2) .map(_.map(_._1)) .toArray matT } 

Таким образом, вы можете преобразовать это в python для вашего использования. У меня нет полосы пропускания, чтобы написать / проверить, что в этот конкретный момент: дайте мне знать, если вы не смогли сделать это преобразование.

По крайней мере, следующие легко конвертируются в python .

  • zipWithIndex -> enumerate() (эквивалент python – кредит @ zero323)
  • map -> [someOperation(x) for x in ..]
  • groupBy -> itertools.groupBy()

Вот реализация для flatten которая не имеет эквивалента python:

  def flatten(L): for item in L: try: for i in flatten(item): yield i except TypeError: yield item 

Таким образом, вы должны быть в состоянии объединить их для решения.

Используйте плоскую карту. Что-то вроде ниже должно работать

 from pyspark.sql import Row def rowExpander(row): rowDict = row.asDict() valA = rowDict.pop('A') for k in rowDict: yield Row(**{'A': valA , 'colID' = k, 'colValue' : row[k]}) newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander)) 

Я принял ответ Scala, что @javadba написал и создал версию Python для переноса всех столбцов в DataFrame . Это может немного отличаться от того, что спрашивал OP …

 from itertools import chain from pyspark.sql import DataFrame def _sort_transpose_tuple(tup): x, y = tup return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0] def transpose(X): """Transpose a PySpark DataFrame. Parameters ---------- X : PySpark ``DataFrame`` The ``DataFrame`` that should be tranposed. """ # validate if not isinstance(X, DataFrame): raise TypeError('X should be a DataFrame, not a %s' % type(X)) cols = X.columns n_features = len(cols) # Sorry for this unreadability... return X.rdd.flatMap( # make into an RDD lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key lambda grp_res: grp_res[0]).map( # sort by index % n_features key lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order lambda key_col: key_col[1]).toDF() # return to DF 

Например:

 >>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF() >>> X.show() +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 2| 3| | 4| 5| 6| | 7| 8| 9| +---+---+---+ >>> transpose(X).show() +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 4| 7| | 2| 5| 8| | 3| 6| 9| +---+---+---+ 

Очень удобный способ реализации:

 from pyspark.sql import Row def rowExpander(row): rowDict = row.asDict() valA = rowDict.pop('A') for k in rowDict: yield Row(**{'A': valA , 'colID' : k, 'colValue' : row[k]}) newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander) 
  • Python Pandas: сводная таблица с aggfunc = счет уникальной
  • Pandas: Фильтрация строк сводной таблицы, где счетчик меньше указанного значения
  • Создайте pandas DataFrame как правильный формат: `DataError: нет числовых типов для агрегирования`
  • Совокупный столбец Python Pandas между диапазонами дат-времени
  • Есть столбец Pandas, содержащий списки, как сворачивать уникальные элементы списка в столбцы?
  • Панды: разница между точкой поворота и поворотным столом. Почему работает только pivot_table?
  • Таблицы строк Pandas Pivot
  • Как распространить столбец в кадре данных Pandas
  • Python - лучший язык программирования в мире.