scala, unit-testing, apache-spark, apache-spark-sql, parquet

Unit testing Spark DataFrame transformation chaining


I am very new to the Scala/Spark ecosystem and am wondering what the best way would be to unit test a chained DataFrame transformation. Here is a code sample of the method I would like to test:

def writeToParquet(spark: SparkSession, dataFrame: DataFrame, col1: DataType1, col2: DataType2): Unit = {
    dataFrame
        .withColumn("date", some_columnar_date_logic)
        .withColumn("hour", some_more_functional_logic)
        .... // couple more transformation steps
        .write
        .mode(SaveMode.Append)
        .partitionBy("col1", "col2", "col3")
        .parquet("some hdfs/s3/url")
}

The problem is that parquet has a Unit return type, which makes testing difficult. This is further amplified by the fact that transformations are immutable in nature, which makes mocking and spying difficult.

To create the DataFrame, I have dumped a test dataset into a CSV file.
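
A minimal sketch of how such a CSV can be read back into a DataFrame for the tests; the path and reader options below are illustrative placeholders, not my actual setup.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative helper: load the dumped CSV test dataset as a DataFrame.
// The path, header and schema-inference options are placeholder assumptions.
def loadTestDataFrame(spark: SparkSession): DataFrame =
    spark.read
        .option("header", "true")       // first CSV line holds the column names
        .option("inferSchema", "true")  // let Spark derive the column types
        .csv("src/test/resources/test-dataset.csv")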


Solution

  • One of the first things I figured out when testing DataFrames is to separate out transformations and I/O.

    So for the above scenario we can divide the chain into three parts:

    class Coordinator {
        def transformAndWrite(dataFrame: DataFrame): Unit = {
            val transformedDf = dataFrame
                .withColumn("date", some_columnar_date_logic)
                .withColumn("hour", some_more_functional_logic)
                .... // couple more transformation steps

            val partitionedDfWriter = transformedDf.write
                .mode(SaveMode.Append)
                .partitionBy("col1", "col2", "col3")

    and

            partitionedDfWriter.parquet("some hdfs/s3/url")
        }
    }
    

    Now we can move them into three separate classes:

    DFTransformer, DFPartitioner and DataFrameParquetWriter (which extends ResourceWriter).

    So the code would become something like this

    class DFTransformer {
        def transform(dataFrame: DataFrame): DataFrame = {
            dataFrame
                .withColumn("date", some_columnar_date_logic)
                .withColumn("hour", some_more_functional_logic)
                .... // couple more transformation steps
        }
    }
    
    class DFPartitioner {
        def partition(dataFrame: DataFrame): DataFrameWriter[Row] = {
            dataFrame.write
                .mode(SaveMode.Append)
                .partitionBy("col1", "col2", "col3")
        }
    }
    
    

    and

    class DataFrameParquetWriter extends ResourceWriter {
        override def write(partitionedDfWriter: DataFrameWriter[Row]): Unit = {
            partitionedDfWriter.parquet("some hdfs/s3/url")
        }
    }

    class Coordinator(dfTransformer: DFTransformer, dfPartitioner: DFPartitioner, resourceWriter: ResourceWriter) {
        def transformAndWrite(dataFrame: DataFrame): Unit = {
            val transformedDf = dfTransformer.transform(dataFrame)
            val partitionedDfWriter = dfPartitioner.partition(transformedDf)
            resourceWriter.write(partitionedDfWriter)
        }
    }
    
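    ResourceWriter itself is not shown above; a minimal trait consistent with how DataFrameParquetWriter and Coordinator use it could be:

    trait ResourceWriter {
        // Accepts a fully configured writer and performs the actual I/O.
        def write(partitionedDfWriter: DataFrameWriter[Row]): Unit
    }
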
    • The advantage of the above is that when you have to test your Coordinator class, you can very easily use Mockito to mock its dependencies (see the first sketch after this list).

    • Testing DFTransformer is also easy now: you can pass a stubbed DataFrame and assert on the returned DataFrame (using spark-testing-base). We can also test the columns returned by the transformation, as well as the row count (see the second sketch below).
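
    A minimal sketch of what a Coordinator test could then look like, assuming Mockito and ScalaTest are on the test classpath (neither is prescribed above). Because DataFrameWriter is a final class, mocking it requires Mockito's inline mock maker (the mockito-inline artifact).

    import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row}
    import org.mockito.Mockito.{mock, verify, when}
    import org.scalatest.funsuite.AnyFunSuite

    class CoordinatorTest extends AnyFunSuite {
        test("transformAndWrite chains transform, partition and write") {
            // Every collaborator is a mock, so no Spark job is ever started.
            val transformer   = mock(classOf[DFTransformer])
            val partitioner   = mock(classOf[DFPartitioner])
            val writer        = mock(classOf[ResourceWriter])
            val inputDf       = mock(classOf[DataFrame])
            val transformedDf = mock(classOf[DataFrame])
            // DataFrameWriter is final, hence the inline mock maker requirement.
            val dfWriter      = mock(classOf[DataFrameWriter[Row]])

            when(transformer.transform(inputDf)).thenReturn(transformedDf)
            when(partitioner.partition(transformedDf)).thenReturn(dfWriter)

            new Coordinator(transformer, partitioner, writer).transformAndWrite(inputDf)

            verify(writer).write(dfWriter) // the configured writer reached the ResourceWriter
        }
    }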
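
    And a sketch of a DFTransformer test against a tiny in-memory DataFrame, assuming spark-testing-base's DataFrameSuiteBase supplies the SparkSession; the input columns and values are hypothetical placeholders for whatever the real transformation expects.

    import com.holdenkarau.spark.testing.DataFrameSuiteBase
    import org.scalatest.funsuite.AnyFunSuite

    class DFTransformerTest extends AnyFunSuite with DataFrameSuiteBase {
        test("transform adds the derived date and hour columns") {
            val session = spark
            import session.implicits._

            // Hypothetical input: the minimal columns the real logic needs.
            val input = Seq(
                ("2021-01-01 10:15:00", "a"),
                ("2021-01-02 23:45:00", "b")
            ).toDF("timestamp", "payload")

            val result = new DFTransformer().transform(input)

            // Assert on the shape of the result rather than on side effects.
            assert(result.columns.contains("date"))
            assert(result.columns.contains("hour"))
            assert(result.count() == input.count())
        }
    }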