Tags: apache-spark, pyspark, apache-spark-sql, missing-data, imputation

Replace missing values with a proportion in PySpark


I have to replace the missing values in my df column Type with 80% "R" and 20% "NR" values, so out of the 20 missing values, 16 must be replaced by "R" and 4 by "NR":

Id_a Country Type
a1 England
a2 England
b1 England
b2 England
c1 England
c2 England
er3 England
po9 England
ee4 England
e4 England
t5 England
u8 England
r4 England
zx1 England
11d England
22 England
2p England
3jk England
56h England
a78 England

My idea is to create a counter like this, and then for the first 16 rows impute 'R' and for the last 4 impute 'NR'. Any suggestions on how to do that?

from pyspark.sql import Window
from pyspark.sql.functions import row_number

window = Window.orderBy('Id_a')
df = df.withColumn('Counter', row_number().over(window))
Id_a Country Type Counter
a1 England 1
a2 England 2
b1 England 3
b2 England 4
c1 England 5
c2 England 6
er3 England 7
po9 England 8
ee4 England 9
e4 England 10
t5 England 11
u8 England 12
r4 England 13
zx1 England 14
11d England 15
22 England 16
2p England 17
3jk England 18
56h England 19
a78 England 20

It is not important where the "R" or "NR" values end up. What is important is having a method that can replicate this scenario next time, when the proportions might be different, for example 70% "R" / 30% "NR", or 15% "R" / 85% "NR", etc.


Solution

  • First, create two dataframes: one with the rows where Type is empty and one with the rows where it is not. Then, on the dataframe with the empty values, use Spark's randomSplit function to split it into two dataframes with the ratio you specified. At the end, union the three dataframes to get the wanted result:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit

    spark = SparkSession.builder.appName("DateDataFrame").getOrCreate()
    data = [
        ("a1", "England", None),
        ("a2", "England", None),
        ("b1", "England", None),
        ("b2", "England", None),
        ("c1", "England", None),
        ("c2", "England", None),
        ("er3", "England", None),
        ("po9", "England", None),
        ("ee4", "England", None),
        ("e4", "England", None),
        ("t5", "England", None),
        ("u8", "England", None),
        ("r4", "England", None),
        ("zx1", "England", None),
        ("11d", "England", None),
        ("22", "England", None),
        ("2p", "England", None),
        ("3jk", "England", None),
        ("56h", "England", None),
        ("a78", "England", None),
        ("xxx", "England", "value1"),
        ("zzz", "England", "value2"),
    ]
    df = spark.createDataFrame(data, ['Id_a', 'Country', 'Type'])

    # Separate the rows that need imputation from the ones that do not
    missingTypeDf = df.filter(col("Type").isNull())
    notMissingTypeDf = df.filter(col("Type").isNotNull())

    # Split the missing rows roughly 80/20
    fractions = [0.8, 0.2]
    chunkDf1, chunkDf2 = missingTypeDf.randomSplit(fractions, seed=13)

    chunkDf1 = chunkDf1.withColumn("Type", lit("R"))
    chunkDf2 = chunkDf2.withColumn("Type", lit("NR"))

    # Reassemble the full dataframe
    resultDf = notMissingTypeDf.unionByName(chunkDf1).unionByName(chunkDf2)

    resultDf.show(200, truncate=False)
    
    +----+-------+------+
    |Id_a|Country|Type  |
    +----+-------+------+
    |xxx |England|value1|
    |zzz |England|value2|
    |a1  |England|R     |
    |a2  |England|R     |
    |b1  |England|R     |
    |b2  |England|R     |
    |c1  |England|R     |
    |er3 |England|R     |
    |po9 |England|R     |
    |ee4 |England|R     |
    |e4  |England|R     |
    |t5  |England|R     |
    |u8  |England|R     |
    |r4  |England|R     |
    |zx1 |England|R     |
    |22  |England|R     |
    |2p  |England|R     |
    |3jk |England|R     |
    |c2  |England|NR    |
    |11d |England|NR    |
    |56h |England|NR    |
    |a78 |England|NR    |
    +----+-------+------+
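
One caveat: randomSplit samples each row independently, so the 0.8/0.2 weights are only approximate. The exact 16/4 split above depends on the seed and the data; another seed could give, say, 15/5. If the counts must be exact, the counter idea from the question can be completed with row_number and a cutoff. Below is a minimal sketch of that approach; the helper name impute_by_fraction and its parameters are illustrative, not part of the original code:

    from pyspark.sql import Window
    from pyspark.sql.functions import col, lit, row_number, when
    import math

    def impute_by_fraction(df, column, value_a, value_b, fraction_a, order_col):
        # Rows that already have a value are kept as-is
        present = df.filter(col(column).isNotNull())
        missing = df.filter(col(column).isNull())

        # Exact cutoff, e.g. ceil(20 * 0.8) = 16 rows get value_a
        cutoff = math.ceil(missing.count() * fraction_a)

        # Number the missing rows; this window has no partitioning, which is
        # fine for small data but pulls everything onto one partition
        w = Window.orderBy(order_col)
        filled = (missing
                  .withColumn("rn", row_number().over(w))
                  .withColumn(column, when(col("rn") <= cutoff, lit(value_a))
                                      .otherwise(lit(value_b)))
                  .drop("rn"))
        return present.unionByName(filled)

    # 80% "R" / 20% "NR" on the dataframe from the answer above
    result = impute_by_fraction(df, "Type", "R", "NR", 0.8, "Id_a")
    result.show(200, truncate=False)

Changing fraction_a (and the two fill values) covers the other scenarios mentioned in the question, e.g. 0.7 for 70% "R" / 30% "NR".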