I have to replace the missing values in my df column Type with 80% "R" and 20% "NR" values, so 16 missing values must be replaced with "R" and 4 with "NR".
Id_a | Country | Type |
---|---|---|
a1 | Engalnd | |
a2 | Engalnd | |
b1 | Engalnd | |
b2 | Engalnd | |
c1 | Engalnd | |
c2 | Engalnd | |
er3 | Engalnd | |
po9 | Engalnd | |
ee4 | Engalnd | |
e4 | Engalnd | |
t5 | Engalnd | |
u8 | Engalnd | |
r4 | Engalnd | |
zx1 | Engalnd | |
11d | Engalnd | |
22 | Engalnd | |
2p | Engalnd | |
3jk | Engalnd | |
56h | Engalnd | |
a78 | Engalnd | |
My idea is to create a counter like this, then impute 'R' for the first 16 rows and 'NR' for the last 4. Any suggestions on how to do that?
```python
from pyspark.sql import Window
from pyspark.sql.functions import row_number

window = Window.orderBy('Id_a')
df = df.withColumn('Counter', row_number().over(window))
```
Id_a | Country | Type | Counter |
---|---|---|---|
a1 | Engalnd | | 1 |
a2 | Engalnd | | 2 |
b1 | Engalnd | | 3 |
b2 | Engalnd | | 4 |
c1 | Engalnd | | 5 |
c2 | Engalnd | | 6 |
er3 | Engalnd | | 7 |
po9 | Engalnd | | 8 |
ee4 | Engalnd | | 9 |
e4 | Engalnd | | 10 |
t5 | Engalnd | | 11 |
u8 | Engalnd | | 12 |
r4 | Engalnd | | 13 |
zx1 | Engalnd | | 14 |
11d | Engalnd | | 15 |
22 | Engalnd | | 16 |
2p | Engalnd | | 17 |
3jk | Engalnd | | 18 |
56h | Engalnd | | 19 |
a78 | Engalnd | | 20 |
It's not important where the R or NR values are imputed. What matters is having a method I can reuse to replicate this scenario next time; for example, next time I could have to replace 70% R and 30% NR, or 15% R and 85% NR, etc.
First, you can create two DataFrames: one with the empty Type values and one without. Then, on the DataFrame with the empty values, you can use Spark's randomSplit function to split it into two DataFrames with the ratio you specified. Finally, you can union the three DataFrames to get the wanted result (note that randomSplit is probabilistic, so the counts only approximate the weights; see the note after the output below):
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.appName("DateDataFrame").getOrCreate()

data = [
    ("a1", "Engalnd", None),
    ("a2", "Engalnd", None),
    ("b1", "Engalnd", None),
    ("b2", "Engalnd", None),
    ("c1", "Engalnd", None),
    ("c2", "Engalnd", None),
    ("er3", "Engalnd", None),
    ("po9", "Engalnd", None),
    ("ee4", "Engalnd", None),
    ("e4", "Engalnd", None),
    ("t5", "Engalnd", None),
    ("u8", "Engalnd", None),
    ("r4", "Engalnd", None),
    ("zx1", "Engalnd", None),
    ("11d", "Engalnd", None),
    ("22", "Engalnd", None),
    ("2p", "Engalnd", None),
    ("3jk", "Engalnd", None),
    ("56h", "Engalnd", None),
    ("a78", "Engalnd", None),
    ("xxx", "Engalnd", "value1"),
    ("zzz", "Engalnd", "value2"),
]
df = spark.createDataFrame(data, ['Id_a', 'Country', 'Type'])

# Separate the rows with a missing Type from the rest
missingTypeDf = df.filter(col("Type").isNull())
notMissingTypeDf = df.filter(col("Type").isNotNull())

# Split the missing-Type rows into two chunks with roughly an 80/20 ratio
fractions = [0.8, 0.2]
chunkDf1, chunkDf2 = missingTypeDf.randomSplit(fractions, seed=13)

# Impute each chunk, then union everything back together
chunkDf1 = chunkDf1.withColumn("Type", lit("R"))
chunkDf2 = chunkDf2.withColumn("Type", lit("NR"))
resultDf = notMissingTypeDf.unionByName(chunkDf1).unionByName(chunkDf2)

resultDf.show(200, truncate=False)
```
```
+----+-------+------+
|Id_a|Country|Type  |
+----+-------+------+
|xxx |Engalnd|value1|
|zzz |Engalnd|value2|
|a1  |Engalnd|R     |
|a2  |Engalnd|R     |
|b1  |Engalnd|R     |
|b2  |Engalnd|R     |
|c1  |Engalnd|R     |
|er3 |Engalnd|R     |
|po9 |Engalnd|R     |
|ee4 |Engalnd|R     |
|e4  |Engalnd|R     |
|t5  |Engalnd|R     |
|u8  |Engalnd|R     |
|r4  |Engalnd|R     |
|zx1 |Engalnd|R     |
|22  |Engalnd|R     |
|2p  |Engalnd|R     |
|3jk |Engalnd|R     |
|c2  |Engalnd|NR    |
|11d |Engalnd|NR    |
|56h |Engalnd|NR    |
|a78 |Engalnd|NR    |
+----+-------+------+
```
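With seed=13 the split happens to land on exactly 16 "R" and 4 "NR", but since randomSplit is probabilistic there is no such guarantee in general. If you need exact counts for an arbitrary ratio, your own counter idea works once you add a when expression. Here is a minimal sketch against the df built above; the variable fraction_r is just an illustrative name, not a Spark API:

```python
from pyspark.sql import Window
from pyspark.sql.functions import col, lit, row_number, when

fraction_r = 0.8  # change to 0.7, 0.15, ... to replicate other scenarios

missingTypeDf = df.filter(col("Type").isNull())
notMissingTypeDf = df.filter(col("Type").isNotNull())

# Exact cutoff: round(0.8 * 20) = 16 rows get "R", the rest get "NR"
cutoff = round(fraction_r * missingTypeDf.count())

# Note: a window without partitionBy pulls all rows onto one partition;
# that is fine for data this small, but be careful on large DataFrames.
window = Window.orderBy("Id_a")
imputedDf = (
    missingTypeDf
    .withColumn("Counter", row_number().over(window))
    .withColumn("Type", when(col("Counter") <= cutoff, lit("R")).otherwise(lit("NR")))
    .drop("Counter")
)

resultDf = notMissingTypeDf.unionByName(imputedDf)
```

Afterwards you can sanity-check the ratio with `resultDf.groupBy("Type").count().show()`.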