
Generate a list field in JSON-formatted data

I need to generate data in NDJSON (JSON Lines) format containing the unique values of several columns in PySpark. I have 4 columns of different types. I need to take the distinct values of the columns indicator, name and fruits, grouped by column id, and combine them into a new column identifier.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, BooleanType, StringType, ArrayType

spark = SparkSession.builder.appName("Example").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("indicator", BooleanType(), True),
    StructField("name", StringType(), True),
    StructField("fruits", ArrayType(StringType()), True)
])

# Sample data
data = [
    (1, True, "Alice", ["apple", "banana"]),
    (2, False, "Bob", ["orange", "grape"]),
    (1, True, "Charlie", ["banana", "kiwi"])
]

# Create the DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show(truncate=False)

# Register the DataFrame as a view so the SQL query can refer to it as table_data
df.createOrReplaceTempView("table_data")


select id,
       collect_list(indicator) as indicator,
       collect_list(name) as name,
       collect_list(fruits) as fruits
from table_data
group by id;

I get the following result:

id  indicator   name                  fruits
1   [true]      ["Alice","Charlie"]   [["apple","banana"],["banana","kiwi"]]
2   [false]     ["Bob"]               [["orange","grape"]]

I need the result in JSON format, like this:

{"id": 1, "identifier": [true, "Alice", "Charlie", "apple", "banana", "kiwi"]}
{"id": 2, "identifier": [false, "Bob", "orange", "grape"]}

I also tried the approach below. It generates the unique values per column, but I can't find a way to concatenate all the other columns into the identifier column as a list of strings and produce the JSON result.

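import pyspark.sql.functions as F

# Assumed aggregation: distinct values per column, fruits flattened before de-duplicating
aggregated_df = df.groupBy("id").agg(
    F.collect_set("indicator").alias("indicator"),
    F.collect_set("name").alias("name"),
    F.array_distinct(F.flatten(F.collect_list("fruits"))).alias("fruits"))
aggregated_df.show(truncate=False)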
|id |indicator|name            |fruits               |
|1  |[true]   |[Alice, Charlie]|[apple, banana, kiwi]|
|2  |[false]  |[Bob]           |[orange, grape]      |


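  • Your aggregation got you almost all the way there. You just need to concatenate everything into a single array, which you can do with the concat function. Because all elements of a Spark array must have the same type, the boolean indicator has to be cast to string first, which is why true and false end up as strings in the JSON output.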

    So let's use this in your aggregation:

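    import pyspark.sql.functions as F

    # indicator is cast to string so that all three arrays share one element type
    aggregated_df = df.groupBy("id").agg(
        F.concat(
            F.collect_set(F.col("indicator").cast("string")),
            F.collect_set("name"),
            F.array_distinct(F.flatten(F.collect_list("fruits"))),
        ).alias("identifier"))
    aggregated_df.show(truncate=False)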
    |id |identifier                                 |
    |1  |[true, Alice, Charlie, apple, banana, kiwi]|
    |2  |[false, Bob, orange, grape]                |

    Now, there are 2 ways you can get what you want.

    Method nr 1

    Writing this dataframe to a json file:
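
    aggregated_df.write.mode("overwrite").json("path/to/output_dir")  # hypothetical output path

    The output (a part-*.json file inside that directory) will look like this:

    {"id":1,"identifier":["true","Alice","Charlie","apple","banana","kiwi"]}
    {"id":2,"identifier":["false","Bob","orange","grape"]}
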
    Method nr 2

    If you want to do this in memory without writing to disk, you can combine the to_json and struct functions to get what you want:

    output = aggregated_df.withColumn(
        "my_json_col", F.to_json(F.struct("id", "identifier"))
    )
    output.show(truncate=False)
    |id |identifier                                 |my_json_col                                                             |
    |1  |[true, Alice, Charlie, apple, banana, kiwi]|{"id":1,"identifier":["true","Alice","Charlie","apple","banana","kiwi"]}|
    |2  |[false, Bob, orange, grape]                |{"id":2,"identifier":["false","Bob","orange","grape"]}                  |