Tags: python, amazon-web-services, pyspark, aws-glue, production

How to productionise Python script for AWS Glue?


I'm following this tutorial video: https://www.youtube.com/watch?v=EzQArFt_On4

The example code provided in this video:

import sys

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())
glueJob = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

glueJob.init(args['JOB_NAME'], args)

sparkSession = glueContext.spark_session

#ETL process code
def etl_process():
  ...
  return xxx

glueJob.commit()

I'm wondering whether the part before the etl_process function can be used in production as it is. Or do I need to wrap that part in a separate function so that I can add unit tests for it? Something like this:

def define_spark_session():
    sc = SparkContext.getOrCreate()
    glue_context = GlueContext(sc)
    glue_job = Job(glue_context)

    args = getResolvedOptions(sys.argv, ['JOB_NAME'])

    glue_job.init(args['JOB_NAME'], args)

    spark_session = glue_context.spark_session
    return spark_session

But it doesn't seem to need a parameter...

Or should I just write unit tests for the etl_process function?

Or maybe I can create a separate Python file containing the etl_process function and import it into this script?

I'm new to this and a bit confused. Could someone help, please? Thanks.


Solution

  • At present it is very difficult to test AWS Glue itself locally, although there are workarounds, such as downloading the Docker image that AWS provides and running the job from there (it will probably need some tweaks, but it should work).

    I think the easiest way is to convert the DynamicFrame you get from the Glue libs into a Spark DataFrame (.toDF()) and then do things in pure Spark (PySpark), so that you can test the result.

    dataFrame = dynamic_frame.toDF()

    def transformation(dataframe):
       return dataframe.withColumn(...)

    def test_transformation():
       result = transformation(input_test_dataframe)
       assert ...