I first have to count the values in a column (duplicates included). Then I have to map that count to a pool range.
Example pool ranges: (1000-3000), (3001-7000), (7001-20000), (20001-500000).
Suppose the total count is 4500 for column A and 8500 for column B. Then 4500 falls in the range (3001-7000) and 8500 in (7001-20000).
Columns C and D should hold these range values. Column C describes column A, so it will be populated with (3001-7000). Similarly, column D describes column B, so it will be populated with (7001-20000). How can we achieve this in PySpark?
Counting the non-null values is easy in Spark (df.summary("count")), but you will need to provide the pool ranges somehow. In the following example I provided the pool ranges as a list of lists (one from 1 to 2, another from 3 to 10).
Inputs:
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1111, 2222),
     (1111, None),
     (1111, None),
     (None, None)],
    ["col1", "col2"])
ranges = [
    [1, 2],
    [3, 10]
]
Script:
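cols = ["col1", "col2"]
# Treat empty strings as nulls so they are not counted.
df = df.replace("", None)
# F.count(c) counts the non-null values of c, duplicates included.
summary = df.agg(*[F.count(c).alias(c) for c in cols]).head().asDict()
col_rng = {}
for c, cnt in summary.items():
    col_rng[c] = None  # stays None (a null column) if no range contains the count
    for r in ranges:
        # Check both bounds so a count below the first range is not matched.
        if r[0] <= cnt <= r[1]:
            col_rng[c] = f"{r[0]}->{r[1]}"
            break
for c in cols:
    df = df.withColumn(f"range_{c}", F.lit(col_rng[c]))
df.show()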
# +----+----+----------+----------+
# |col1|col2|range_col1|range_col2|
# +----+----+----------+----------+
# |1111|2222|     3->10|      1->2|
# |1111|null|     3->10|      1->2|
# |1111|null|     3->10|      1->2|
# |null|null|     3->10|      1->2|
# +----+----+----------+----------+
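To use the pool ranges from the question instead of the toy ranges, the lookup can be pulled into a small plain-Python helper. This is only a sketch: `POOLS` and `pool_label` are hypothetical names that do not appear in the script above, and it assumes that a count outside every pool should give None rather than raise an error.

```python
# Pool ranges from the question, as inclusive (low, high) pairs.
POOLS = [(1000, 3000), (3001, 7000), (7001, 20000), (20001, 500000)]


def pool_label(count, pools=POOLS):
    """Return "low-high" for the pool containing count, or None if none matches."""
    for low, high in pools:
        if low <= count <= high:
            return f"{low}-{high}"
    return None


# Counts from the question: 4500 values in column A, 8500 in column B.
counts = {"A": 4500, "B": 8500}
labels = {name: pool_label(cnt) for name, cnt in counts.items()}
print(labels)  # {'A': '3001-7000', 'B': '7001-20000'}
```

In the script, the counts would come from the `summary` dict, and each label would be passed to F.lit(...) exactly as before, one new column per counted column.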