Search code examples
unit-testingapache-sparkbigdataspark-structured-streamingapache-spark-dataset

How to compare two spark datasets


I was trying to test the structured streaming joins in spark 3.2.2. So i have two files which i am reading as a stream and then joining them on some already defined condition as

SparkSession spark = SparkSession.builder().master("local")
    .appName("streamstreamJoinTest").getOrCreate();
spark.sparkContext().setLogLevel("WARN");
Dataset<Row> adClickDF = spark
    .readStream()
    .format("json")
    .schema(Schema.getSchema("adClick_flow"))
    .option("path","D:/Users/subodh.k/IdeaProjects/spark/src/test/sources/adClickFlowTest/")
    .load()
    .withColumnRenamed("time_stamp","adClick_flow_time");
assert(adClickDF.isStreaming());
Dataset<Row> prebidRADF = spark
     .readStream()
     .format("json")
     .schema(Schema.getSchema("prebidRenderedStream"))
     .option("path","D:/Users/subodh.k/IdeaProjects/spark/src/test/sources/prebidRATest/")
     .load()
     .withColumnRenamed("time_stamp","prebidRenderedStream_time");
assert(prebidRADF.isStreaming());
Dataset<Row>result = streamstreamJoin
    .performJoin(prebidRADF,adClickDF,"inner",config.getJoinCondition("prebidRenderedStream,adClick_flow"));
result = result
     .drop(functions.col("adClick_flow_time"))
     .drop(functions.col("prebidRenderedStream_time"));
result.printSchema();

I have expected output inside a json file but now i am facing problems like how exactly to assert that the actual output (i.e. result stream) is same as expected result.

So i read somewhere that there are some restrictions on streaming sources so i decided to overwrite writestream function using forEatchBatch. here it is-

Dataset<Row> expectedResult =spark
    .read()
    .schema(Schema.getSchema("join_test"))
    .json("./src/test/sources/result");
expectedResult.show();
expectedResult.printSchema();

result.writeStream()
    .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
        @Override
        public void call(Dataset<Row> rowDataset, Long aLong) throws Exception {
            rowDataset.persist();
            rowDataset.write().format("json").save("./src/test/sources/out");  // location 1
            rowDataset.show();
            boolean isEqual=(rowDataset.schema()==expectedResult.schema());
//          boolean isEqual=(rowDataset.union(expectedResult) == rowDataset.intersect(expectedResult));
            System.out.println(isEqual);
            assertTrue(isEqual);
            rowDataset.unpersist();
        }
    }).start().awaitTermination();
spark.stop();

In this case even the schema is not matching, but you can see the output that they are identical.

+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
|            prvReqId|              requrl|      cid|commit_id|customer_id|       publisher_url|            click_id|creative_id|
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
|20921422925761_25...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20115773235743_19...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20921422925761_25...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20115773235743_19...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+

root
 |-- prvReqId: string (nullable = true)
 |-- requrl: string (nullable = true)
 |-- cid: string (nullable = true)
 |-- commit_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- publisher_url: string (nullable = true)
 |-- click_id: string (nullable = true)
 |-- creative_id: string (nullable = true)

requrl=publisher_url AND prebidRenderedStream_time <=  adClick_flow_time AND prebidRenderedStream_time + interval 10 minute > adClick_flow_time 
root
 |-- prvReqId: string (nullable = true)
 |-- requrl: string (nullable = true)
 |-- cid: string (nullable = true)
 |-- commit_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- publisher_url: string (nullable = true)
 |-- click_id: string (nullable = true)
 |-- creative_id: string (nullable = true)

23/07/03 12:38:39 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: D:\Users\subodh.k\AppData\Local\Temp\1\temporary-81996e77-70da-4968-adcb-dea49ad662b4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/07/03 12:38:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/07/03 12:38:42 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
|            prvReqId|              requrl|      cid|commit_id|customer_id|       publisher_url|            click_id|creative_id|
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
|20921422925761_25...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20115773235743_19...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20921422925761_25...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20115773235743_19...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+

false

Then i tried another approach that is write the stream in a file and then compare this file with the expectedoutput file. But writing a stream output in a specefic file is not supported as i far as i know. It is treating the path (where the output has to be written) as a directory and then in that directory i don't know how to retrieve the file in which actually the data has been written using java as the filename depends on the batchid or offset ig.

I want to check if the result from joining is actually the same as expected result. So please suggest me a way to do it. I am an intern and new to actual corporate programming and hence do not know much about unit testing. So if my approach is wrong in unit testing then, you can also let me know a better approach to do it.


Solution

  • Try to do this:

    if (firstDataFrame.exceptAll(secondDataFrame).head(1).isEmpty) {
    //The two DF are equals, process here
    } else {
    //There is difference
    }