I am new to Scala/Java programming.
I need to create a Scala JdbcTemplate-style mapping of multiple columns to a SQL query for a PostgreSQL database.
My insert query has about 80 columns, something like this:
INSERT into schema.table_one (cov_eff_dt,cov_canc_dt,cov_pd_thru_dt,ebill_dt,retro_elig_recv_dt,retro_orig_cov_eff_dt,retro_orig_cov_canc_dt,cobra_eff_dt,elig_grc_prd_thru_dt,lst_prem_pd_dt,pol_ren_dt,partn_nbr,xref_id_partn_nbr,cnsm_id,prfl_id,src_cdb_xref_id,cos_pnl_nbr,src_tmstmp,row_tmstmp,created_dttm,updt_dttm,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd,cos_div_cd,mkt_typ_cd,cos_grp_nbr,lgcy_prdt_typ_cd,lgcy_prdt_cd,cov_lvl_typ_cd,shr_arng_cd,shr_arng_oblig_cd,lgcy_pln_var_cd,lgcy_rpt_cd,prdt_srvc_typ_cd,ee_sts_typ_cd,govt_pgm_typ_cd,clm_sys_typ_cd,elig_sys_typ_cd,ces_grp_nbr,mkt_site_cd,row_sts_cd,medica_trvlben_ind,row_user_id,sec_typ_cd,cancel_rsn_typ_cd,cov_pd_thru_rsn_cd,list_bill_typ_cd,billing_sufx_cd,billing_subgrp_nbr,retro_days,retro_typ_cd,retro_ovrd_typ_cd,tops_cov_lvl_typ_cd,lgcy_ben_pln_id,lgcy_prdt_id,rr_ben_grp_nbr,rr_ben_grp_cho_cd,rr_br_cd,rr_un_cd,rr_optout_plan_ind,updt_typ_cd,racf_id,prr_cov_mo,fund_typ_cd,state_of_issue_cd,cobra_mo,cobra_qual_evnt_cd,grndfathered_pol_ind,deriv_cov_ind,cnsm_lgl_enty_nm,indv_grp_typ_cd,src_cov_mnt_typ_cd,pbp_cd,h_cntrct_id,risk_typ_cd,bil_typ_cd,rate_cov_typ_cd,plan_cd,seg_id,src_sys_id) VALUES ( ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (cov_eff_dt,xref_id_partn_nbr,src_cdb_xref_id,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd)
DO UPDATE SET cov_canc_dt= ?,cov_pd_thru_dt= ?,ebill_dt= ?,retro_elig_recv_dt= ?,retro_orig_cov_eff_dt= ?,retro_orig_cov_canc_dt= ?,cobra_eff_dt= ?,elig_grc_prd_thru_dt= ?,lst_prem_pd_dt= ?,pol_ren_dt= ?,partn_nbr= ?,prfl_id= ?,cnsm_id= ?,cos_pnl_nbr= ?,src_tmstmp= ?,row_tmstmp= ?,updt_dttm= ?,cos_div_cd= ?,mkt_typ_cd= ?,cos_grp_nbr= ?,lgcy_prdt_typ_cd= ?,lgcy_prdt_cd= ?,cov_lvl_typ_cd= ?,shr_arng_cd= ?,shr_arng_oblig_cd= ?,lgcy_pln_var_cd= ?,lgcy_rpt_cd= ?,prdt_srvc_typ_cd= ?,ee_sts_typ_cd= ?,govt_pgm_typ_cd= ?,clm_sys_typ_cd= ?,elig_sys_typ_cd= ?,ces_grp_nbr= ?,mkt_site_cd= ?,row_sts_cd= ?,medica_trvlben_ind= ?,row_user_id= ?,sec_typ_cd= ?,cancel_rsn_typ_cd= ?,cov_pd_thru_rsn_cd= ?,list_bill_typ_cd= ?,billing_sufx_cd= ?,billing_subgrp_nbr= ?,retro_days= ?,retro_typ_cd= ?,retro_ovrd_typ_cd= ?,tops_cov_lvl_typ_cd= ?,lgcy_ben_pln_id= ?,lgcy_prdt_id= ?,rr_ben_grp_nbr= ?,rr_ben_grp_cho_cd= ?,rr_br_cd= ?,rr_un_cd= ?,rr_optout_plan_ind= ?,updt_typ_cd= ?,racf_id= ?,prr_cov_mo= ?,fund_typ_cd= ?,state_of_issue_cd= ?,cobra_mo= ?,cobra_qual_evnt_cd= ?,grndfathered_pol_ind= ?,deriv_cov_ind= ?,cnsm_lgl_enty_nm= ?,indv_grp_typ_cd= ?,src_cov_mnt_typ_cd= ?,pbp_cd= ?,h_cntrct_id= ?,risk_typ_cd= ?,bil_typ_cd= ?,rate_cov_typ_cd= ?,plan_cd= ?,seg_id= ?,src_sys_id= ?
The data that needs to be bound to the "?" placeholders is stored in another DataFrame, named inputdatafiledfwindow.
The column mapping, i.e. the calls that set values on the prepared statement, needs to be generated dynamically.
val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")
inputdatafiledfwindow.coalesce(10).foreachPartition(partition => {
val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
batch.foreach { row => {
st.setShort(1, row.getShort(0))
st.setInt(2, row.getInt(1))
st.setString(3, row.getString(2).replaceAll("\\000", ""))
st.setString(4, row.getString(3).replaceAll("\\000", ""))
st.setString(5, row.getString(4).replaceAll("\\000", ""))
st.setString(6, row.getString(5).replaceAll("\\000", ""))
st.setDate(7, row.getDate(6))
st.setDate(8, row.getDate(7))
st.setString(9, row.getString(8).replaceAll("\\000", ""))
st.setString(10, row.getString(9).replaceAll("\\000", ""))
st.setString(11, row.getString(10).replaceAll("\\000", ""))
st.setString(12, row.getString(11).replaceAll("\\000", ""))
st.setString(13, row.getString(12).replaceAll("\\000", ""))
st.setString(14, row.getString(13).replaceAll("\\000", ""))
st.setString(15, row.getString(14).replaceAll("\\000", ""))
st.setString(16, row.getString(15).replaceAll("\\000", ""))
st.setString(17, row.getString(16).replaceAll("\\000", ""))
st.setString(18, row.getString(17).replaceAll("\\000", ""))
st.setString(19, row.getString(18).replaceAll("\\000", ""))
st.setString(20, row.getString(19).replaceAll("\\000", ""))
st.setString(21, row.getString(20).replaceAll("\\000", ""))
st.setString(22, row.getString(21).replaceAll("\\000", ""))
st.setString(23, row.getString(22).replaceAll("\\000", ""))
st.setString(24, row.getString(23).replaceAll("\\000", ""))
st.setString(25, row.getString(24).replaceAll("\\000", ""))
st.setString(26, row.getString(25).replaceAll("\\000", ""))
st.addBatch()
}
}
st.executeBatch()
})
dbc.close()
})
Currently I'm trying something like this:
val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)
inputdatafiledfwindow.coalesce(10).foreachPartition(partition => {
val columnNames_br = sc.broadcast(inputdatafiledfwindow.columns)
val columnDataTypes_br = sc.broadcast(inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString))
val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
batch.foreach { row => {
for (i<-0 to columnNames.length-1) {
if (columnDataTypes(i) == "ShortType")
st.setShort((i+1).toInt, row.getShort(i))
else if(columnDataTypes(i)== "IntegerType")
st.setInt((i+1).toInt,row.getInt(i))
else if (columnDataTypes(i)=="StringType")
st.setString((i+1).toInt,row.getString(i))
else if(columnDataTypes(i)=="TimestampType")
st.setTimestamp((i+1).toInt, row.getTimestamp(i))
else if(columnDataTypes(i)=="DateType")
st.setDate((i+1).toInt,row.getDate(i))
else if (columnDataTypes(i)=="DoubleType")
st.setDouble((i+1).toInt, row.getDouble(i))
}
st.addBatch()
}
}
st.executeBatch()
})
dbc.close()
})
And this is giving me: org.apache.spark.SparkException: Task not serializable.
Any ideas or resources I can refer to in order to implement this? I know this is possible in Java, but I have not worked much in Java, nor in Scala.
Edit: I tried using a broadcast variable inside foreachPartition, but I'm still getting org.apache.spark.SparkException: Task not serializable.
Below is the full exception stack:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2343)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:957)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:956)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:956)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2735)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3349)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2734)
... 80 elided
Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection
Serialization stack:
- object not serializable (class: org.postgresql.jdbc.PgConnection, value: org.postgresql.jdbc.PgConnection@71c7a55b)
- field (class: $iw, name: dbc, type: interface java.sql.Connection)
- object (class $iw, $iw@22459ca5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@788dd40c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@31c725ed)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4a367987)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7cffd7ab)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3c615880)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@289fa6c2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2a5a0934)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4a04a12a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@c5fe90a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@58b67f02)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@243a4a22)
- field (class: $line77.$read, name: $iw, type: class $iw)
- object (class $line77.$read, $line77.$read@5f473976)
- field (class: $iw, name: $line77$read, type: class $line77.$read)
- object (class $iw, $iw@70fc6803)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@26818b0)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 98 more
Update 2: I made the modifications suggested by @RamGhadiyaram, but I am now facing a new NullPointerException. I do not understand where I'm going wrong; surely there must be a simple solution to this.
Below is the updated code:
val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")
val updatequeryforInsertAndUpdate = "INSERT INTO " + schema + table_name + updatequery + " where " + schema + table_name + s".row_tmstmp < '2020-02-17 00:00:00' OR ${table_name}.row_tmstmp < ?"
val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)
inputdatafiledfwindow.foreachPartition(partition => {
val columnNames_br = sc.broadcast(columnNames)
val columnDataTypes_br = sc.broadcast(columnDataTypes)
val dbc: Connection = DriverManager.getConnection(jdbcConnectionString)
val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
batch.foreach { row => {
for (i<-0 to columnNames_br.value.length-1) {
if (columnDataTypes_br.value(i) == "ShortType")
st.setShort((i+1), row.getShort(i))
else if(columnDataTypes_br.value(i)== "IntegerType")
st.setInt((i+1),row.getInt(i))
else if (columnDataTypes_br.value(i)=="StringType")
st.setString((i+1),row.getString(i))
else if(columnDataTypes_br.value(i)=="TimestampType")
st.setTimestamp((i+1), row.getTimestamp(i))
else if(columnDataTypes_br.value(i)=="DateType")
st.setDate((i+1),row.getDate(i))
else if (columnDataTypes_br.value(i)=="DoubleType")
st.setDouble((i+1), row.getDouble(i))
}
st.addBatch()
}
}
st.executeBatch()
})
dbc.close()
})
And below is the new exception stack:
20/03/25 11:12:49 WARN TaskSetManager: Lost task 0.0 in stage 19.0 (TID 176, dbslp1102.uhc.com, executor 4): java.lang.NullPointerException
at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:87)
at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:86)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/03/25 11:12:49 ERROR TaskSetManager: Task 0 in stage 19.0 failed 4 times; aborting job
20/03/25 11:12:49 WARN TaskSetManager: Lost task 11.2 in stage 19.0 (TID 210, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 4.3 in stage 19.0 (TID 204, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 7.1 in stage 19.0 (TID 201, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 2.2 in stage 19.0 (TID 205, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 12.2 in stage 19.0 (TID 206, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 10.2 in stage 19.0 (TID 207, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 3.2 in stage 19.0 (TID 209, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 13.0 in stage 19.0 (TID 208, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 8.3 in stage 19.0 (TID 202, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 203, dbslp1102.uhc.com, executor 5): java.lang.NullPointerException
at $anonfun$1.apply(<console>:87)
at $anonfun$1.apply(<console>:86)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2341)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
... 86 elided
Caused by: java.lang.NullPointerException
at $anonfun$1.apply(<console>:87)
at $anonfun$1.apply(<console>:86)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
As I understand it, you need to generate all the column mappings dynamically and insert into the corresponding columns with the correct data types.
For that, on the existing dataframe:
val columnNames: Array[String] = inputdatafiledfwindow.columns
val columnDataTypes: Array[String] = inputdatafiledfwindow.schema.fields
  .map(_.dataType)
  .map(_.toString)
Now you have the columns and their corresponding data types.
In your loop, check the data type dynamically and call the appropriate setXXX method on the PreparedStatement; the array index (plus one, since JDBC parameters are 1-based) is the parameter index for setXXX, as sketched below.
In this case a Spring JdbcTemplate is not required; the same can be achieved with plain JDBC.
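A minimal sketch of such a loop, assuming row is the current Spark Row, st is the PreparedStatement built from your upsert query, and columnNames/columnDataTypes are the arrays above (the handled type names and the fallback are illustrative, not exhaustive):

// JDBC parameters are 1-based while Row columns are 0-based, hence i + 1.
for (i <- columnNames.indices) {
  columnDataTypes(i) match {
    case "ShortType"     => st.setShort(i + 1, row.getShort(i))
    case "IntegerType"   => st.setInt(i + 1, row.getInt(i))
    case "StringType"    => st.setString(i + 1, row.getString(i))
    case "TimestampType" => st.setTimestamp(i + 1, row.getTimestamp(i))
    case "DateType"      => st.setDate(i + 1, row.getDate(i))
    case "DoubleType"    => st.setDouble(i + 1, row.getDouble(i))
    case other           => sys.error(s"unhandled Spark type $other for column ${columnNames(i)}")
  }
}
st.addBatch()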
UPDATE 1:
Your column types (columnDataTypes) and column names array (columnNames) should be shared as broadcast variables if you want to use them inside foreachPartition; that might be the root cause of the org.apache.spark.SparkException: Task not serializable error.
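For example, a sketch of how that could look (the broadcasts are created on the driver, before foreachPartition, and only their .value is read inside the partition function; sc is the SparkContext):

// Created on the driver ...
val columnNames_br     = sc.broadcast(inputdatafiledfwindow.columns)
val columnDataTypes_br = sc.broadcast(inputdatafiledfwindow.schema.fields.map(_.dataType.toString))

// ... read only on the executors.
inputdatafiledfwindow.foreachPartition { partition =>
  val names = columnNames_br.value
  val types = columnDataTypes_br.value
  // open the JDBC connection and build the batches here, as in your code
}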
UPDATE 2: Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection means that somewhere your connection is being serialized, and that is the problem.
AFAIK, the user and password you are converting to strings might come from a different object; your code normally looks okay, but re-check that.
As per the Spark docs (JDBC To Other Databases), the source-specific connection properties may be specified in the JDBC URL itself, so you can declare the URL along with user and password like this as well; try it:
jdbc:postgresql://localhost/test?user=fred&password=secret
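A sketch of that, with the connection opened inside foreachPartition so the non-serializable PgConnection is never captured by the closure (host, database, user, and password are placeholders):

import java.sql.DriverManager

// Credentials embedded in the URL, as in the Spark docs example above.
val url = "jdbc:postgresql://localhost/test?user=fred&password=secret"

inputdatafiledfwindow.foreachPartition { partition =>
  // Opened on the executor, so the connection never needs to be serialized.
  val dbc = DriverManager.getConnection(url)
  try {
    val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
    // ... set parameters, addBatch and executeBatch as in your code ...
  } finally {
    dbc.close()
  }
}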
UPDATE 3: The reason for the NullPointerException is straightforward: if you operate on a null value, it will lead to a NullPointerException.
For example, in
row.getString(8).replaceAll("\\000", "")
if row.getString(8) is null and you apply replaceAll to it, you get a NullPointerException. You have to check that row.getString(8) is not null before applying replaceAll.
The best way to avoid null pointers is to use Scala's Option.
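For example, a null-safe version of that line could look like this (a sketch; parameter index 9 and java.sql.Types.VARCHAR match the string column from your first snippet):

Option(row.getString(8)) match {
  case Some(value) => st.setString(9, value.replaceAll("\\000", ""))
  case None        => st.setNull(9, java.sql.Types.VARCHAR)  // bind a SQL NULL instead of failing
}

Alternatively, row.isNullAt(i) can be checked before calling the typed getter inside the dynamic loop.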
Another observation: use a Scala foreach loop rather than a traditional Java-style indexed loop.
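For example, the index-based loop can be written with zipWithIndex (a sketch reusing the broadcast array from UPDATE 1):

columnDataTypes_br.value.zipWithIndex.foreach { case (dataType, i) =>
  dataType match {
    case "StringType" => st.setString(i + 1, row.getString(i))
    case "DateType"   => st.setDate(i + 1, row.getDate(i))
    // ... handle the remaining types exactly as in the loop sketched above ...
    case _            => ()
  }
}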
NOTE: PLEASE ASK A SEPARATE QUESTION FOR EACH REQUIREMENT. DON'T MIX THEM IN INCREMENTALLY; IT WILL CONFUSE OTHERS.