Tags: postgresql, scala, apache-spark, jdbc, prepared-statement

Scala upserts using JDBC PreparedStatement and foreachPartition


I am new to Scala/Java programming.

I need to create a Scala jdbcTemplate for mapping multiple columns to a SQL query for a PostgreSQL database.

My insert query has about 80 columns.

Something like this:

INSERT into schema.table_one (cov_eff_dt,cov_canc_dt,cov_pd_thru_dt,ebill_dt,retro_elig_recv_dt,retro_orig_cov_eff_dt,retro_orig_cov_canc_dt,cobra_eff_dt,elig_grc_prd_thru_dt,lst_prem_pd_dt,pol_ren_dt,partn_nbr,xref_id_partn_nbr,cnsm_id,prfl_id,src_cdb_xref_id,cos_pnl_nbr,src_tmstmp,row_tmstmp,created_dttm,updt_dttm,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd,cos_div_cd,mkt_typ_cd,cos_grp_nbr,lgcy_prdt_typ_cd,lgcy_prdt_cd,cov_lvl_typ_cd,shr_arng_cd,shr_arng_oblig_cd,lgcy_pln_var_cd,lgcy_rpt_cd,prdt_srvc_typ_cd,ee_sts_typ_cd,govt_pgm_typ_cd,clm_sys_typ_cd,elig_sys_typ_cd,ces_grp_nbr,mkt_site_cd,row_sts_cd,medica_trvlben_ind,row_user_id,sec_typ_cd,cancel_rsn_typ_cd,cov_pd_thru_rsn_cd,list_bill_typ_cd,billing_sufx_cd,billing_subgrp_nbr,retro_days,retro_typ_cd,retro_ovrd_typ_cd,tops_cov_lvl_typ_cd,lgcy_ben_pln_id,lgcy_prdt_id,rr_ben_grp_nbr,rr_ben_grp_cho_cd,rr_br_cd,rr_un_cd,rr_optout_plan_ind,updt_typ_cd,racf_id,prr_cov_mo,fund_typ_cd,state_of_issue_cd,cobra_mo,cobra_qual_evnt_cd,grndfathered_pol_ind,deriv_cov_ind,cnsm_lgl_enty_nm,indv_grp_typ_cd,src_cov_mnt_typ_cd,pbp_cd,h_cntrct_id,risk_typ_cd,bil_typ_cd,rate_cov_typ_cd,plan_cd,seg_id,src_sys_id) VALUES ( ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (cov_eff_dt,xref_id_partn_nbr,src_cdb_xref_id,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd) 
DO  UPDATE SET cov_canc_dt= ?,cov_pd_thru_dt= ?,ebill_dt= ?,retro_elig_recv_dt= ?,retro_orig_cov_eff_dt= ?,retro_orig_cov_canc_dt= ?,cobra_eff_dt= ?,elig_grc_prd_thru_dt= ?,lst_prem_pd_dt= ?,pol_ren_dt= ?,partn_nbr= ?,prfl_id= ?,cnsm_id= ?,cos_pnl_nbr= ?,src_tmstmp= ?,row_tmstmp= ?,updt_dttm= ?,cos_div_cd= ?,mkt_typ_cd= ?,cos_grp_nbr= ?,lgcy_prdt_typ_cd= ?,lgcy_prdt_cd= ?,cov_lvl_typ_cd= ?,shr_arng_cd= ?,shr_arng_oblig_cd= ?,lgcy_pln_var_cd= ?,lgcy_rpt_cd= ?,prdt_srvc_typ_cd= ?,ee_sts_typ_cd= ?,govt_pgm_typ_cd= ?,clm_sys_typ_cd= ?,elig_sys_typ_cd= ?,ces_grp_nbr= ?,mkt_site_cd= ?,row_sts_cd= ?,medica_trvlben_ind= ?,row_user_id= ?,sec_typ_cd= ?,cancel_rsn_typ_cd= ?,cov_pd_thru_rsn_cd= ?,list_bill_typ_cd= ?,billing_sufx_cd= ?,billing_subgrp_nbr= ?,retro_days= ?,retro_typ_cd= ?,retro_ovrd_typ_cd= ?,tops_cov_lvl_typ_cd= ?,lgcy_ben_pln_id= ?,lgcy_prdt_id= ?,rr_ben_grp_nbr= ?,rr_ben_grp_cho_cd= ?,rr_br_cd= ?,rr_un_cd= ?,rr_optout_plan_ind= ?,updt_typ_cd= ?,racf_id= ?,prr_cov_mo= ?,fund_typ_cd= ?,state_of_issue_cd= ?,cobra_mo= ?,cobra_qual_evnt_cd= ?,grndfathered_pol_ind= ?,deriv_cov_ind= ?,cnsm_lgl_enty_nm= ?,indv_grp_typ_cd= ?,src_cov_mnt_typ_cd= ?,pbp_cd= ?,h_cntrct_id= ?,risk_typ_cd= ?,bil_typ_cd= ?,rate_cov_typ_cd= ?,plan_cd= ?,seg_id= ?,src_sys_id= ?

The data that needs to be placed in the "?" placeholders is stored in another dataframe, named inputdatafiledfwindow.

The mapping for the columns, i.e. the calls that set values on the prepared statement, has to be generated dynamically.

          val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")

inputdatafiledfwindow.coalesce(10).foreachPartition(partition => {
      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
      val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
      partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
                st.setShort(1, row.getShort(0))
                st.setInt(2, row.getInt(1))
                st.setString(3, row.getString(2).replaceAll("\\000", ""))
                st.setString(4, row.getString(3).replaceAll("\\000", ""))
                st.setString(5, row.getString(4).replaceAll("\\000", ""))
                st.setString(6, row.getString(5).replaceAll("\\000", ""))
                st.setDate(7, row.getDate(6))
                st.setDate(8, row.getDate(7))
                st.setString(9, row.getString(8).replaceAll("\\000", ""))
                st.setString(10, row.getString(9).replaceAll("\\000", ""))
                st.setString(11, row.getString(10).replaceAll("\\000", ""))
                st.setString(12, row.getString(11).replaceAll("\\000", ""))
                st.setString(13, row.getString(12).replaceAll("\\000", ""))
                st.setString(14, row.getString(13).replaceAll("\\000", ""))
                st.setString(15, row.getString(14).replaceAll("\\000", ""))
                st.setString(16, row.getString(15).replaceAll("\\000", ""))
                st.setString(17, row.getString(16).replaceAll("\\000", ""))
                st.setString(18, row.getString(17).replaceAll("\\000", ""))
                st.setString(19, row.getString(18).replaceAll("\\000", ""))
                st.setString(20, row.getString(19).replaceAll("\\000", ""))
                st.setString(21, row.getString(20).replaceAll("\\000", ""))
                st.setString(22, row.getString(21).replaceAll("\\000", ""))
                st.setString(23, row.getString(22).replaceAll("\\000", ""))
                st.setString(24, row.getString(23).replaceAll("\\000", ""))
                st.setString(25, row.getString(24).replaceAll("\\000", ""))
                st.setString(26, row.getString(25).replaceAll("\\000", ""))
                st.addBatch()
          }
        }
        st.executeBatch()
      })
      dbc.close()
    })

Currently I'm trying something like this:

val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
    val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)

inputdatafiledfwindow.coalesce(10).foreachPartition(partition  => {
      val columnNames_br = sc.broadcast(inputdatafiledfwindow.columns)
      val columnDataTypes_br = sc.broadcast(inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString))
      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
      val st  = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
          for (i<-0 to columnNames.length-1) {
            if (columnDataTypes(i) == "ShortType")
              st.setShort((i+1).toInt, row.getShort(i))
            else if(columnDataTypes(i)== "IntegerType")
              st.setInt((i+1).toInt,row.getInt(i))
            else if (columnDataTypes(i)=="StringType")
              st.setString((i+1).toInt,row.getString(i))
            else if(columnDataTypes(i)=="TimestampType")
              st.setTimestamp((i+1).toInt, row.getTimestamp(i))
            else if(columnDataTypes(i)=="DateType")
              st.setDate((i+1).toInt,row.getDate(i))
            else if (columnDataTypes(i)=="DoubleType")
              st.setDouble((i+1).toInt, row.getDouble(i))
          }
          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

And this is giving me an org.apache.spark.SparkException: Task not serializable error.

Any ideas or resources I can refer to in order to implement this? I know this is possible in Java, but I have not worked much in Java, nor in Scala.

Edit: Tried using a broadcast variable inside foreachPartition. Still getting org.apache.spark.SparkException: Task not serializable.

Below is the full exception stack:


org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2343)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:957)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:956)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:956)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3349)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
  at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2734)
  ... 80 elided
Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection
Serialization stack:
        - object not serializable (class: org.postgresql.jdbc.PgConnection, value: org.postgresql.jdbc.PgConnection@71c7a55b)
        - field (class: $iw, name: dbc, type: interface java.sql.Connection)
        - object (class $iw, $iw@22459ca5)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@788dd40c)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@31c725ed)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@4a367987)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@7cffd7ab)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@3c615880)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@289fa6c2)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@2a5a0934)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@4a04a12a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@c5fe90a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@58b67f02)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@243a4a22)
        - field (class: $line77.$read, name: $iw, type: class $iw)
        - object (class $line77.$read, $line77.$read@5f473976)
        - field (class: $iw, name: $line77$read, type: class $line77.$read)
        - object (class $iw, $iw@70fc6803)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@26818b0)
        - field (class: $anonfun$1, name: $outer, type: class $iw)
        - object (class $anonfun$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 98 more

Update 2: Made the modifications suggested by @RamGhadiyaram, but I am now faced with a new NullPointerException. I do not understand where I'm going wrong. Surely there must be a simple solution to this.

Below is the updated code:

val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")

    val updatequeryforInsertAndUpdate = "INSERT INTO " + schema + table_name + updatequery + " where " + schema + table_name + s".row_tmstmp < '2020-02-17 00:00:00' OR ${table_name}.row_tmstmp < ?"

    val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
    val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)


    inputdatafiledfwindow.foreachPartition(partition  => {
      val columnNames_br = sc.broadcast(columnNames)
      val columnDataTypes_br = sc.broadcast(columnDataTypes)

      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString)
      val st  = dbc.prepareStatement(updatequeryforInsertAndUpdate)
      partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
          for (i<-0 to columnNames_br.value.length-1) {
            if (columnDataTypes_br.value(i) == "ShortType")
              st.setShort((i+1), row.getShort(i))
            else if(columnDataTypes_br.value(i)== "IntegerType")
              st.setInt((i+1),row.getInt(i))
            else if (columnDataTypes_br.value(i)=="StringType")
              st.setString((i+1),row.getString(i))
            else if(columnDataTypes_br.value(i)=="TimestampType")
              st.setTimestamp((i+1), row.getTimestamp(i))
            else if(columnDataTypes_br.value(i)=="DateType")
              st.setDate((i+1),row.getDate(i))
            else if (columnDataTypes_br.value(i)=="DoubleType")
              st.setDouble((i+1), row.getDouble(i))
          }
          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

And below is the new exception stack:

20/03/25 11:12:49 WARN TaskSetManager: Lost task 0.0 in stage 19.0 (TID 176, dbslp1102.uhc.com, executor 4): java.lang.NullPointerException
        at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:87)
        at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:86)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

20/03/25 11:12:49 ERROR TaskSetManager: Task 0 in stage 19.0 failed 4 times; aborting job
20/03/25 11:12:49 WARN TaskSetManager: Lost task 11.2 in stage 19.0 (TID 210, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 4.3 in stage 19.0 (TID 204, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 7.1 in stage 19.0 (TID 201, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 2.2 in stage 19.0 (TID 205, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 12.2 in stage 19.0 (TID 206, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 10.2 in stage 19.0 (TID 207, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 3.2 in stage 19.0 (TID 209, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 13.0 in stage 19.0 (TID 208, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 8.3 in stage 19.0 (TID 202, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 203, dbslp1102.uhc.com, executor 5): java.lang.NullPointerException
        at $anonfun$1.apply(<console>:87)
        at $anonfun$1.apply(<console>:86)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2341)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
  at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
  ... 86 elided
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:87)
  at $anonfun$1.apply(<console>:86)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Solution

  • As I understand it, you need to generate all the column mappings dynamically and insert into the corresponding columns with the correct data types.

    For that, from the existing dataframe:

        val columnNames : Array[String] = inputdatafiledfwindow.columns
    
        val columnDataTypes : Array[String] = inputdatafiledfwindow.schema.fields
                                .map(x=>x.dataType)
                                .map(x=>x.toString)
    

    Now you have the columns and their corresponding data types.

    In your loop, check the data types dynamically and call the appropriate pstmt.setXXX method on the prepared statement; the array index (offset by one, since JDBC parameter indices are 1-based) gives the parameter index for setXXX.
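
    For illustration, a minimal sketch of that dispatch. It assumes the question's prepared statement st and a Spark Row, covers only the types that appear in the question, and leaves the null check to UPDATE 3 below:

        import java.sql.PreparedStatement
        import org.apache.spark.sql.Row

        // Sketch only: pick the setXXX call from the Spark data type name.
        // JDBC parameter indices are 1-based, hence i + 1.
        // Null columns still need a separate check (see UPDATE 3 below).
        def setParam(st: PreparedStatement, row: Row, i: Int, dataType: String): Unit =
          dataType match {
            case "ShortType"     => st.setShort(i + 1, row.getShort(i))
            case "IntegerType"   => st.setInt(i + 1, row.getInt(i))
            case "StringType"    => st.setString(i + 1, row.getString(i))
            case "TimestampType" => st.setTimestamp(i + 1, row.getTimestamp(i))
            case "DateType"      => st.setDate(i + 1, row.getDate(i))
            case "DoubleType"    => st.setDouble(i + 1, row.getDouble(i))
            case other           => throw new IllegalArgumentException(s"Unhandled type: $other")
          }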

    In this case the Spring JdbcTemplate is not required; the same can be achieved with plain JDBC.

    UPDATE 1:


    Your column types (columnDataTypes) and your column names array (columnNames) should be broadcast as broadcast variables to use them inside foreachPartition; that might be the root cause of the org.apache.spark.SparkException: Task not serializable error.
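
    A sketch of that, assuming sc is the SparkContext on the driver: the broadcast variables have to be created on the driver, before foreachPartition, and only their .value is read inside the closure:

        // Build and broadcast the metadata on the driver, outside foreachPartition.
        val columnNames_br     = sc.broadcast(inputdatafiledfwindow.columns)
        val columnDataTypes_br = sc.broadcast(
          inputdatafiledfwindow.schema.fields.map(_.dataType.toString))

        inputdatafiledfwindow.foreachPartition(partition => {
          // Inside the closure, only read the broadcast values; do not call sc here.
          val names = columnNames_br.value
          val types = columnDataTypes_br.value
          // ... open the JDBC connection and set the parameters as in the question ...
        })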


    UPDATE 2: Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection means that somewhere your connection object has this problem (it is being captured by the closure).

    AFAIK, you are converting user and password to strings; those might be coming from a different object, but normally your code looks okay. Re-check that.

    As per the Spark docs, you can also declare the URL along with the user and password like this; try it.

    JDBC To Other Databases - The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., jdbc:postgresql://localhost/test?user=fred&password=secret
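
    For instance (a sketch only, reusing the example URL from the docs; substitute your own host, database and credentials):

        import java.sql.{Connection, DriverManager}

        // Credentials carried in the JDBC URL itself instead of separate arguments.
        val dbc: Connection = DriverManager.getConnection(
          "jdbc:postgresql://localhost/test?user=fred&password=secret")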

    UPDATE 3: The reason for the NullPointerException is straightforward: if you operate on a null value, it will lead to a NullPointerException.

    For example, row.getString(8).replaceAll("\\000", ""): if row.getString(8) is null and you apply replaceAll to it, you get a NullPointerException. You have to check whether row.getString(8) is not null before applying replaceAll.

    The best way to avoid null pointers is to use Scala's Option.
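
    For example, a null-safe version of the string setter from the question (indices taken from the snippet above; the same pattern applies to the other columns):

        import java.sql.Types

        // Only apply replaceAll when the value is present; otherwise bind SQL NULL.
        Option(row.getString(8)) match {
          case Some(value) => st.setString(9, value.replaceAll("\\000", ""))
          case None        => st.setNull(9, Types.VARCHAR)
        }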

    Another observation: use a Scala foreach loop rather than a traditional Java-style loop.
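
    For instance, iterating with zipWithIndex instead of for (i <- 0 to columnNames.length - 1), reusing the setParam helper sketched above:

        // Scala-style iteration over the broadcast column types and their indices.
        columnDataTypes_br.value.zipWithIndex.foreach { case (dataType, i) =>
          setParam(st, row, i, dataType)
        }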

    NOTE: PLEASE ASK A SEPARATE QUESTION FOR EACH REQUIREMENT. DON'T MIX THEM INCREMENTALLY; IT WILL CONFUSE OTHERS.