I'm trying to use Amazon Glue to turn one row into many rows. My goal is something like a SQL UNPIVOT.
I have a pipe delimited text file that is 360GB, compressed (gzip). It has over 1,620 columns. Here's the basic layout:
primary_key|property1_name|property1_value|property800_name|property800_value
12345|is_male|1|is_college_educated|1
There are over 800 of these property name/value fields. There are roughly 280 million rows. The file is in an S3 bucket. I need to get the data into Redshift, but the column limit in Redshift is 1,600.
The users want me to unpivot the data. For example:
primary_key|key|value
12345|is_male|1
12345|is_college_educated|1
I believe I can use Amazon Glue for this. But, this is my first time using Glue. I'm struggling to figure out a good way to do this. Some of the pySpark-extension Transformations look promising (perhaps "Map" or "Relationalize"); see http://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-etl-scripts-pyspark-transforms.html. So, my question is: what is a good way to do this in Glue?
Thanks.
AWS Glue does not have an appropriate built-in GlueTransform subclass to convert a single DynamicRecord into multiple records (as a usual MapReduce mapper can do), nor can you create such a transform yourself.
But there are two ways to solve your problem.
Let's try to perform exactly what you need: map a single record to multiple ones. Because of the GlueTransform limitations we will have to dive deeper and use the Spark RDD API. An RDD has a flatMap method which lets one input row produce multiple Rows, which are then flattened into a single RDD. The code for your example will look something like this:
from pyspark.sql import Row

source_data = somehow_get_the_data_into_glue_dynamic_frame()

# Drop down to the underlying Spark DataFrame and its RDD.
source_data_rdd = source_data.toDF().rdd

# Expand each source row into one Row per property name/value pair,
# so the resulting columns get proper names when converted back.
unpivoted_data_rdd = source_data_rdd.flatMap(
    lambda row: (
        Row(
            primary_key=row.primary_key,
            key=getattr(row, f'{field}_name'),
            value=getattr(row, f'{field}_value'),
        )
        for field in properties_names
    ),
)

unpivoted_data = glue_ctx.create_dynamic_frame \
    .from_rdd(unpivoted_data_rdd, name='unpivoted')
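The properties_names sequence used above is left undefined on purpose; a minimal sketch for building it, assuming every property arrives as a <prefix>_name / <prefix>_value column pair, is to derive it from the source columns:

# Collect the property prefixes ('property1', ..., 'property800') from the column names.
source_columns = source_data.toDF().columns
properties_names = [
    column[:-len('_name')]
    for column in source_columns
    if column.endswith('_name')
]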
If you want to do the requested operation using only the AWS Glue ETL API, here are my instructions:

1. Map each DynamicRecord from the source into the primary key plus a list of key/value objects:

mapped = Map.apply(
    frame=source_data,
    # Here we operate on DynamicRecords (dict-like), not RDD Rows.
    f=lambda record: {
        'primary_key': record['primary_key'],
        'fields': [
            {
                'key': record[f'{field}_name'],
                'value': record[f'{field}_value'],
            }
            for field in properties_names
        ],
    },
)
Example input:
primary_key|property1_name|property1_value|property800_name|property800_value
12345|is_male | 1|is_new | 1
67890|is_male | 0|is_new | 0
Output:
primary_key|fields
12345|[{'key': 'is_male', 'value': 1}, {'key': 'is_new', 'value': 1}]
67890|[{'key': 'is_male', 'value': 0}, {'key': 'is_new', 'value': 0}]
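If you want to sanity-check this intermediate frame, the nested structure can be inspected directly (a small usage note, assuming the variable names from the snippet above):

mapped.printSchema()   # should show primary_key plus a nested 'fields' array
mapped.show(2)         # print a couple of mapped records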
2. Relationalize the mapped data to flatten the fields array into a separate dynamic frame:

relationalized_dfc = Relationalize.apply(
    frame=mapped,
    staging_path='s3://tmp-bucket/tmp-dir/',  # choose any dir for temp files
)
The method returns a DynamicFrameCollection. In the case of a single array field it will contain two DynamicFrames: the first with primary_key and a foreign key to the second, which holds the flattened and unnested fields.
Output:
# table name: roottable
primary_key|fields
12345| 1
67890| 2
# table name: roottable.fields
id|index|val.key|val.value
1| 0|is_male| 1
1| 1|is_new | 1
2| 0|is_male| 0
2| 1|is_new | 0
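Before joining, you can check which frames Relationalize actually produced and select them explicitly (a small sketch using the collection from the previous step):

print(relationalized_dfc.keys())   # e.g. ['roottable', 'roottable.fields']
fields_frame = relationalized_dfc.select('roottable.fields')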
3. Join these DynamicFrames on the generated key:

joined = Join.apply(
    frame1=relationalized_dfc['roottable'],
    keys1=['fields'],
    frame2=relationalized_dfc['roottable.fields'],
    keys2=['id'],
)
Output:
primary_key|fields|id|index|val.key|val.value
12345| 1| 1| 0|is_male| 1
12345| 1| 1| 1|is_new | 1
67890| 2| 2| 0|is_male| 0
67890| 2| 2| 1|is_new | 0
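From here, the only columns your users asked for are primary_key, val.key and val.value, so the last step is to keep and rename those and load the result into Redshift. A rough sketch, assuming the usual from awsglue.transforms import * and placeholder connection, database and table names that you would replace with your own:

# Keep only the requested columns and give them friendly names.
# Backticks are needed because the source names contain dots;
# adjust the types to whatever your schema actually reports.
final = ApplyMapping.apply(
    frame=joined,
    mappings=[
        ('primary_key', 'string', 'primary_key', 'string'),
        ('`val.key`', 'string', 'key', 'string'),
        ('`val.value`', 'string', 'value', 'string'),
    ],
)

# Write to Redshift through a Glue connection (all names here are placeholders).
glue_ctx.write_dynamic_frame.from_jdbc_conf(
    frame=final,
    catalog_connection='my-redshift-connection',
    connection_options={'dbtable': 'unpivoted_properties', 'database': 'mydb'},
    redshift_tmp_dir='s3://tmp-bucket/redshift-tmp/',
)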