
AWS Glue ETL Spark - string to timestamp


I am trying to convert my CSVs to Parquet via an AWS Glue ETL job. At the same time, I would like to convert my datetime column (currently a string) to a timestamp format that Athena can recognize. (Athena recognizes yyyy-MM-dd HH:mm:ss.)

I skimmed and applied many suggestions but could not get it to work.

Could you please let me know which library I should import and what script to apply for the specific column? The following code is what AWS Glue suggests for converting from CSV to Parquet, and it seems customizable for my datetime conversion purpose as well.

Thanks in advance.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "partition_db", table_name = "test_folder", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col4", "string")], transformation_ctx = "applymapping1")


resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://commercialanalytics/future_partition/test_folder_parquet"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

Solution

  • You can create a function and apply it to every record with the Map transform.

    import pandas as pd

    def parse_date(rec):
        # Map.apply calls this once per record; rec behaves like a dict
        # and the modified record must be returned.
        # Replace "col_name" with the name of your datetime column.
        # The output pattern matches the yyyy-MM-dd HH:mm:ss format Athena expects.
        rec["col_name"] = pd.to_datetime(rec["col_name"]).strftime('%Y-%m-%d %H:%M:%S')
        return rec

    applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col4", "string")], transformation_ctx = "applymapping1")
    custommapping1 = Map.apply(frame = applymapping1, f = parse_date, transformation_ctx = "custommapping1")
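    The resulting custommapping1 frame can then be passed to the ResolveChoice, DropNullFields, and write_dynamic_frame steps in place of applymapping1, so the rest of the generated script runs unchanged.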
    
    

  • Another option is to convert the DynamicFrame into a Spark DataFrame and do the conversion with spark.sql(...) queries or the DataFrame API, as sketched below.
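
    For example, here is a minimal sketch of that route. It assumes the datetime strings live in a column named col1 and arrive as MM/dd/yyyy HH:mm:ss; adjust both the column name and the input pattern to your data.

    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import to_timestamp, date_format

    # Convert the DynamicFrame to a Spark DataFrame
    df = applymapping1.toDF()

    # Parse the string and re-format it as yyyy-MM-dd HH:mm:ss.
    # "col1" and the input pattern "MM/dd/yyyy HH:mm:ss" are assumptions -
    # replace them with your actual column name and source format.
    df = df.withColumn(
        "col1",
        date_format(to_timestamp("col1", "MM/dd/yyyy HH:mm:ss"), "yyyy-MM-dd HH:mm:ss")
    )

    # The same conversion via spark.sql():
    # df.createOrReplaceTempView("tmp")
    # df = spark.sql("SELECT col0, date_format(to_timestamp(col1, 'MM/dd/yyyy HH:mm:ss'), 'yyyy-MM-dd HH:mm:ss') AS col1, col2, col3, col4 FROM tmp")

    # Convert back to a DynamicFrame so the rest of the generated job can run unchanged
    custommapping1 = DynamicFrame.fromDF(df, glueContext, "custommapping1")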