
PySpark dataframe: remove cumulative (reversed) pairs from a dataframe


I want to remove reversed pairs with the same id — rows where value.x and value.y hold the same two values in swapped order — keeping only one row of each pair in the data frame.

Also, I can't simply drop duplicates by 'id', because the same "id" can have multiple combinations that are not reversed pairs of each other. I tried the following in Python (pandas), but I'm not sure how to do it in PySpark; any help is appreciated.

# Build an order-independent key so that reversed pairs become duplicates
m_f_1['value'] = m_f_1.apply(
    lambda x: str(x['value_x']) + str(x['value_y'])
              if x['value_x'] > x['value_y']
              else str(x['value_y']) + str(x['value_x']),
    axis=1)
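
For context, here is a self-contained pandas sketch of that idea; the drop_duplicates step and the '_' separator are my additions, not shown in the question:

    import pandas as pd

    m_f_1 = pd.DataFrame({'id':      [100057, 100057, 100011],
                          'value_x': [38953993985, 38993095845, 38989281716],
                          'value_y': [38993095846, 38953993985, 38996868028]})

    # The same two values in either order yield the same key, so reversed
    # pairs become exact duplicates; the separator avoids key collisions.
    m_f_1['value'] = m_f_1.apply(
        lambda x: str(max(x['value_x'], x['value_y'])) + '_'
                  + str(min(x['value_x'], x['value_y'])), axis=1)

    # Assumed follow-up step: keep one row per (id, key), drop the helper.
    res = m_f_1.drop_duplicates(subset=['id', 'value']).drop(columns='value')
    print(res)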

Input dataframe m_f_1 is:

  id        value.x        value.y
  100057    38953993985    38993095846
  100057    38993095845    38953993985
  100057    38993095845    38993095846
  100057    38993095846    38953993985
  100011    38989281716    38996868028
  100011    38996868028    38989281716
  100019    38916115350    38994231881
  100019    38994231881    38916115350

Output should be:

head(res)

  id        value.x        value.y
  100011    38989281716    38996868028
  100019    38916115350    38994231881
  100031    38911588267    38993358322
  100057    38953993985    38993095846
  100057    38993095845    38953993985
  100057    38993095845    38993095846

Solution

  • You can use pyspark.sql.functions for this: pyspark.sql.functions.greatest and pyspark.sql.functions.least return, per row, the largest and smallest of the given columns, and pyspark.sql.functions.concat concatenates strings into a single key.

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.appName("test").getOrCreate()
    data = [(100057, 38953993985, 38993095846),
            (100057, 38993095845, 38953993985),
            (100057, 38993095845, 38993095846),
            (100057, 38993095846, 38953993985),
            (100011, 38989281716, 38996868028),
            (100011, 38996868028, 38989281716),
            (100019, 38916115350, 38994231881),
            (100019, 38994231881, 38916115350)]
    m_f_1 = spark.createDataFrame(data, schema=['id', 'value_x', 'value_y'])

    # Canonical key: larger value first, then a separator, then the smaller
    # value. Reversed pairs produce the same key; the separator guards
    # against two different pairs concatenating to the same string.
    m_f_1 = m_f_1.withColumn('value', F.concat(F.greatest('value_x', 'value_y').cast('string'),
                                               F.lit('_'),
                                               F.least('value_x', 'value_y').cast('string')))
    # Deduplicate per (id, pair) so identical pairs under different ids survive.
    m_f_1 = m_f_1.dropDuplicates(subset=['id', 'value']).drop('value').sort('id')
    m_f_1.show(truncate=False)
    
    +------+-----------+-----------+
    |id    |value_x    |value_y    |
    +------+-----------+-----------+
    |100011|38989281716|38996868028|
    |100019|38916115350|38994231881|
    |100057|38993095845|38953993985|
    |100057|38953993985|38993095846|
    |100057|38993095845|38993095846|
    +------+-----------+-----------+
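
  • A variant of mine, not part of the original answer: skip the string key and deduplicate on the ordered pair directly. The helper column names lo and hi are arbitrary; data, spark, and F are reused from the snippet above.

        pairs = spark.createDataFrame(data, schema=['id', 'value_x', 'value_y'])
        res = (pairs
               .withColumn('lo', F.least('value_x', 'value_y'))
               .withColumn('hi', F.greatest('value_x', 'value_y'))
               # reversed rows now agree on (lo, hi), so one row per pair survives
               .dropDuplicates(['id', 'lo', 'hi'])
               .drop('lo', 'hi')
               .sort('id'))
        res.show(truncate=False)

    This keeps the values numeric throughout, so no casting or key-collision handling is needed.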