I have a data source in an S3 bucket. The data source is a CSV file with one column, "ID". I want to use AWS Glue to complete an ETL job: extract the data from the S3 bucket, create a second column ("ID Suffix") that is the last two characters of the "ID", and then load this data file into a different S3 bucket. So if the "ID" is 1000031, I want the second column to be 31.
Here is the script that AWS Glue created for the simple task of extracting the file from one S3 bucket and putting it into another. I would like to edit it to accomplish the task above. If you can assist with this, I would greatly appreciate it. Thanks!
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "stackoverflow", table_name = "sample_data_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "stackoverflow", table_name = "sample_data_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("id", "int", "id", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "int", "id", "int")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
You can achieve this using Map.apply with a UDF. See the input and output below, which I got after running the following script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://aws-glue-us-east-2/test.csv"]}, format = "csv")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "int", "id", "int")], transformation_ctx = "applymapping1")
def map_function(dynamicRecord):
    # Take the last two characters of the id value as the suffix
    sub_id = str(dynamicRecord["id"])[-2:]
    dynamicRecord["sub_id"] = sub_id
    return dynamicRecord
mapping1 = Map.apply(frame = applymapping1, f = map_function, transformation_ctx = "mapping1")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = mapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-us-east-2/Sample_output"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
Here are the sample input I used and the output I got from this run:
Input:
id
1000031
1000032
1000034
1000035
1000036
1000037
1000039
1000030
Output:
sub_id,id
31,1000031
32,1000032
34,1000034
35,1000035
36,1000036
37,1000037
39,1000039
30,1000030
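Note that the question asks for the new column to be named "ID Suffix" rather than sub_id. If you want that exact header, one option is to rename the fields with a second ApplyMapping before the sink. This is just a sketch; the source types here assume the mapping above (id as int, sub_id as string after the str() conversion), so adjust them if your actual types differ:
applymapping2 = ApplyMapping.apply(frame = mapping1, mappings = [("id", "int", "ID", "int"), ("sub_id", "string", "ID Suffix", "string")], transformation_ctx = "applymapping2")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping2, connection_type = "s3", connection_options = {"path": "s3://aws-glue-us-east-2/Sample_output"}, format = "csv", transformation_ctx = "datasink2")
Alternatively, simply assign dynamicRecord["ID Suffix"] instead of dynamicRecord["sub_id"] inside map_function.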