Tags: java, apache-spark, streaming, accumulator

Spark Streaming DataFrames and accumulators in Java


I'm processing a Kafka JSON stream with Spark Structured Streaming. Since it is processed as micro-batches, can I use accumulators with streaming DataFrames?

LongAccumulator longAccum = spark.sparkContext().longAccumulator("my accum");

Dataset<Row> df2 = df.filter(df.col("Called number").equalTo("0860"))
            .groupBy("Calling number").count();
// for example, count the rows via the accumulator
df2.javaRDD().foreach(row -> longAccum.add(1));

throws

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

I'm also confused about using accumulators this way. Converting the DataFrame down to an RDD looks strange and unnecessary. Can I do it without the RDD and foreach()?

Following the exception, I removed the foreach() on the source DataFrame and moved the logic into writeStream().foreachBatch():

        StreamingQuery ds = df2
            .writeStream().foreachBatch( (rowDataset, aLong) -> {
                longAccum.add(1);
                log.info("accum : " + longAccum.value());
            })
            .outputMode("complete")
            .format("console").start();

It runs, but I get no values in the logs and can't see the accumulator in the GUI.
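
Update: I think the lambda above only adds 1 per micro-batch, not per row, and calling format("console") after foreachBatch probably replaces the foreachBatch sink, which would explain the empty logs. An untested sketch of what I actually meant, reusing longAccum and log from above:

        StreamingQuery ds = df2
            .writeStream()
            .outputMode("complete")
            .foreachBatch( (rowDataset, batchId) -> {
                // add the micro-batch row count, not 1 per batch
                longAccum.add(rowDataset.count());
                log.info("batch " + batchId + " accum : " + longAccum.value());
            })
            .start();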


Solution

  • No, you don't need to convert to an RDD; you can access the accumulator directly with the Dataset, as below:

     LongAccumulator longAccum = spark.sparkContext().longAccumulator("my accum");
    
    
            Dataset<Row> df = spark.range(100).withColumn("x", lit("x"));
    
            // access the accumulator in map()
            df.map((MapFunction<Row, Row>) row -> {
                longAccum.add(1);
                return  row;
            }, RowEncoder.apply(df.schema()))
                    .count();
    
            // accumulator value
            System.out.println(longAccum.value()); // 100
    
            longAccum.reset();
            // access the accumulator in foreach()
            df.foreach((ForeachFunction<Row>) row -> longAccum.add(1));
    
            // accumulator value
            System.out.println(longAccum.value()); // 100
    

    Please note that the accumulator value gets updated only when an action is performed.
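
    For example, with the same df and longAccum as above (just a sketch to illustrate the laziness):

        longAccum.reset();

        // map() is a lazy transformation - nothing has executed yet
        Dataset<Row> mapped = df.map((MapFunction<Row, Row>) row -> {
            longAccum.add(1);
            return row;
        }, RowEncoder.apply(df.schema()));

        System.out.println(longAccum.value()); // 0 - no action performed yet

        mapped.count(); // the action runs the map and updates the accumulator
        System.out.println(longAccum.value()); // 100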

    Using a streaming DataFrame

        longAccum.reset();
        /**
         * streaming dataframe from a csv dir containing test.csv:
         * --------
         * id,name
         * 1,bob
         * 2,smith
         * 3,jam
         * 4,dwayne
         * 5,mike
         */
            String fileDir = getClass().getResource("/" + "csv").getPath();
            StructType schema = new StructType()
                    .add(new StructField("id", DataTypes.LongType, true, Metadata.empty()))
                    .add(new StructField("name", DataTypes.StringType, true, Metadata.empty()));
        Dataset<Row> csvDf = spark.readStream().schema(schema).option("header", true).csv(fileDir);

        StreamingQuery streamingQuery = csvDf
                .map((MapFunction<Row, Row>) row -> {
                    longAccum.add(1);
                    return row;
                }, RowEncoder.apply(schema))
                    .writeStream()
                    .format("console").start();
            streamingQuery.processAllAvailable();
    
            // accumulator value
            System.out.println(longAccum.value()); // 5
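
    If you also want the value after every micro-batch in the logs, one option (a sketch under that assumption, not part of the example above) is to register a StreamingQueryListener on the driver and read the accumulator in onQueryProgress:

        spark.streams().addListener(new StreamingQueryListener() {
            @Override
            public void onQueryStarted(QueryStartedEvent event) { }

            @Override
            public void onQueryProgress(QueryProgressEvent event) {
                // runs on the driver after each micro-batch completes
                System.out.println("accum after batch " + event.progress().batchId()
                        + " : " + longAccum.value());
            }

            @Override
            public void onQueryTerminated(QueryTerminatedEvent event) { }
        });

    Note that the listener is asynchronous, so the printed value can lag slightly behind the batch that just finished.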