Tags: amazon-web-services, aws-glue, aws-glue-data-catalog

AWS Glue job bookmark produces duplicates for csv files


We receive one CSV file in an S3 bucket from our vendor every day at 11am. I convert this file into Parquet format with a Glue job that runs at 11:30am.

I've enabled the job bookmark so that already-processed files are skipped. Nonetheless, I see that some files are being reprocessed, which creates duplicates.
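
For reference, the bookmark is turned on through the job's `--job-bookmark-option` argument. A minimal sketch of starting the run with bookmarking enabled via boto3 (the job name here is hypothetical):

import boto3

glue = boto3.client('glue')

# Start the conversion job with bookmarking enabled for this run
glue.start_job_run(
    JobName='csv-to-parquet-job',                            # hypothetical job name
    Arguments={'--job-bookmark-option': 'job-bookmark-enable'}
)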

I have read these questions and answers: AWS Glue Bookmark produces duplicates for PARQUET and AWS Glue Job Bookmarking explanation.

They give a good understanding of job bookmarking, but they still do not address this issue.

The AWS documentation says that bookmarking supports CSV files: AWS documentation.

I'm wondering if someone can help me understand what the problem could be and, if possible, suggest a solution as well :)

Edit:

Pasting sample code here as requested by Prabhakar.

import os
import sys
from datetime import datetime

from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import lit

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import ResolveChoice
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

staging_database_name = "my-glue-db"
s3_target_path = "s3://mybucket/mydata/"

# Placeholders for values defined elsewhere in the full job
kms_key_id = "<kms-key-id>"                         # KMS key used for S3 server-side encryption
ingest_datetime = datetime.utcnow()                 # timestamp stamped onto every record
load_date = ingest_datetime.strftime('%Y-%m-%d')    # partition value appended to the target path


"""
 'date_index': date location in the file name
 'date_only': only date column is inserted
 'date_format': format of date
 'path': sub folder name in master bucket
"""

# FOUO classified files
tables_spec = {
    'sample_table': {'path': 'sample_table/load_date=', 'pkey': 'mykey', 'orderkey': 'myorderkey'}
}

# Enable S3 server-side encryption with the given KMS key
spark_conf = SparkConf().setAll([
  ("spark.hadoop.fs.s3.enableServerSideEncryption", "true"),
  ("spark.hadoop.fs.s3.serverSideEncryption.kms.keyId", kms_key_id)
])
sc = SparkContext(conf=spark_conf)

glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Initialise the job so that bookmark state is tracked for this run
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

for table_name, spec in tables_spec.items():
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database=staging_database_name,
                                                                table_name=table_name,
                                                                transformation_ctx='datasource0')

    resolvechoice2 = ResolveChoice.apply(frame=datasource0, choice="make_struct", transformation_ctx='resolvechoice2')

    # Convert to a Spark data frame and stamp every record with the ingest datetime
    delta_df = resolvechoice2.toDF().withColumn('ingest_datetime', lit(str(ingest_datetime)))

    date_dyf = DynamicFrame.fromDF(delta_df, glueContext, "date_dyf")
    # Build the target path: <s3_target_path>/<table sub folder>load_date=<load_date>
    master_folder_path = os.path.join(s3_target_path, spec['path']).replace('\\', '/') + load_date
    datasink4 = glueContext.write_dynamic_frame.from_options(frame=date_dyf,
                                                            connection_type='s3',
                                                            connection_options={"path": master_folder_path},
                                                            format='parquet', transformation_ctx='datasink4')
job.commit()

Solution

  • I spoke to an AWS Support engineer, and she mentioned that she was able to reproduce the issue and has raised it with the Glue technical team for resolution.

    Nonetheless, I couldn't wait for them to fix the bug and have taken a different approach.

    Solution:

    1. Disable the Glue job bookmark.
    2. After the Glue job converts the CSV file to Parquet, move the CSV file to a different location in the S3 bucket (see the sketch below).
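
A minimal sketch of step 2, assuming the incoming files live under an incoming/ prefix and are parked under a processed/ prefix (the bucket name and both prefixes are hypothetical). S3 has no native move, so each object is copied and then deleted:

import boto3

s3 = boto3.resource('s3')
bucket_name = 'mybucket'          # hypothetical bucket name
incoming_prefix = 'incoming/'     # where the vendor drops the daily CSV
processed_prefix = 'processed/'   # where already-converted CSVs are parked

bucket = s3.Bucket(bucket_name)
for obj in bucket.objects.filter(Prefix=incoming_prefix):
    if not obj.key.endswith('.csv'):
        continue
    # S3 has no rename, so copy the object to the archive prefix and delete the original
    new_key = processed_prefix + obj.key[len(incoming_prefix):]
    s3.Object(bucket_name, new_key).copy_from(CopySource={'Bucket': bucket_name, 'Key': obj.key})
    obj.delete()

Because the Glue job only ever sees unprocessed CSVs in the source prefix, the bookmark is no longer needed to avoid duplicates.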