I am new to AWS Glue and I am trying to process a CSV file in S3 that has already been cataloged by a crawler: rename the columns, add some additional columns with values, and then sink the output to an S3 bucket in JSON format. I have successfully added AcusitionDateTime as a new column holding the current date for every record, but I'm struggling to add the Glue job ID the same way I added AcusitionDateTime. I have done some research but I'm still not sure how. Is there a way to extract the Glue job ID, create a new column, and populate it with that ID for all records?
This is what I have done so far:
import sys
from datetime import datetime

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Adds an AcusitionDateTime column containing today's date to each record of
# the input data set.
def AddDateCol(r):
    r["AcusitionDateTime"] = datetime.now()
    return r
# Script generated for node Data Catalog table
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="test_database2",
    table_name="sourcedata_csv",
    transformation_ctx="datasource0")

# Apply the function to each record of the DynamicFrame
datasource0 = Map.apply(frame=datasource0, f=AddDateCol)
# Script generated for node ApplyMapping
ApplyMapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[
        ("accountid", "long", "Accountid", "long"),
        ("accounttype", "string", "Accounttype", "string"),
        ("accountname", "string", "Accountname", "string"),
        ("nickname", "string", "Nickname", "string"),
        ("accounttypeid", "long", "Accounttypeid", "long"),
        ("acusitiondatetime", "timestamp", "AcusitionDateTime", "timestamp"),
    ],
    transformation_ctx="ApplyMapping1",
)
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping1,
    connection_type="s3",
    format="json",
    connection_options={
        "path": "s3://data-lake/dataset/",
        "compression": "gzip",
        "partitionKeys": [],
    },
    transformation_ctx="S3bucket_node3",
)
job.commit()
My objective is to have JSON output that looks something like this:
'{"Accountid":"1234",
"Accounttype":"30",
"Accountname":"joins",
"Nickname":"leejones",
"Accounttypeid":"324566",
"AcusitionDateTime": "12-01-2023",
"Glue_Job_Id": "225273h37dh7dh3w7"}'
You can access the job run ID without passing it as a parameter: it is available directly from args under the key 'JOB_RUN_ID'. You will get an ID starting with "jr_12345...". The same ID is shown in the AWS console when you run the job.
import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, [])
job_id = args['JOB_RUN_ID']
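To populate the new column, you can reuse the same Map pattern as your AddDateCol function: capture the job run ID once at the top of the script, then set it on every record. Here is a sketch of that mapping function; the job_run_id value is a placeholder here (in the real job it would come from args['JOB_RUN_ID'] as shown above), and the Glue_Job_Id field name is just the one from your example output.

```python
from datetime import datetime

# Placeholder for illustration; in the Glue job this would be
# args['JOB_RUN_ID'] from getResolvedOptions.
job_run_id = "jr_12345example"

# Adds both metadata columns to a single record, mirroring AddDateCol.
def AddMetadataCols(r):
    r["AcusitionDateTime"] = datetime.now()
    r["Glue_Job_Id"] = job_run_id
    return r

# In the Glue script this is applied the same way as before:
# datasource0 = Map.apply(frame=datasource0, f=AddMetadataCols)
```

Note that ApplyMapping drops any field not listed in mappings, so you will likely also need to add an entry for the new field (something like ("glue_job_id", "string", "Glue_Job_Id", "string"), depending on how the field name is cased after the Map step) or the column will not reach the JSON output.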