I'm trying to move data from an RDD to a Postgres table, using:
    import java.sql.{Connection, DriverManager, SQLException}
    import org.postgresql.PGConnection

    def copyIn(reader: java.io.Reader, columnStmt: String = "") = {
      // connect to the Postgres database on localhost
      val driver = "org.postgresql.Driver"
      Class.forName(driver)
      // connection URL and credentials are omitted here
      val connection: Connection = DriverManager.getConnection()
      try {
        // stream the reader's contents into the table using COPY ... FROM STDIN
        connection.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY my_table ($columnStmt) FROM STDIN WITH CSV", reader)
      } catch {
        case se: SQLException => println(se.getMessage)
        case t: Throwable => println(t.getMessage)
      } finally {
        connection.close()
      }
    }

    myRdd.mapPartitions(iter => {
      val sb = new StringBuilder()
      var n_iter = iter.map(row => {
        val mapRequest = Utils.getMyRowMap(myMap, row)
        sb.append(mapRequest.values.mkString(", ")).append("\n")
      })
      copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
      sb.clear
      n_iter
    }).collect

The script keeps entering the copyIn function with no data to insert. I think it may be because iter.map only maps the partition and does not perform a collect? I tried collecting the whole myRdd object and still didn't get any data in the copyIn function.
How can I iterate over an RDD so that the StringBuilder gets appended to, and why doesn't the snippet above work? Does anybody have a clue?
iter is an Iterator, so iter.map creates a new Iterator, but you don't actually iterate it before calling copyIn, so nothing has been appended to the StringBuilder at that point. You probably want foreach instead. Except then iter will be empty by the time you return it, and the result of collect will be an empty array.
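
The difference between map and foreach on an Iterator can be seen without Spark or Postgres. The following is only a minimal sketch in plain Scala; the object name LazyIteratorDemo, its two helper methods, and the sample rows are hypothetical and exist purely for illustration.

```scala
object LazyIteratorDemo {

  // Returns the buffer contents right after calling map, and again
  // after the mapped iterator has actually been consumed.
  def mapIsLazy(rows: Seq[String]): (String, String) = {
    val sb = new StringBuilder()
    val mapped = rows.iterator.map { row =>
      sb.append(row).append("\n")
      row
    }
    val beforeConsuming = sb.toString // map has not run the function yet
    mapped.foreach(_ => ())           // consuming the iterator runs it
    val afterConsuming = sb.toString
    (beforeConsuming, afterConsuming)
  }

  // Returns the buffer contents after foreach, plus whether the
  // original iterator still has elements left.
  def foreachIsEager(rows: Seq[String]): (String, Boolean) = {
    val sb = new StringBuilder()
    val iter = rows.iterator
    iter.foreach(row => sb.append(row).append("\n"))
    (sb.toString, iter.hasNext)
  }

  def main(args: Array[String]): Unit = {
    println(mapIsLazy(Seq("a", "b")))
    println(foreachIsEager(Seq("a", "b")))
  }
}
```

With map, the buffer is still empty at the point where copyIn would be called; with foreach, the buffer is filled but the iterator is exhausted, which is why returning it from mapPartitions yields nothing.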
The actual method you want is foreachPartition:

    myRdd.foreachPartition(iter => {
      val sb = new StringBuilder()
      iter.foreach(row => {
        val mapRequest = Utils.getMyRowMap(myMap, row)
        sb.append(mapRequest.values.mkString(", ")).append("\n")
      })
      copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
      sb.clear
    })

and then myRdd.collect if you want to collect it as well. (Persist myRdd if you want to use it twice without recalculating it.)