Объединить два ряда в искровом свете на основе состояния в pyspark

У меня есть запись ввода в следующем формате: Формат входных данных

введите описание изображения здесь

Я хочу, чтобы данные трансопроводились в следующем формате: Формат выходных данных

введите описание изображения здесь

Я хочу объединить свои 2 строки в зависимости от типа условия.

По моим сведениям, мне нужно взять составной ключ из трех полей данных и сравнить поля типа, когда они равны.

Может кто-нибудь, пожалуйста, помогите мне с реализацией в Spark с помощью Python?

EDIT: Следующее – моя попытка использовать RDD в pyspark

record = spark.read.csv("wasb:///records.csv",header=True).rdd print("Total records: %d")%record.count() private_ip = record.map(lambda fields: fields[2]).distinct().count() private_port = record.map(lambda fields: fields[3]).distinct().count() destination_ip = record.map(lambda fields: fields[6]).distinct().count() destination_port = record.map(lambda fields: fields[7]).distinct().count() print("private_ip:%d, private_port:%d, destination ip:%d, destination_port:%d")%(private_ip,private_port,destination_ip,destination_port) types = record.map(lambda fields: ((fields[2],fields[3],fields[6],fields[7]),fields[0])).reduceByKey(lambda a,b:a+','+b) print types.first() 

И следующая моя работа до сих пор.

 ((u'100.79.195.101', u'54835', u'58.96.162.33', u'80'), u'22-02-2016 13:11:03,22-02-2016 13:13:53') 

  • Pyspark: пользовательская функция окна
  • Может только zip с RDD, у которого одинаковое количество ошибок раздела
  • Сортировка по нескольким полям в Apache Spark
  • Файл Pyspark import .py не работает
  • Уменьшите пару «ключ-значение» в пару из списка ключей с помощью Apache Spark
  • Как подключить HBase и Spark с помощью Python?
  • как изменить столбец Dataframe из типа String в Double type в pyspark
  • PySpark 1.5 Как сократить временную метку до ближайшей минуты с секунд
  • 2 Solutions collect form web for “Объединить два ряда в искровом свете на основе состояния в pyspark”

    Надеюсь это поможет!
    ( Изменить примечание: обновленный код после получения обновленного требования)

     import pyspark.sql.functions as func #create RDD rdd = sc.parallelize([(22,'C','xxx','yyy','zzz'),(23,'D','xxx','yyy','zzz'),(24,'C','xxx1','yyy1','zzz1'),(25,'D','xxx1','yyy1','zzz1')]) #convert RDD to dataframe df = rdd.toDF(['Date','Type','Data1','Data2','Data3']) df.show() #group by 3 data columns to create list of date & type df1 = df.sort("Data1","Data2","Data3","Type").groupBy("Data1","Data2","Data3").agg(func.collect_list("Type"),func.collect_list("Date")).withColumnRenamed("collect_list(Type)", "Type_list").withColumnRenamed("collect_list(Date)", "Date_list") #add 2 new columns by splitting above date list based on type list's value df2 = df1.where((func.col("Type_list")[0]=='C') & (func.col("Type_list")[1]=='D')).withColumn("Start Date",df1.Date_list[0]).withColumn("End Date",df1.Date_list[1]) #select only relevant columns as an output df2.select("Data1","Data2","Data3","Start Date","End Date").show() 

    Альтернативное решение с использованием RDD: –
    ( Редактировать примечание: добавлено ниже фрагмента, так как @AnmolDave также интересуется решением RDD)

     import pyspark.sql.types as typ rdd = sc.parallelize([('xxx','yyy','zzz','C',22),('xxx','yyy','zzz','D',23),('xxx1','yyy1','zzz1','C', 24),('xxx1','yyy1','zzz1','D', 25)]) reduced = rdd.map(lambda row: ((row[0], row[1], row[2]), [(row[3], row[4])]))\ .reduceByKey(lambda x,y: x+y)\ .map(lambda row: (row[0], sorted(row[1], key=lambda text: text[0])))\ .map(lambda row: ( row[0][0], row[0][1], row[0][2], ','.join([str(e[0]) for e in row[1]]), row[1][0][1], row[1][1][1] ) )\ .filter(lambda row: row[3]=="C,D") schema_red = typ.StructType([ typ.StructField('Data1', typ.StringType(), False), typ.StructField('Data2', typ.StringType(), False), typ.StructField('Data3', typ.StringType(), False), typ.StructField('Type', typ.StringType(), False), typ.StructField('Start Date', typ.StringType(), False), typ.StructField('End Date', typ.StringType(), False) ]) df_red = sqlContext.createDataFrame(reduced, schema_red) df_red.show() 

    Вот простой пример, как вы хотите, код на scala надеется, что вы можете перейти на python.

     //create a dummy data val df = Seq((22, "C", "xxx","yyy","zzz"), (23, "C", "xxx","yyy","zzz")).toDF("Date", "Type", "Data1", "Data2", "Data3") +----+----+-----+-----+-----+ |Date|Type|Data1|Data2|Data3| +----+----+-----+-----+-----+ | 22| C| xxx| yyy| zzz| | 23| C| xxx| yyy| zzz| +----+----+-----+-----+-----+ //group by three fields and collect as list for column Date val df1 = df.groupBy("Data1", "Data2", "Data3").agg(collect_list($"Date")) +-----+-----+-----+--------+ |Data1|Data2|Data3| Date| +-----+-----+-----+--------+ | xxx| yyy| zzz|[22, 23]| +-----+-----+-----+--------+ //create new column with the given array of date df1.withColumn("Start Date", $"Date"(0)).withColumn("End Date", $"Date"(1)).show +-----+-----+-----+--------+----------+--------+ |Data1|Data2|Data3| Date|Start Date|End Date| +-----+-----+-----+--------+----------+--------+ | xxx| yyy| zzz|[22, 23]| 22| 23| +-----+-----+-----+--------+----------+--------+ 

    Надеюсь это поможет!

    Python - лучший язык программирования в мире.