import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, year, month, dayofmonth, to_date, from_unixtime
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_name", table_name = "table_name", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("dateregistered", "timestamp", "dateregistered", "timestamp"), ("id", "int", "id", "int")], transformation_ctx = "applymapping1")
df = applymapping1.toDF()
repartitioned_with_new_columns_df = applymapping1.select("*")
.withColumn("date_col", to_date(from_unixtime(col("dateRegistered"))))
.withColumn("year", year(col("date_col")))
.withColumn("month", month(col("date_col")))
.withColumn("day", dayofmonth(col("date_col")))
.drop(col("date_col"))
#.repartition(1)
dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")
datasink = glueContext.write_dynamic_frame.from_options(
frame = dyf,
connection_type = "s3",
connection_options = {
"path": "bucket-path",
"partitionKeys": ["year", "month", "day"]
},
format = "json",
transformation_ctx = "datasink")
job.commit()
I have the above script and I can't figure out why it is not working, or whether this is even the correct approach.
Could someone please review it and let me know what I am doing wrong?
The goal is to run this job daily and write the table to S3, partitioned as above, as either JSON or Parquet.
You are referring to the wrong frame when manipulating the columns: applymapping1 is a Glue DynamicFrame, which has no select or withColumn methods, so the column transformations have to be applied to the Spark DataFrame df that you created from it.
applymapping1.select("*")
should actually be df.select("*")
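Two smaller things worth fixing at the same time: the chained .withColumn calls need to be wrapped in parentheses (or end each line with a backslash) to be valid Python across multiple lines, and DynamicFrame is used without being imported from awsglue.dynamicframe. Here is a minimal sketch of the corrected middle section, keeping your logic and assuming dateRegistered really holds epoch seconds as your from_unixtime call implies (if ApplyMapping already gives you a proper timestamp, to_date(col("dateRegistered")) alone is enough):

from awsglue.dynamicframe import DynamicFrame

# Work on the Spark DataFrame, not the DynamicFrame
df = applymapping1.toDF()

# Parentheses let the method chain span multiple lines
repartitioned_with_new_columns_df = (
    df
    .withColumn("date_col", to_date(from_unixtime(col("dateRegistered"))))
    .withColumn("year", year(col("date_col")))
    .withColumn("month", month(col("date_col")))
    .withColumn("day", dayofmonth(col("date_col")))
    .drop(col("date_col"))
)

# Convert back so it can be written with write_dynamic_frame
dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")

With the year, month and day columns in place, the partitionKeys you pass to write_dynamic_frame.from_options should lay the output out under year=/month=/day= prefixes in your S3 path, and switching format from "json" to "parquet" is all it takes to write Parquet instead.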