Search code examples
amazon-web-servicesaws-glue

AWS ETL job that creates a new column which is a substring of an existing column


I have a data source in an S3 bucket. The data source is a CSV file with one column, "ID". I want to use AWS Glue to complete an ETL job. I want to extract the data from the S3 bucket, create a second column ("ID Suffix") which is the last two characters of the "ID", and then load this data file into a different S3 bucket. So if the "ID" is 1000031, I want the second column to be 31.

Here is the script that AWS Glue created for the simple task of extracting the file from one S3 bucket and putting it into another. I would like to edit it to accomplish the task above. If you can assist with this, I would greatly appreciate it. Thanks!

import sys

from awsglue.transforms import *

from awsglue.utils import getResolvedOptions

from pyspark.context import SparkContext

from awsglue.context import GlueContext

from awsglue.job import Job



## @params: [JOB_NAME]

args = getResolvedOptions(sys.argv, ['JOB_NAME'])



sc = SparkContext()

glueContext = GlueContext(sc)

spark = glueContext.spark_session

job = Job(glueContext)

job.init(args['JOB_NAME'], args)

## @type: DataSource

## @args: [database = "stackoverflow", table_name = "sample_data_csv", transformation_ctx = "datasource0"]

## @return: datasource0

## @inputs: []

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "stackoverflow", table_name = "sample_data_csv", transformation_ctx = "datasource0")

## @type: ApplyMapping

## @args: [mapping = [("id", "int", "id", "int")], transformation_ctx = "applymapping1"]

## @return: applymapping1

## @inputs: [frame = datasource0]

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "int", "id", "int")], transformation_ctx = "applymapping1")

## @type: DataSink

## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2"]

## @return: datasink2

## @inputs: [frame = applymapping1]

datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2")

job.commit()

Solution

  • You can achieve this using Map.apply with a user-defined function (UDF). See below for the input and output I got after running the following script:

    import sys
    
    from awsglue.transforms import *
    
    from awsglue.utils import getResolvedOptions
    
    from pyspark.context import SparkContext
    
    from awsglue.context import GlueContext
    
    from awsglue.job import Job
    
    
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    
    glueContext = GlueContext(sc)
    
    spark = glueContext.spark_session
    
    job = Job(glueContext)
    
    job.init(args['JOB_NAME'], args)
    
    datasource0 = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://aws-glue-us-east-2/test.csv"]}, format = "csv")
    
    applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "int", "id", "int")], transformation_ctx = "applymapping1")
    
    def map_function(dynamicRecord):
        sub_id = dynamicRecord["id"][-2:]
        dynamicRecord["sub_id"] = sub_id
        return dynamicRecord
    
    mapping1 = Map.apply(frame = applymapping1, f = map_function, transformation_ctx = "mapping1")
    
    datasink2 = glueContext.write_dynamic_frame.from_options(frame = mapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-us-east-2/Sample_output"}, format = "csv", transformation_ctx = "datasink2")
    
    job.commit()
    

    After running this, I got the output shown below:

    Input
    
    id
    1000031
    1000032
    1000034
    1000035
    1000036
    1000037
    1000039
    1000030
    
    Output:
    
    sub_id,id
    31,1000031
    32,1000032
    34,1000034
    35,1000035
    36,1000036
    37,1000037
    39,1000039
    30,1000030