I am very new to the Scala/Spark ecosystem and am wondering what the best way is to unit test a chained DataFrame transformation. Here is a code sample of the method that I would like to test:
def writeToParquet(spark: SparkSession, dataFrame: DataFrame, col1: DataType1, col2: DataType2): Unit = {
  dataFrame
    .withColumn("date", some_columnar_date_logic)
    .withColumn("hour", some_more_functional_logic)
    .... // a couple more transformations
    .write
    .mode(SaveMode.Append)
    .partitionBy("col1", "col2", "col3")
    .parquet("some hdfs/s3/url")
}
The problem is that parquet has a Unit return type, which makes testing difficult.
This is further amplified by the fact that DataFrame transformations are immutable in nature, which makes mocking and spying a bit difficult.
To create DataFrames for the tests, I have dumped a test dataset into a CSV file.
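For example, a local SparkSession in the test can read that CSV back into a DataFrame. This is only a rough sketch of my setup; the path and options below are placeholders:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Local session for the test suite.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("transformation-tests")
  .getOrCreate()

// Path and options are placeholders; adjust to wherever the CSV fixture lives.
val testDf: DataFrame = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("src/test/resources/test-dataset.csv")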
One of the first things I figured out when testing DataFrames is to separate out transformations and I/O.
So for the above scenario we can divide the chain into three parts:
class Coordinator {
  def transformAndWrite(dataFrame: DataFrame): Unit = {
    // 1. Transformation
    val transformedDf = dataFrame
      .withColumn("date", some_columnar_date_logic)
      .withColumn("hour", some_more_functional_logic)
      .... // a couple more transformations

    // 2. Partitioning
    val partitionedDfWriter = transformedDf.write
      .mode(SaveMode.Append)
      .partitionBy("col1", "col2", "col3")

    // 3. Writing
    partitionedDfWriter.parquet("some hdfs/s3/url")
  }
}
Now we can move these into three separate classes: DFTransformer, DFPartitioner, and DataFrameParquetWriter (which extends ResourceWriter). So the code would become something like this:
class DFTransformer {
  def transform(dataFrame: DataFrame): DataFrame = {
    dataFrame
      .withColumn("date", some_columnar_date_logic)
      .withColumn("hour", some_more_functional_logic)
      .... // a couple more transformations
  }
}
class DFPartitioner {
  def partition(dataFrame: DataFrame): DataFrameWriter[Row] = {
    dataFrame.write
      .mode(SaveMode.Append)
      .partitionBy("col1", "col2", "col3")
  }
}
and
class DataFrameParquetWriter extends ResourceWriter {
  override def write(partitionedDfWriter: DataFrameWriter[Row]): Unit = {
    partitionedDfWriter.parquet("some hdfs/s3/url")
  }
}
class Coordinator(dfTransformer: DFTransformer, dfPartitioner: DFPartitioner, resourceWriter: ResourceWriter) {
  def transformAndWrite(dataFrame: DataFrame): Unit = {
    val transformedDf = dfTransformer.transform(dataFrame)
    val partitionedDfWriter = dfPartitioner.partition(transformedDf)
    resourceWriter.write(partitionedDfWriter)
  }
}
The advantage of the above is that when you have to test your Coordinator class, you can very easily use Mockito to mock its dependencies.
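Here is a rough sketch of such a test, assuming ScalaTest 3.x and Mockito are on the classpath. Note that DataFrameWriter is a final class, so mocking it needs Mockito's inline mock maker (the mockito-inline artifact):

import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row}
import org.mockito.Mockito._
import org.scalatest.funsuite.AnyFunSuite

class CoordinatorTest extends AnyFunSuite {

  test("transformAndWrite wires transform, partition and write together") {
    // Mock the three collaborators.
    val dfTransformer  = mock(classOf[DFTransformer])
    val dfPartitioner  = mock(classOf[DFPartitioner])
    val resourceWriter = mock(classOf[ResourceWriter])

    // Mock the values flowing through the pipeline.
    val inputDf       = mock(classOf[DataFrame])
    val transformedDf = mock(classOf[DataFrame])
    val dfWriter      = mock(classOf[DataFrameWriter[Row]])

    when(dfTransformer.transform(inputDf)).thenReturn(transformedDf)
    when(dfPartitioner.partition(transformedDf)).thenReturn(dfWriter)

    new Coordinator(dfTransformer, dfPartitioner, resourceWriter).transformAndWrite(inputDf)

    // The writer should receive exactly what the partitioner produced.
    verify(resourceWriter).write(dfWriter)
  }
}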
Testing DFTransformer is also easy now: you can pass a stubbed DataFrame and assert on the returned DataFrame (using spark-testing-base). You can also check the columns produced by the transformation, as well as the row count.
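A rough sketch of such a test, assuming a recent spark-testing-base (whose DataFrameSuiteBase provides the spark session) and a made-up input fixture with whatever columns the transformation actually needs:

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.funsuite.AnyFunSuite

class DFTransformerTest extends AnyFunSuite with DataFrameSuiteBase {

  test("transform adds the date and hour columns and keeps the row count") {
    // Bind the session to a val so its implicits can be imported.
    val session = spark
    import session.implicits._

    // Hypothetical fixture; the column names are placeholders for whatever the logic expects.
    val input = Seq(
      ("2021-01-01 10:15:00", "value1"),
      ("2021-01-02 23:45:00", "value2")
    ).toDF("timestamp", "payload")

    val result = new DFTransformer().transform(input)

    // Assert on the derived columns and the row count.
    assert(result.columns.contains("date"))
    assert(result.columns.contains("hour"))
    assert(result.count() === input.count())
  }
}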