java · postgresql · apache-spark

DB call in each row of Dataset&lt;Row&gt; in Java


Hi, I'm trying to make a DB call for each row of a Dataset in Java, but it keeps throwing the error below.

I have two questions:

  1. Is this an unusual way to make a DB call?
  2. Is there another solution?

This is the code I tried:

public class SparkSql implements Serializable {

    public void wordAddress(String word) {

        Dataset<Row> recent = sparkSession.read().format("jdbc")
                .option("url", "jdbc:postgresql://" + ip + ":" + port + "/" + db)
                .option("driver", "org.postgresql.Driver")
                .option("query", sql)
                .option("user", user)
                .option("password", passwd)
                .load();

        recent.foreach(x -> {
            String temp = x.get(1).toString();
            // another DB call, inside a method that builds a Dataset like the one above
            Dataset<Row> old = this.oldAddress(temp);
            System.out.println(old.count());
        });
    }
}

This is the error log:

[ERROR] 14:06:53.789 Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:87)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.createAggregate(AggUtils.scala:41)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.planAggregateWithoutDistinct(AggUtils.scala:92)
    at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:419)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)

Solution

  • You cannot create new Datasets or use the SparkContext/SparkSession from executors; they exist only on the driver node.

    Your foreach lambda is shipped to the executors, and because it tries to create another Dataset there, the captured closure includes a reference to the SparkSession. That reference is not available on the executors, which is what triggers the NullPointerException above.

    To perform lookups across databases using Spark, create the two Datasets first and then join them, as sketched below.
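
    For example, here is a minimal sketch of that approach. The table names, queries, and the join column "address" are illustrative assumptions, not taken from the original code:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SparkSqlJoin {

        // Load both tables on the driver, then join them.
        // "recent_table", "old_table", and the key column "address"
        // are hypothetical placeholders for this sketch.
        public void wordAddress(SparkSession sparkSession, String url,
                                String user, String passwd) {

            Dataset<Row> recent = sparkSession.read().format("jdbc")
                    .option("url", url)
                    .option("driver", "org.postgresql.Driver")
                    .option("query", "SELECT id, address FROM recent_table")
                    .option("user", user)
                    .option("password", passwd)
                    .load();

            Dataset<Row> old = sparkSession.read().format("jdbc")
                    .option("url", url)
                    .option("driver", "org.postgresql.Driver")
                    .option("query", "SELECT address, details FROM old_table")
                    .option("user", user)
                    .option("password", passwd)
                    .load();

            // One distributed join instead of a DB call per row; Spark
            // plans this on the driver and runs it on the executors.
            Dataset<Row> joined = recent.join(old, "address");
            System.out.println(joined.count());
        }
    }

    If the two tables live in different databases, point each reader at its own JDBC URL; the join itself works the same way.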