apache-spark, apache-spark-sql, spark-streaming, spark-structured-streaming

SparkSession NullPointerException in Dataset foreach


I am new to Spark.

I want to keep consuming messages from Kafka and then save them to S3 once the number of messages exceeds 100,000.

I first implemented this with Dataset.collectAsList(), but it throws an error: Total size of serialized results of 3 tasks (1389.3 MiB) is bigger than spark.driver.maxResultSize.

So I switched to foreach, and now I get a NullPointerException when using the SparkSession to call createDataFrame.

Any idea what is going on? Thanks.

---Code---

SparkSession spark = generateSparkSession();
registerUdf4AddPartition(spark);
Dataset<Row> dataset = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", args[0])
        .option("kafka.group.id", args[1])
        .option("subscribe", args[2])
        .option("kafka.security.protocol", SecurityProtocol.SASL_PLAINTEXT.name)
        .load();
DataStreamWriter<Row> console = dataset.toDF().writeStream().foreachBatch((rawDataset, time) -> {
    Dataset<Row> rowDataset = rawDataset.selectExpr("CAST(value AS STRING)");
    // using foreach
    rowDataset.foreach(row -> {
        List<Span> rawDataList = new CsvToBeanBuilder(new StringReader(row.getString(0))).withType(Span.class).build().parse();
        spans.addAll(rawDataList);
        batchSave(spark);
    });

    // using collectAsList
    List<Row> rows = rowDataset.collectAsList();
    for (Row row : rows) {
        List<Span> rawDataList = new CsvToBeanBuilder(new StringReader(row.getString(0))).withType(Span.class).build().parse();
        spans.addAll(rawDataList);
        batchSave(spark);
    }
});
StreamingQuery start = console.start();
start.awaitTermination();

public static void batchSave(SparkSession spark) {
    synchronized (spans) {
        if (spans.size() == 100000) {
            System.out.println(spans.isEmpty());
            Dataset<Row> spanDataSet = spark.createDataFrame(spans, Span.class);
            Dataset<Row> finalResult = addCustomizedTimeByUdf(spanDataSet);

            StringBuilder pathBuilder = new StringBuilder("s3a://fwk-dataplatform-np/datalake/log/FWK/ART2/test/leftAndRight");
            finalResult.repartition(1).write().partitionBy("year", "month", "day", "hour").format("csv").mode("append").save(pathBuilder.toString());
            spans.clear();
        }
    }
}

Solution

  • The SparkSession lives on the driver, while the lambda passed to foreach(...) is serialized and runs distributed on the executors, so the spark reference is not available there (it is null on the executors).

    By the way, there is no point in using synchronized inside the foreach task: the work is spread across separate executor JVMs, so the lock protects nothing.
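
    A minimal sketch of one way to restructure this, under a few assumptions: Spark 3.x Java API, a hypothetical Span schema (traceId, operation, timestamp), and a placeholder S3 path. The idea is to keep all SparkSession usage inside foreachBatch (which runs on the driver) and let Spark parse and write each micro-batch distributed on the executors, instead of collecting rows or buffering them in a shared list:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    import static org.apache.spark.sql.functions.*;

    public class KafkaToS3Sketch {

        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder().appName("kafka-to-s3").getOrCreate();

            // Hypothetical schema standing in for the fields of Span.
            StructType spanSchema = new StructType()
                    .add("traceId", DataTypes.StringType)
                    .add("operation", DataTypes.StringType)
                    .add("timestamp", DataTypes.LongType);

            Dataset<Row> kafkaStream = spark.readStream().format("kafka")
                    .option("kafka.bootstrap.servers", args[0])
                    .option("subscribe", args[1])
                    .load();

            StreamingQuery query = kafkaStream.writeStream()
                    .foreachBatch((batch, batchId) -> {
                        // foreachBatch itself runs on the driver, so using the
                        // SparkSession here is fine; only the lambdas passed to
                        // foreach/map/etc. are serialized and shipped to executors.
                        Dataset<String> csvLines = batch
                                .selectExpr("CAST(value AS STRING)")
                                .as(Encoders.STRING());

                        // Let Spark parse the CSV in parallel on the executors
                        // instead of parsing row by row on the driver.
                        Dataset<Row> spans = spark.read().schema(spanSchema).csv(csvLines);

                        // Derive partition columns from an epoch-millis timestamp
                        // (stand-in for the original addCustomizedTimeByUdf).
                        spans.withColumn("ts", col("timestamp").divide(1000).cast("timestamp"))
                             .withColumn("year", year(col("ts")))
                             .withColumn("month", month(col("ts")))
                             .withColumn("day", dayofmonth(col("ts")))
                             .withColumn("hour", hour(col("ts")))
                             .drop("ts")
                             .repartition(1)
                             .write()
                             .partitionBy("year", "month", "day", "hour")
                             .format("csv")
                             .mode("append")
                             .save("s3a://your-bucket/path/to/output"); // placeholder path
                    })
                    .start();

            query.awaitTermination();
        }
    }

    This writes every micro-batch rather than waiting for 100,000 buffered messages; if you want fewer, larger files, tuning the micro-batch size with a processing-time trigger and the Kafka source option maxOffsetsPerTrigger is simpler and safer than accumulating rows in driver memory.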