Tags: pyspark, pivot, rdd, transpose, flatmap

Transform distinct row values to different columns with corresponding rows using PySpark


I'm new to PySpark and am trying to transform some data.

Given dataframe:

Col1
A=id1a A=id2a B=id1b C=id1c B=id2b
D=id1d A=id3a B=id3b C=id2c
A=id4a C=id3c

Required:

A         B        C
id1a     id1b     id1c
id2a     id2b     id2c
id3a     id3b     id3c
id4a     null     null

I have tried pivot, but that only gives the first value.
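
Presumably the attempt looked something like the minimal sketch below (the names here are illustrative, not from the question): pivoting with first() keeps exactly one id per key, so the remaining ids are lost.

    import pyspark.sql.functions as F

    # Hypothetical reconstruction of the attempt: once col1 is split into
    # (key, value) pairs, pivoting with first() returns a single row with
    # one id per key instead of one row per id.
    pairs = sqlContext.createDataFrame(
        [('A', 'id1a'), ('A', 'id2a'), ('B', 'id1b')], schema=['key', 'value'])
    pairs.groupby().pivot('key').agg(F.first('value')).show()
    # yields a single row, e.g. A=id1a, B=id1b -- all other ids are dropped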


Solution

  • Maybe I don't know the full picture, but the data format seems strange. If nothing can be done at the data source, then some collect_list, pivot and join operations will be needed. Try this:

    import pyspark.sql.functions as F
    from functools import reduce  # needed for the chain of joins below

    test = sqlContext.createDataFrame(
        [('A=id1a A=id2a B=id1b C=id1c B=id2b', 1),
         ('D=id1d A=id3a B=id3b C=id2c', 2),
         ('A=id4a C=id3c', 3)], schema=['col1', 'id'])

    # Explode each row into one 'key=value' token per row.
    tst_spl = test.withColumn("item", F.split('col1', " "))
    tst_xpl = tst_spl.select(F.explode("item"))
    # Split the tokens into separate key and value columns.
    tst_map = tst_xpl.withColumn("key", F.split('col', '=')[0]) \
                     .withColumn("value", F.split('col', '=')[1]).drop('col')
    # Pivot on the key, collecting all values per key into an array;
    # the dummy grouping column lit(1) is dropped afterwards.
    tst_pivot = tst_map.groupby(F.lit(1)).pivot('key') \
                       .agg(F.collect_list('value')).drop('1')
    # Explode each array column with its position, then full-outer-join
    # the pieces back together on that position.
    tst_arr = [tst_pivot.select(F.posexplode(coln)).withColumnRenamed('col', coln)
               for coln in tst_pivot.columns]
    tst_fin = reduce(lambda df1, df2: df1.join(df2, on='pos', how='full'),
                     tst_arr).orderBy('pos')

    tst_fin.show()
    +---+----+----+----+----+
    |pos|   A|   B|   C|   D|
    +---+----+----+----+----+
    |  0|id3a|id3b|id1c|id1d|
    |  1|id4a|id1b|id2c|null|
    |  2|id1a|id2b|id3c|null|
    |  3|id2a|null|null|null|
    +---+----+----+----+----+
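
    Note that collect_list gives no ordering guarantee after the shuffle, which is why the rows above do not pair up the way the required output does (e.g. id1a lands next to id2b). If the ids always embed their row number, as they do in the sample data (id1a, id2a, ...), a deterministic alignment can be built by extracting that number and pivoting on it instead of relying on pos. A sketch under that assumption:

    # Assumes every id embeds its row number (id1a -> 1, id2a -> 2, ...).
    # Extract the number as a stable key, then pivot on it directly.
    tst_num = tst_map.withColumn('rownum', F.regexp_extract('value', r'(\d+)', 1))
    tst_det = tst_num.groupby('rownum').pivot('key') \
                     .agg(F.first('value')).orderBy('rownum')
    tst_det.show()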