Кодировать и собирать несколько функций в PySpark

У меня есть класс Python, который я использую для загрузки и обработки некоторых данных в Spark. Среди различных вещей, которые мне нужно сделать, я создаю список фиктивных переменных, полученных из разных столбцов в фреймворке Spark. Моя проблема в том, что я не уверен, как правильно определить функцию, определенную пользователем, чтобы выполнить то, что мне нужно.

В настоящее время у меня есть метод, который при сопоставлении с базовым фреймворком данных RDD решает половину проблемы (помните, что это метод в более data_processor классе data_processor ):

 def build_feature_arr(self,table): # this dict has keys for all the columns for which I need dummy coding categories = {'gender':['1','2'], ..} # there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file if table == 'users': iter_over = self.config.dyadic_features_to_include elif table == 'activty': iter_over = self.config.user_features_to_include def _build_feature_arr(row): result = [] row = row.asDict() for col in iter_over: column_value = str(row[col]).lower() cats = categories[col] result += [1 if column_value and cat==column_value else 0 for cat in cats] return result return _build_feature_arr 

По сути, это означает, что для указанного фреймворка принимает значения категориальной переменной для указанных столбцов и возвращает список значений этих новых фиктивных переменных. Это означает следующий код:

 data = data_processor(init_args) result = data.user_data.rdd.map(self.build_feature_arr('users')) 

возвращает что-то вроде:

 In [39]: result.take(10) Out[39]: [[1, 0, 0, 0, 1, 0], [1, 0, 0, 1, 0, 0], [1, 0, 0, 0, 0, 0], [1, 0, 1, 0, 0, 0], [1, 0, 0, 1, 0, 0], [1, 0, 0, 1, 0, 0], [0, 1, 1, 0, 0, 0], [1, 0, 1, 1, 0, 0], [1, 0, 0, 1, 0, 0], [1, 0, 0, 0, 0, 1]] 

Это именно то, что я хочу с точки зрения генерации списка фиктивных переменных, которые я хочу, но вот мой вопрос: как я могу (а) создать UDF с аналогичной функциональностью, которую я могу использовать в запросе Spark SQL (или каким-либо другим способом , Я полагаю), или (б) взять RDD в результате описанной выше карты и добавить ее в качестве нового столбца в dataframe user_data?

В любом случае, мне нужно создать новый фреймворк данных, содержащий столбцы из user_data, а также новый столбец (назовем его feature_array ), содержащий вывод функции выше (или что-то функционально эквивалентное).

One Solution collect form web for “Кодировать и собирать несколько функций в PySpark”

Ну, вы можете написать UDF, но почему бы вам? Существует уже немало инструментов, предназначенных для решения этой категории задач:

 from pyspark.sql import Row from pyspark.ml.linalg import DenseVector row = Row("gender", "foo", "bar") df = sc.parallelize([ row("0", 3.0, DenseVector([0, 2.1, 1.0])), row("1", 1.0, DenseVector([0, 1.1, 1.0])), row("1", -1.0, DenseVector([0, 3.4, 0.0])), row("0", -3.0, DenseVector([0, 4.1, 0.0])) ]).toDF() 

Прежде всего StringIndexer .

 from pyspark.ml.feature import StringIndexer indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df) indexed_df = indexer.transform(df) indexed_df.drop("bar").show() ## +------+----+--------------+ ## |gender| foo|gender_numeric| ## +------+----+--------------+ ## | 0| 3.0| 0.0| ## | 1| 1.0| 1.0| ## | 1|-1.0| 1.0| ## | 0|-3.0| 0.0| ## +------+----+--------------+ 

Следующий OneHotEncoder :

 from pyspark.ml.feature import OneHotEncoder encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector") encoded_df = encoder.transform(indexed_df) encoded_df.drop("bar").show() ## +------+----+--------------+-------------+ ## |gender| foo|gender_numeric|gender_vector| ## +------+----+--------------+-------------+ ## | 0| 3.0| 0.0|(1,[0],[1.0])| ## | 1| 1.0| 1.0| (1,[],[])| ## | 1|-1.0| 1.0| (1,[],[])| ## | 0|-3.0| 0.0|(1,[0],[1.0])| ## +------+----+--------------+-------------+ 

VectorAssembler :

 from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler( inputCols=["gender_vector", "bar", "foo"], outputCol="features") encoded_df_with_indexed_bar = (vector_indexer .fit(encoded_df) .transform(encoded_df)) final_df = assembler.transform(encoded_df) 

Если в bar содержатся категориальные переменные, вы можете использовать VectorIndexer для установки необходимых метаданных:

 from pyspark.ml.feature import VectorIndexer vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed") 

но это не так.

Наконец, вы можете обернуть все это с помощью конвейеров:

 from pyspark.ml import Pipeline pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler]) model = pipeline.fit(df) transformed = model.transform(df) 

Возможно, это очень надежный и чистый подход, чем писать все с нуля. Есть некоторые предостережения, особенно если вам требуется последовательное кодирование между различными наборами данных. Вы можете прочитать больше в официальной документации для StringIndexer и VectorIndexer .

Другим способом получения сопоставимого результата является RFormula который :

RFormula создает векторный столбец функций и двойной или строковый столбец метки. Подобно тому, как формулы используются в R для линейной регрессии, столбцы ввода строки будут однострочно закодированы, а числовые столбцы будут приведены к удвоению. Если столбец меток имеет строку типа, он будет сначала преобразован в double с помощью StringIndexer . Если столбец меток не существует в DataFrame, столбец меток вывода будет создан из указанной переменной ответа в формуле.

 from pyspark.ml.feature import RFormula rf = RFormula(formula="~ gender + bar + foo - 1") final_df_rf = rf.fit(df).transform(df) 

Как вы можете видеть, это гораздо более сжато, но сложнее составить не позволяет многое настраивать. Тем не менее результат для простого конвейера, подобного этому, будет идентичным:

 final_df_rf.select("features").show(4, False) ## +----------------------+ ## |features | ## +----------------------+ ## |[1.0,0.0,2.1,1.0,3.0] | ## |[0.0,0.0,1.1,1.0,1.0] | ## |(5,[2,4],[3.4,-1.0]) | ## |[1.0,0.0,4.1,0.0,-3.0]| ## +----------------------+ final_df.select("features").show(4, False) ## +----------------------+ ## |features | ## +----------------------+ ## |[1.0,0.0,2.1,1.0,3.0] | ## |[0.0,0.0,1.1,1.0,1.0] | ## |(5,[2,4],[3.4,-1.0]) | ## |[1.0,0.0,4.1,0.0,-3.0]| ## +----------------------+ 

Что касается ваших вопросов:

сделайте UDF с аналогичной функциональностью, которую я могу использовать в запросе Spark SQL (или, как я полагаю, в другом виде)

Это просто UDF, как и любой другой. Убедитесь, что вы используете поддерживаемые типы, и кроме того, все должно работать нормально.

взять RDD в результате описанной выше карты и добавить ее как новый столбец в dataframe user_data?

 from pyspark.ml.linalg import VectorUDT from pyspark.sql.types import StructType, StructField schema = StructType([StructField("features", VectorUDT(), True)]) row = Row("features") result.map(lambda x: row(DenseVector(x))).toDF(schema) 

Примечание .

Для Spark 1.x замените pyspark.ml.linalg на pyspark.mllib.linalg .

  • PySpark 1.5 Как сократить временную метку до ближайшей минуты с секунд
  • Лучший способ получить максимальное значение в столбце Dataframe Spark
  • Фильтрация столбца данных фильтра Pyspark без значения None
  • Фильтрация столбцов в PySpark
  • Подсчитайте количество записей, отличных от NaN, в каждом столбце фреймворка Spark с Pyspark
  • Составная функция PySpark
  • Вычислить режим колонки PySpark DataFrame?
  • Spark Dataframe различает столбцы с дублированным именем
  • Python - лучший язык программирования в мире.