I need to generate data as NDJSON / JSON Lines with unique column values in PySpark. I have 4 columns of different types. I need to take the distinct values of the columns indicator, name, and fruits, grouped by the column id, and generate a new column identifier.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, BooleanType, StringType, ArrayType

spark = SparkSession.builder.appName("Example").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("indicator", BooleanType(), True),
    StructField("name", StringType(), True),
    StructField("fruits", ArrayType(StringType()), True)
])

# Sample data
data = [
    (1, True, "Alice", ["apple", "banana"]),
    (2, False, "Bob", ["orange", "grape"]),
    (1, True, "Charlie", ["banana", "kiwi"])
]

# Create the DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show()
df.createOrReplaceTempView("table_data")
select id,
       collect_list(indicator) as indicator,
       collect_list(name) as name,
       collect_list(fruits) as fruits
from table_data
group by id;
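For reference, here is how that query can be run against the temp view from PySpark (a minimal sketch; results_df is just an illustrative name):

# Run the aggregation query against the registered temp view
results_df = spark.sql("""
    select id,
           collect_list(indicator) as indicator,
           collect_list(name) as name,
           collect_list(fruits) as fruits
    from table_data
    group by id
""")
results_df.show(truncate=False)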
I get the results below:

id | indicator | name                | fruits
---+-----------+---------------------+---------------------------------------
1  | [true]    | ["Alice","Charlie"] | [["apple","banana"],["banana","kiwi"]]
2  | [false]   | ["Bob"]             | [["orange","grape"]]
I need the results in JSON format, as below:
{ id : 1, identifier : [true, "Alice","Charlie","apple","banana","kiwi"] }
{ id : 2, identifier : [false, "Bob","orange","grape"] }
I also tried the approach below. It generates unique values per column, but I'm not able to find a way to concatenate all the other columns into an identifier column as a list of strings and generate the JSON result.
aggregated_df = df.groupBy("id").agg(
    F.collect_set("indicator").alias("indicator"),
    F.collect_set("name").alias("name"),
    F.array_distinct(F.flatten(F.collect_set("fruits"))).alias("fruits")
)
aggregated_df.show(truncate=False)
+---+---------+----------------+---------------------+
|id |indicator|name |fruits |
+---+---------+----------------+---------------------+
|1 |[true] |[Alice, Charlie]|[apple, banana, kiwi]|
|2 |[false] |[Bob] |[orange, grape] |
+---+---------+----------------+---------------------+
Your aggregation almost took you where you wanted; you just needed to concatenate everything into one single array. You can do this with the concat function.
So let's use it in your aggregation:
aggregated_df = df.groupBy("id").agg(
    F.concat(
        F.collect_set(F.col("indicator").astype("string")),
        F.collect_set("name"),
        F.array_distinct(F.flatten(F.collect_set("fruits"))),
    ).alias("identifier"),
)
>>> aggregated_df.show(truncate=False)
+---+-------------------------------------------+
|id |identifier |
+---+-------------------------------------------+
|1 |[true, Alice, Charlie, apple, banana, kiwi]|
|2 |[false, Bob, orange, grape] |
+---+-------------------------------------------+
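Note that concat only joins the three per-column sets; if the same string could appear in more than one column and you want the combined identifier array to be fully unique, you can wrap the concat in array_distinct (a small sketch, not something your sample data strictly needs):

# Optional: deduplicate across the whole combined array, in case a value
# appears in more than one source column
aggregated_df = df.groupBy("id").agg(
    F.array_distinct(
        F.concat(
            F.collect_set(F.col("indicator").astype("string")),
            F.collect_set("name"),
            F.array_distinct(F.flatten(F.collect_set("fruits"))),
        )
    ).alias("identifier"),
)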
Now, there are two ways you can get what you want.
The first is writing this dataframe to a JSON Lines file:
aggregated_df.write.mode("overwrite").json("output.json")
The output at that path (Spark writes a directory containing one or more part files) will look like this:
{"id":1,"identifier":["true","Alice","Charlie","apple","banana","kiwi"]}
{"id":2,"identifier":["false","Bob","orange","grape"]}
If you want to do this in memory without writing to disk, you can use a combination of the to_json and struct functions to get what you want:
output = aggregated_df.withColumn(
    "my_json_col", F.to_json(F.struct("id", "identifier"))
)
>>> output.show(truncate=False)
+---+-------------------------------------------+------------------------------------------------------------------------+
|id |identifier |my_json_col |
+---+-------------------------------------------+------------------------------------------------------------------------+
|1 |[true, Alice, Charlie, apple, banana, kiwi]|{"id":1,"identifier":["true","Alice","Charlie","apple","banana","kiwi"]}|
|2 |[false, Bob, orange, grape] |{"id":2,"identifier":["false","Bob","orange","grape"]} |
+---+-------------------------------------------+------------------------------------------------------------------------+
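From there, if you want the NDJSON lines as plain Python strings on the driver (a sketch, assuming the result fits in driver memory), you can collect that column:

# Pull the JSON strings back to the driver as a list of str
json_lines = [row.my_json_col for row in output.select("my_json_col").collect()]
for line in json_lines:
    print(line)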