We receive one CSV file every day at 11am in an S3 bucket from our vendor. I convert this file to Parquet format using a Glue job that runs at 11:30am.
I've enabled job bookmarks so that already-processed files are not processed again. Nonetheless, I see some files being reprocessed, which creates duplicates.
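For context, job bookmarks are turned on through the --job-bookmark-option job argument rather than inside the script itself; below is a minimal boto3 sketch of starting a run with bookmarks enabled (the job name is illustrative, not my actual job):

import boto3

glue = boto3.client('glue')

# 'job-bookmark-enable' tells Glue to keep state between runs and skip already-processed data
glue.start_job_run(
    JobName='my-csv-to-parquet-job',  # illustrative name
    Arguments={'--job-bookmark-option': 'job-bookmark-enable'}
)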
I read these questions and answers: AWS Glue Bookmark produces duplicates for PARQUET and AWS Glue Job Bookmarking explanation.
They gave me a good understanding of job bookmarking, but they still don't address the issue.
The AWS documentation says bookmarking supports CSV files: AWS documentation.
I'm wondering if someone can help me understand what the problem could be and, if possible, suggest a solution as well :)
Edit:
Pasting sample code here as requested by Prabhakar.
import os
import sys

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import ResolveChoice
from awsglue.utils import getResolvedOptions
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import lit

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

staging_database_name = "my-glue-db"
s3_target_path = "s3://mybucket/mydata/"

"""
'date_index': date location in the file name
'date_only': only date column is inserted
'date_format': format of date
'path': sub folder name in master bucket
"""
# fouo classified files
tables_spec = {
    'sample_table': {'path': 'sample_table/load_date=', 'pkey': 'mykey', 'orderkey': 'myorderkey'}
}

# kms_key_id, ingest_datetime and load_date are set earlier in the full script (omitted here)
spark_conf = SparkConf().setAll([
    ("spark.hadoop.fs.s3.enableServerSideEncryption", "true"),
    ("spark.hadoop.fs.s3.serverSideEncryption.kms.keyId", kms_key_id)
])
sc = SparkContext(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

for table_name, spec in tables_spec.items():
    # Read from the catalog with a transformation_ctx so the bookmark can track state
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database=staging_database_name,
                                                                table_name=table_name,
                                                                transformation_ctx='datasource0')
    resolvechoice2 = ResolveChoice.apply(frame=datasource0, choice="make_struct", transformation_ctx='resolvechoice2')

    # Convert to a Spark DataFrame and add an ingest_datetime column
    delta_df = resolvechoice2.toDF().withColumn('ingest_datetime', lit(str(ingest_datetime)))
    date_dyf = DynamicFrame.fromDF(delta_df, glueContext, "date_dyf")

    # Build the partitioned target path, e.g. .../sample_table/load_date=<load_date>
    master_folder_path1 = os.path.join(s3_target_path, spec['path']).replace('\\', '/')
    master_folder_path = master_folder_path1 + load_date

    datasink4 = glueContext.write_dynamic_frame.from_options(frame=date_dyf,
                                                             connection_type='s3',
                                                             connection_options={"path": master_folder_path},
                                                             format='parquet', transformation_ctx='datasink4')

job.commit()
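While debugging, it can also help to inspect the bookmark state that Glue has stored for the job; here is a small boto3 sketch using the GetJobBookmark and ResetJobBookmark APIs (again, the job name is illustrative):

import boto3

glue = boto3.client('glue')

# Look at the bookmark entry Glue keeps for the job (run, attempt, serialized state)
response = glue.get_job_bookmark(JobName='my-csv-to-parquet-job')
print(response['JobBookmarkEntry'])

# If the state looks wrong, the bookmark can be reset so the next run starts clean
# (note: this will reprocess all files once)
# glue.reset_job_bookmark(JobName='my-csv-to-parquet-job')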
I spoke to an AWS Support engineer and she mentioned that she was able to reproduce the issue and has raised it with the Glue technical team for resolution.
Nonetheless, I couldn't wait for them to fix the bug and have taken a different approach.
Solution: