aws-glue

Partitioning by ID, year and month in Glue


I'm trying to partition by merchant_id, year and month, as you can see in the data sink.

Partitioning by merchant_id alone runs fine; I already have that column in my data source.

But I don't have the year and month. So I'm trying to take created_at, split it, and add 'year' and 'month' columns to the same table, so that I can partition by (merchant_id, year, month).

Can anyone help me with that? This is the code in Glue:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "recurrent", table_name = "bills", transformation_ctx = "DataSource0")


df0 = DataSource0.toDF()
dataframe1 = df0.withColumn("filedate", df0.created_at)

dynamicframe2 = DynamicFrame.fromDF(dataframe1, glueContext, "dynamicframe2")

def map_function(dynamicRecord):
    date = dynamicRecord["filedate"].split("-")[0][-8:]
    dynamicRecord["year"] = date[0:4]
    dynamicRecord["month"] = date[4:6]
    dynamicRecord["day"]= date[6:8]
    return dynamicRecord
    
mapping3 = Map.apply(frame = dynamicframe2, f = map_function, transformation_ctx = "mapping3")

Transform2 = ApplyMapping.apply(frame = mapping3, mappings = [("op", "string", "bills_op", "string"), ("timestamp", "string", "bills_timestamp", "string"), ("id", "int", "bills_id", "int"), ("subscription_id", "int", "bills_subscription_id", "int"), ("customer_id", "int", "bills_customer_id", "int"), ("amount", "decimal", "bills_amount", "decimal"), ("created_at", "timestamp", "bills_created_at", "timestamp"), ("updated_at", "timestamp", "bills_updated_at", "timestamp"), ("status", "int", "bills_status", "int"), ("payment_method_id", "int", "bills_payment_method_id", "int"), ("due_at", "timestamp", "bills_due_at", "timestamp"), ("billing_at", "timestamp", "bills_billing_at", "timestamp"), ("installments", "int", "bills_installments", "int"), ("merchant_id", "int", "bills_merchant_id", "int"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "Transform2")

DataSource1 = glueContext.create_dynamic_frame.from_catalog(database = "recurrent", table_name = "clientes_ativos_enterprise", transformation_ctx = "DataSource1")

Transform0 = ApplyMapping.apply(frame = DataSource1, mappings = [("meta_id", "int", "filter_meta_id", "int"), ("meta_value", "string", "filter_meta_value", "string"), ("merc_id", "int", "filter_merc_id", "int")], transformation_ctx = "Transform0")

Transform1 = Join.apply(frame1 = Transform0, frame2 = Transform2, keys2 = ["bills_merchant_id"], keys1 = ["filter_merc_id"], transformation_ctx = "Transform1")

DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform1, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://analytics-plataforma-datalake/transformation-zone/partition_bills/", "compression": "gzip", "partitionKeys": ["bills_merchant_id","year","month"]}, transformation_ctx = "DataSink0")
job.commit()

And this is the full error message:

 Traceback (most recent call last):
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o151.pyWriteDynamicFrame.
: org.apache.spark.sql.AnalysisException: 
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
         ;
    at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$validateSchema(DataSource.scala:733)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:523)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:535)
    at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:522)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:58)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:521)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/bills_partition_filtered.py", line 71, in <module>
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform1, connection_type = "s3", format = "parquet", connection_options = 
{
    "path": "s3://analytics-plataforma-datalake/transformation-zone/partition_bills/",
    "compression": "gzip",
    "partitionKeys": [
        "bills_merchant_id",
        "year",
        "month"
    ]
}
, transformation_ctx = "DataSink0")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: '\nDatasource does not support writing empty or nested empty schemas.\nPlease make sure the data schema has at least one or more column(s).\n 

Thank you all!!


Solution

    1. The error trace says there is a problem with the DataFrame schema. You should take a look at df.printSchema() before writing, to make sure your schema is correct.

    2. You should cast your created_at column to a date/timestamp type.

    3. Use the withColumn function and parse the year and month out of the created_at column instead of slicing the string positionally, which can cause inconsistencies in the future.

    dynamicRecord["year"] = date[0:4]

    is not a good way to parse a date.

    Follow the answers here to apply #3: Splitting Date into Year, Month and Day, with inconsistent delimiters. A minimal sketch of the approach is shown below.
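
Putting points 1-3 together, here is a minimal sketch that reuses df0 and glueContext from the question's script (the created_at type and the string partition columns are assumptions, so verify them against your own catalog schema):

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, date_format

# Point 2: cast created_at to a real timestamp instead of handling it as a string.
# Point 3: derive the partition columns from the timestamp with date_format(),
# which also keeps the zero-padded values ("04") the positional slicing aimed for.
dataframe1 = (df0
    .withColumn("created_at", col("created_at").cast("timestamp"))
    .withColumn("year", date_format(col("created_at"), "yyyy"))
    .withColumn("month", date_format(col("created_at"), "MM"))
    .withColumn("day", date_format(col("created_at"), "dd")))

# Point 1: inspect the schema before writing. If a join or mapping left the frame
# with no columns, the Parquet writer raises the "empty or nested empty schemas"
# AnalysisException shown in the traceback.
dataframe1.printSchema()

dynamicframe2 = DynamicFrame.fromDF(dataframe1, glueContext, "dynamicframe2")

With year, month and day derived this way, the Map step and its positional string slicing can be dropped entirely, and the rest of the job (ApplyMapping, Join, and the write with partitionKeys) stays the same.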