apache-spark, pyspark, bigdata, aws-glue

Transforming one row into many rows using AWS Glue


I'm trying to use AWS Glue to turn one row into many rows. My goal is something like a SQL UNPIVOT.

I have a pipe-delimited text file that is 360 GB, compressed (gzip). It has over 1,620 columns. Here's the basic layout:

primary_key|property1_name|property1_value|property800_name|property800_value
12345|is_male|1|is_college_educated|1

There are over 800 of these property name/value fields. There are roughly 280 million rows. The file is in an S3 bucket. I need to get the data into Redshift, but the column limit in Redshift is 1,600.

The users want me to unpivot the data. For example:

primary_key|key|value
12345|is_male|1
12345|is_college_educated|1

I believe I can use AWS Glue for this, but it's my first time using Glue and I'm struggling to figure out a good way to do it. Some of the PySpark extension transformations look promising (perhaps "Map" or "Relationalize"); see http://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-etl-scripts-pyspark-transforms.html. So my question is: what is a good way to do this in Glue?

Thanks.


Solution

  • AWS Glue does not have a built-in GlueTransform subclass that converts a single DynamicRecord into multiple records (the way a classic MapReduce mapper does), and you cannot easily create such a transform yourself.

    But there are two ways to solve your problem.

    Option 1: Using the Spark RDD API

    Let's try to perform exactly what you need: map a single record to multiple ones. Because of the GlueTransform limitations, we will have to dive deeper and use the Spark RDD API.
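
    Getting the data into a DynamicFrame in the first place might look like the sketch below (the bucket, path, and header flag are assumptions on my part; Spark reads .gz files transparently):

    from awsglue.context import GlueContext
    from pyspark.context import SparkContext

    glue_ctx = GlueContext(SparkContext.getOrCreate())

    # hypothetical S3 location; the separator matches the pipe-delimited layout
    source_data = glue_ctx.create_dynamic_frame.from_options(
        connection_type='s3',
        connection_options={'paths': ['s3://my-bucket/data/']},
        format='csv',
        format_options={'separator': '|', 'withHeader': True},
    )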

    RDDs have a special flatMap method that allows one input record to produce multiple output records, which are then flattened into a single RDD. The code for your example will look something like this:

    # e.g. property1 .. property800; adjust the list to the real column names
    properties_names = [f'property{i}' for i in range(1, 801)]

    source_data = somehow_get_the_data_into_glue_dynamic_frame()  # e.g. via the sketch above
    source_data_rdd = source_data.toDF().rdd
    # emit one (primary_key, name, value) tuple per property pair
    unpivoted_data_rdd = source_data_rdd.flatMap(
        lambda row: (
            (
                row.primary_key,
                getattr(row, f'{field}_name'),
                getattr(row, f'{field}_value'),
            )
            for field in properties_names
        ),
    )
    unpivoted_data = glue_ctx.create_dynamic_frame \
        .from_rdd(unpivoted_data_rdd, name='unpivoted')
    
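    Once unpivoted, the frame can be written straight to Redshift. A minimal sketch, assuming a Glue catalog connection named 'redshift-conn' and a target table that already exists (all names here are hypothetical); depending on how the frame was created you may need to rename its fields to match the table first:

    glue_ctx.write_dynamic_frame.from_jdbc_conf(
        frame=unpivoted_data,
        catalog_connection='redshift-conn',  # hypothetical Glue connection name
        connection_options={'dbtable': 'unpivoted_properties', 'database': 'mydb'},
        redshift_tmp_dir='s3://tmp-bucket/redshift-tmp/',  # staging dir for Redshift COPY
    )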

    Option 2: Map + Relationalize + Join

    If you want to do the requested operation using only the AWS Glue ETL API, here are my instructions:

    1. First, map every single DynamicRecord from the source into a primary key and a list of key/value objects:
    from awsglue.transforms import Map, Relationalize, Join

    # in the Python API, Map receives each DynamicRecord as a dict, not an RDD Row
    def to_key_value_list(record):
        return {
            'primary_key': record['primary_key'],
            'fields': [
                {'key': record[f'{field}_name'], 'value': record[f'{field}_value']}
                for field in properties_names
            ],
        }

    mapped = Map.apply(frame=source_data, f=to_key_value_list)
    

    Example input:

    primary_key|property1_name|property1_value|property800_name|property800_value
          12345|is_male       |              1|is_new          |                1
          67890|is_male       |              0|is_new          |                0
    

    Output:

    primary_key|fields
          12345|[{'key': 'is_male', 'value': 1}, {'key': 'is_new', 'value': 1}]
          67890|[{'key': 'is_male', 'value': 0}, {'key': 'is_new', 'value': 0}]
    
    2. Next, relationalize it: every list is converted into multiple rows and every nested object is unnested (the Scala Glue ETL API docs have good examples and more detailed explanations than the Python docs).
    relationalized_dfc = Relationalize.apply(
        mapped,
        staging_path='s3://tmp-bucket/tmp-dir/',  # choose any dir for temp files
        name='roottable',  # prefix for the output frame names (this is the default)
    )
    

    The method returns a DynamicFrameCollection. In the case of a single array field, it contains two DynamicFrames: the root frame holds primary_key plus a foreign key into the second frame, which contains the flattened and unnested fields. Output:

    # table name: roottable
    primary_key|fields
          12345|     1
          67890|     2
    
    # table name: roottable.fields
    id|index|val.key|val.value
     1|    0|is_male|        1
     1|    1|is_new |        1
     2|    0|is_male|        0
     2|    1|is_new |        0
    
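    If you want to double-check which names the collection holds before joining, DynamicFrameCollection exposes keys() and item access (the names below come from the example output above):

    print(relationalized_dfc.keys())  # ['roottable', 'roottable.fields']
    root_frame = relationalized_dfc['roottable']
    fields_frame = relationalized_dfc['roottable.fields']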
    3. The last logical step is to join these two DynamicFrames:
    joined = Join.apply(
        frame1=relationalized_dfc['roottable'],
        keys1=['fields'],
        frame2=relationalized_dfc['roottable.fields'],
        keys2=['id'],
    )
    

    Output:

    primary_key|fields|id|index|val.key|val.value
          12345|     1| 1|    0|is_male|        1
          12345|     1| 1|    1|is_new |        1
          67890|     2| 2|    0|is_male|        0
          67890|     2| 2|    1|is_new |        0
    

    Now you just have to rename and select the desired fields.
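
    A hedged sketch of that last step with ApplyMapping (the output types are assumptions; the backticks are required because the relationalized column names contain literal dots):

    from awsglue.transforms import ApplyMapping

    result = ApplyMapping.apply(
        frame=joined,
        mappings=[
            # (source column, source type, target column, target type)
            ('primary_key', 'long', 'primary_key', 'long'),
            ('`val.key`', 'string', 'key', 'string'),
            ('`val.value`', 'long', 'value', 'long'),
        ],
    )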