Tags: validation, amazon-s3, etl, aws-glue

How to retrieve file names of CSV files read in AWS Glue Visual ETL with a Data Catalog table and handle validation errors for individual files?


I'm using the AWS Glue Studio visual ETL editor to process CSV files stored in an S3 bucket. The files are registered as a table in the Glue Data Catalog and are read into a single DynamicFrame during the ETL job. I want to:

- Retrieve the file names for the source files being processed.
- Perform data validation (e.g., schema checks or custom rules) on the data.
- Localize errors to the specific file that caused the validation failure.
- Move the file with errors to a failed folder in S3 for further inspection, while continuing to process valid files.

However, because all input files are merged into a single DynamicFrame, I can't directly associate a validation error with the file it came from. My questions:

- How can I extract the file names for each row or batch of data in the DynamicFrame?
- Is there a way to run validation on a per-file basis in Glue Visual ETL?
- How can I isolate a file with errors and move it to a failed folder programmatically in Glue?

Any guidance or examples on how to achieve this would be greatly appreciated!


Solution

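Add a Custom Transform node directly after the Data Catalog source node and use Spark's `input_file_name()` to tag every row with the S3 path of the file it was read from:

```python
# Custom Transform node in AWS Glue Studio. The job script that Glue Studio generates
# already imports DynamicFrameCollection at module level, which the return annotation needs.
def add_file_name(glueContext, dfc) -> DynamicFrameCollection:
    from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection
    from pyspark.sql.functions import input_file_name

    transformed_frames = {}
    # Process every DynamicFrame in the incoming collection
    for key in dfc.keys():
        spark_df = dfc.select(key).toDF()
        # Record the full S3 URI of the source file for each row
        spark_df = spark_df.withColumn("file_name", input_file_name())
        transformed_frames[key] = DynamicFrame.fromDF(spark_df, glueContext, f"{key}_transformed")
    return DynamicFrameCollection(transformed_frames, glueContext)
```

Place this transform before any joins or aggregations, because `input_file_name()` only knows the source file for rows scanned directly from files. Follow it with a Select From Collection node to get a single DynamicFrame back for the rest of the job.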
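With a `file_name` column in place, the other two questions reduce to bookkeeping: a row-level rule failure is attributed to its file, a file fails if any of its rows fails, and failed files are moved under a separate prefix. The sketch below shows that logic in plain Python with an injected boto3-style S3 client. It is a minimal sketch under assumptions: `partition_files`, `split_s3_uri`, `move_to_failed`, the example rule, and the `failed/` prefix in the same bucket are hypothetical names and choices for illustration. In the Glue job itself you would express the rules as Spark column conditions and collect only the distinct failing `file_name` values to the driver, not all rows.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple
from urllib.parse import unquote, urlparse

# Hypothetical rule type: takes one row (a dict) and returns an error message,
# or None when the row passes.
Rule = Callable[[dict], Optional[str]]


def partition_files(rows: Iterable[dict], rules: List[Rule],
                    path_col: str = "file_name") -> Tuple[Set[str], Dict[str, List[str]]]:
    """Group row-level validation errors by source file.

    Returns (valid_files, errors_by_file). A file is invalid if any of its
    rows fails any rule; otherwise it is valid.
    """
    seen: Set[str] = set()
    errors: Dict[str, List[str]] = defaultdict(list)
    for row in rows:
        path = row[path_col]
        seen.add(path)
        for rule in rules:
            message = rule(row)
            if message is not None:
                errors[path].append(message)
    return seen - set(errors), dict(errors)


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/prefix/file.csv' into ('bucket', 'prefix/file.csv')."""
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3a", "s3n"):
        raise ValueError(f"not an S3 URI: {uri}")
    # Assumption: the path may be percent-encoded (e.g. spaces as %20), so decode it.
    return parsed.netloc, unquote(parsed.path.lstrip("/"))


def move_to_failed(s3_client, uris: Iterable[str], failed_prefix: str = "failed/") -> List[str]:
    """Move each object under failed_prefix in the same bucket.

    S3 has no rename, so a move is copy_object followed by delete_object.
    The full original key is kept under the prefix to avoid name collisions.
    Returns the destination keys.
    """
    moved = []
    for uri in sorted(uris):
        bucket, key = split_s3_uri(uri)
        dest_key = failed_prefix + key
        s3_client.copy_object(Bucket=bucket, Key=dest_key,
                              CopySource={"Bucket": bucket, "Key": key})
        s3_client.delete_object(Bucket=bucket, Key=key)
        moved.append(dest_key)
    return moved
```

In the Glue script, one way to wire this up is to compute the failing paths with `df.filter(<error condition>).select("file_name").distinct().collect()`, keep only good rows with `df.filter(~col("file_name").isin(failed_paths))` before writing, and only then call `move_to_failed(boto3.client("s3"), failed_paths)`, so the job never moves a file it is still reading. Note that a single `copy_object` call is limited to objects up to 5 GB.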