pyspark aws-glue

Is there a way to extract the Glue job ID from the PySpark script?


I am new to AWS Glue and I am trying to process a CSV file in S3 that has already been cataloged by a crawler, rename the columns, add some additional columns with values, and then sink the output to an S3 bucket in JSON format. I have been able to successfully add AcusitionDateTime as a new column with the current date for all records, but I'm struggling to add the Glue job ID the same way I added AcusitionDateTime. I have done some research but I'm still not sure how. Is there a way to extract the Glue job ID, create a new column, and populate it with the job ID for all records?

This is what I have done so far:

import sys
from datetime import datetime

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Adds an AcusitionDateTime column containing the current timestamp to each
# record of the input data set.

def AddDateCol(r):
    r["AcusitionDateTime"] = datetime.now()  
    return r 

# Script generated for node Data Catalog table
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "test_database2", 
    table_name = "sourcedata_csv", 
    transformation_ctx = "datasource0")

# Apply the function to each record of the Dynamic DataFrame

datasource0 = Map.apply(frame = datasource0, f = AddDateCol) 

# Script generated for node ApplyMapping
ApplyMapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[
        ("accountid", "long", "Accountid", "long"),
        ("accounttype", "string", "Accounttype", "string"),
        ("accountname", "string", "Accountname", "string"),
        ("nickname", "string", "Nickname", "string"),
        ("accounttypeid", "long", "Accounttypeid", "long"),
        ("acusitiondatetime", "timestamp", "AcusitionDateTime", "timestamp"),
    ],
    transformation_ctx="ApplyMapping1"
)
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping1,
    connection_type="s3",
    format="json",
    connection_options={
        "path": "s3://data-lake/dataset/",
        "compression": "gzip",
        "partitionKeys": [],
    },
    transformation_ctx="S3bucket_node3",
)
job.commit()

My objective is to have a JSON object output with something like this

'{"Accountid":"1234", 
  "Accounttype":"30",
  "Accountname":"joins",
  "Nickname":"leejones",
  "Accounttypeid":"324566",
  "AcusitionDateTime": "12-01-2023",
  "Glue_Job_Id": "225273h37dh7dh3w7"}'

Solution

  • You can access the job run ID without passing it as a parameter. It is available directly from the resolved arguments under the key 'JOB_RUN_ID', and you will get an ID starting with "jr_12345...". The same ID is shown in the AWS console when you run the job.

    import sys
    from awsglue.utils import getResolvedOptions
    
    args = getResolvedOptions(sys.argv, [])
    
    job_id = args['JOB_RUN_ID']
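
    To tie this back to the question, the job run ID can be added to every record the same way AcusitionDateTime was added: capture it once and reference it inside the function passed to Map.apply. The sketch below is only illustrative; the function name AddMetadataCols is made up here, and it assumes the same datasource0 dynamic frame and the Glue_Job_Id column name from the desired output.

    import sys
    from datetime import datetime
    from awsglue.transforms import Map
    from awsglue.utils import getResolvedOptions

    # Per the answer above, Glue exposes the run id under the 'JOB_RUN_ID' key.
    args = getResolvedOptions(sys.argv, [])
    job_run_id = args['JOB_RUN_ID']

    # Add both the acquisition timestamp and the job run id to each record.
    def AddMetadataCols(r):
        r["AcusitionDateTime"] = datetime.now()
        r["Glue_Job_Id"] = job_run_id
        return r

    datasource0 = Map.apply(frame=datasource0, f=AddMetadataCols)

    If the existing ApplyMapping step is kept, the new column also needs a mapping entry following the same pattern as the other columns, for example ("glue_job_id", "string", "Glue_Job_Id", "string") with the source name/case adjusted to match what the frame actually contains, because ApplyMapping only keeps the fields listed in its mappings.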