Tags: pyspark, aws-glue

Pyspark with AWS Glue join 1-N relation into a JSON array


I don't know how to join 1-N relations on AWS Glue and export a JSON file like this:

{"id": 123, "name": "John Doe", "profiles": [ {"id": 1111, "channel": "twitter"}, {"id": 2222, "channel": "twitter"}, {"id": 3333, "channel": "instagram"} ]}
{"id": 345, "name": "Test", "profiles": []}

The profiles JSON array should be built from the other tables, and I would also like to include a channel column indicating the source table.

The 3 tables that I have in the AWS Glue Data Catalog are:

person_json

{"id": 123,"nanme": "John Doe"}
{"id": 345,"nanme": "Test"}

instagram_json

{"id": 3333, "person_id": 123}
{"id": 3333, "person_id": null}

twitter_json

{"id": 1111, "person_id": 123}
{"id": 2222, "person_id": 123}

This is the script I have so far:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

# catalog: database and table names
db_name = "test_database"
tbl_person = "person_json"
tbl_instagram = "instagram_json"
tbl_twitter = "twitter_json"

# Create dynamic frames from the source tables
person = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_person)
instagram = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_instagram)
twitter = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_twitter)

# Join the frames
joined_instagram = Join.apply(person, instagram, 'id', 'person_id').drop_fields(['person_id'])
joined_all = Join.apply(joined_instagram, twitter, 'id', 'person_id').drop_fields(['person_id'])

# Writing output to S3
output_s3_path = "s3://xxx/xxx/person.json"
output = joined_all.toDF().repartition(1)
output.write.mode("overwrite").json(output_s3_path)

How should the script be changed in order to achieve the desired output?

Thanks


Solution

    from pyspark.sql.functions import collect_set, lit, struct
    ...
    # The join below needs DataFrames, so convert the person DynamicFrame as well
    person = person.toDF()

    # Tag each profile row with its source channel and pack (id, channel) into a struct
    instagram = instagram.toDF().withColumn( 'channel', lit('instagram') )
    instagram = instagram.withColumn( 'profile', struct('id', 'channel') )
    twitter = twitter.toDF().withColumn( 'channel', lit('twitter') )
    twitter = twitter.withColumn( 'profile', struct('id', 'channel') )

    # Stack both channels and collect each person's profiles into a single array
    profiles = instagram.union(twitter)
    profiles = profiles.groupBy('person_id').agg( collect_set('profile').alias('profiles') )

    # Left outer join keeps persons that have no profiles at all
    joined_all = person.join(profiles, person.id == profiles.person_id, 'left_outer').drop('channel', 'person_id')
    joined_all.show(n=2, truncate=False)
    
    +---+--------+-----------------------------------------------------+
    |id |name    |profiles                                             |
    +---+--------+-----------------------------------------------------+
    |123|John Doe|[[1111, twitter], [2222, twitter], [3333, instagram]]|
    |345|Test    |null                                                 |
    +---+--------+-----------------------------------------------------+
    

.show() doesn't display the full structure of the structs in the profiles field; the struct field names are dropped.
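A quick way to inspect the nested types is printSchema(). A minimal sketch, assuming the joined_all DataFrame built above:

    # Print the inferred schema; profiles should come out as an array of
    # structs, roughly array<struct<id:bigint,channel:string>>
    joined_all.printSchema()

Alternatively, collect() returns the Row objects with their field names intact: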

    print(joined_all.collect())
    [Row(id=123, name='John Doe', profiles=[Row(id=1111, channel='twitter'), Row(id=2222, channel='twitter'), Row(id=3333, channel='instagram')]), Row(id=345, name='Test', profiles=None)]
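
To match the desired output exactly, the null that the left outer join produces for persons without profiles can be replaced with an empty array before writing. A minimal sketch, assuming Spark 2.4+ (for coercing the empty array inside coalesce) and the joined_all DataFrame from above; the S3 path is the placeholder from the question:

    from pyspark.sql.functions import array, coalesce

    # Replace null profiles with an empty array so the JSON contains "profiles": []
    output = joined_all.withColumn('profiles', coalesce('profiles', array()))

    # Write out a single JSON file, as in the original script
    output.repartition(1).write.mode('overwrite').json('s3://xxx/xxx/person.json')

With that change each output line for a person without matches should look like the desired {"id": 345, "name": "Test", "profiles": []}, instead of the profiles field being omitted for null values.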