apache-spark, pyspark

Generate a list field in JSON-format data


I need to generate NDJSON (JSON Lines) data containing the unique values of several columns in PySpark. I have 4 columns of different types. I need to take the distinct values of the columns indicator, name, and fruits, grouped by the column id, and combine them into a new column, identifier.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, BooleanType, StringType, ArrayType

spark = SparkSession.builder.appName("Example").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("indicator", BooleanType(), True),
    StructField("name", StringType(), True),
    StructField("fruits", ArrayType(StringType()), True)
])

# Sample data
data = [
    (1, True, "Alice", ["apple", "banana"]),
    (2, False, "Bob", ["orange", "grape"]),
    (1, True, "Charlie", ["banana", "kiwi"])
]

# Create the DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show()

df.createOrReplaceTempView("table_data")

spark.sql("""
    select id, collect_list(indicator) as indicator, collect_list(name) as name, collect_list(fruits) as fruits
    from table_data
    group by id
""").show(truncate=False)

I get the following result:

id  indicator       name                  fruits
----------------------------------------------------------------------------
1   [true,true]     ["Alice","Charlie"]   [["apple","banana"],["banana","kiwi"]]
2   [false]         ["Bob"]               [["orange","grape"]]

I need the result in the following JSON format:

{"id": 1, "identifier": [true, "Alice", "Charlie", "apple", "banana", "kiwi"]}
{"id": 2, "identifier": [false, "Bob", "orange", "grape"]}
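To pin down the intended transformation, here is a plain-Python sketch (no Spark involved) applied to the sample rows. The helper names `unique` and `build_identifiers` are hypothetical, and the sketch keeps first-seen order, which Spark's `collect_set` does not guarantee.

```python
import json

# Sample rows from the question: (id, indicator, name, fruits)
rows = [
    (1, True, "Alice", ["apple", "banana"]),
    (2, False, "Bob", ["orange", "grape"]),
    (1, True, "Charlie", ["banana", "kiwi"]),
]


def unique(values):
    """Hypothetical helper: drop duplicates while keeping first-seen order."""
    seen = []
    for v in values:
        if v not in seen:
            seen.append(v)
    return seen


def build_identifiers(rows):
    """Hypothetical helper: group by id, then concatenate the distinct column values."""
    groups = {}
    for id_, indicator, name, fruits in rows:
        g = groups.setdefault(id_, {"indicator": [], "name": [], "fruits": []})
        g["indicator"].append(indicator)
        g["name"].append(name)
        g["fruits"].extend(fruits)  # flatten the per-row fruit arrays
    return [
        {
            "id": id_,
            "identifier": unique(g["indicator"]) + unique(g["name"]) + unique(g["fruits"]),
        }
        for id_, g in groups.items()
    ]


# One JSON object per line (NDJSON)
ndjson = "\n".join(json.dumps(rec) for rec in build_identifiers(rows))
print(ndjson)
```

A JSON array can mix a boolean with strings, as in this output, but a Spark array column holds a single element type, so a Spark-side version has to represent the indicator as a string.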

I also tried the approach below. It generates the unique values per column, but I can't find a way to concatenate these columns into the identifier column as a list of strings and produce the JSON result.

from pyspark.sql import functions as F

aggregated_df = df.groupBy("id").agg(
    F.collect_set("indicator").alias("indicator"),
    F.collect_set("name").alias("name"),
    F.array_distinct(F.flatten(F.collect_set("fruits"))).alias("fruits")
)

aggregated_df.show(truncate=False)
+---+---------+----------------+---------------------+
|id |indicator|name            |fruits               |
+---+---------+----------------+---------------------+
|1  |[true]   |[Alice, Charlie]|[apple, banana, kiwi]|
|2  |[false]  |[Bob]           |[orange, grape]      |
+---+---------+----------------+---------------------+

Solution

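  • Your aggregation got you almost all the way there. You just need to concatenate everything into a single array, which you can do with the concat function. A Spark array holds a single element type, so the boolean indicator is cast to string before it is concatenated with the string arrays.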

    So let's use this in your aggregation:

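    from pyspark.sql import functions as F

    aggregated_df = df.groupBy("id").agg(
        F.concat(
            # Cast the boolean to string so every array has the same element type
            F.collect_set(F.col("indicator").astype("string")),
            F.collect_set("name"),
            F.array_distinct(F.flatten(F.collect_set("fruits"))),
        ).alias("identifier"),
    )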
    >>> aggregated_df.show(truncate=False)
    +---+-------------------------------------------+
    |id |identifier                                 |
    +---+-------------------------------------------+
    |1  |[true, Alice, Charlie, apple, banana, kiwi]|
    |2  |[false, Bob, orange, grape]                |
    +---+-------------------------------------------+
    

    Now there are two ways to get the JSON output you want.

    Method 1

    Write the DataFrame out as JSON:

    aggregated_df.write.mode("overwrite").json("output.json")
    

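    Spark writes output.json as a directory of part-*.json files rather than a single file. Each part file is in JSON Lines format and will look like this: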

    {"id":1,"identifier":["true","Alice","Charlie","apple","banana","kiwi"]}
    {"id":2,"identifier":["false","Bob","orange","grape"]}
    

    Method 2

    If you want to do this in memory without writing to disk, combine the to_json and struct functions to build a JSON string column:

    output = aggregated_df.withColumn(
        "my_json_col", F.to_json(F.struct("id", "identifier"))
    )
    >>> output.show(truncate=False)
    +---+-------------------------------------------+------------------------------------------------------------------------+
    |id |identifier                                 |my_json_col                                                             |
    +---+-------------------------------------------+------------------------------------------------------------------------+
    |1  |[true, Alice, Charlie, apple, banana, kiwi]|{"id":1,"identifier":["true","Alice","Charlie","apple","banana","kiwi"]}|
    |2  |[false, Bob, orange, grape]                |{"id":2,"identifier":["false","Bob","orange","grape"]}                  |
    +---+-------------------------------------------+------------------------------------------------------------------------+