scala apache-spark datastax-enterprise rdd

Scala: mapPartitions with collect does nothing on the partition


I'm trying to move data from an RDD into a Postgres table, using:

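    import java.io.{Reader, StringReader}
    import java.sql.{Connection, DriverManager, SQLException}
    import org.postgresql.PGConnection

    def copyIn(reader: Reader, columnStmt: String = ""): Unit = {
        // connect to the Postgres database on localhost
        // (jdbcUrl is a placeholder: the connection URL/credentials were omitted here)
        Class.forName("org.postgresql.Driver")
        val connection: Connection = DriverManager.getConnection(jdbcUrl)

        try {
            // stream the CSV text from `reader` into my_table via COPY ... FROM STDIN
            connection.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY my_table ($columnStmt) FROM STDIN WITH CSV", reader)
        } catch {
            case se: SQLException => println(se.getMessage)
            case t: Throwable => println(t.getMessage)
        } finally {
            connection.close()
        }
    }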

    myRdd.mapPartitions(iter => {
        val sb = new StringBuilder()

        var n_iter = iter.map(row => {
            val mapRequest = Utils.getMyRowMap(myMap, row)
            sb.append(mapRequest.values.mkString(", ")).append("\n")
        })

        copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
        sb.clear
        n_iter
    }).collect

The script keeps reaching the copyIn function with no data to insert. I think it may be because iter.map just maps the partition and doesn't actually run anything, like collect would? I tried collecting the whole myRdd object and still didn't get any data in the copyIn function.

How can I iterate over an RDD so that the StringBuilder actually gets appended to, and why doesn't the snippet above work? Does anybody have a clue?


Solution

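  • iter is an Iterator, and Iterator.map is lazy: it creates a new Iterator but doesn't run your function until something iterates over that new Iterator. Nothing does so before copyIn is called, so sb is still empty at that point. The appends only happen later, when collect consumes n_iter, by which time copyIn has already run. You probably want foreach instead. But then iter will already be exhausted by the time you return it, and collect will return an empty array.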

    The actual method you want is foreachPartition:

    myRdd.foreachPartition(iter => {
        val sb = new StringBuilder()

        // foreach consumes the iterator eagerly, so every row is appended here
        iter.foreach(row => {
            val mapRequest = Utils.getMyRowMap(myMap, row)
            sb.append(mapRequest.values.mkString(", ")).append("\n")
        })

        // sb now holds the whole partition as CSV lines
        copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
        sb.clear
    })
    

    Then call myRdd.collect if you also want to collect it. (Persist myRdd if you want to use it twice without recomputing it.)
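The Iterator behavior described in this answer can be reproduced without Spark or Postgres. This is a minimal sketch, assuming a hypothetical fakeCopyIn that just returns the text it would have streamed, with plain String rows standing in for myRdd's rows and Utils.getMyRowMap. It shows that the map version hands an empty payload to "copyIn", the foreach version hands over every row, and an iterator consumed by foreach has nothing left to return.

```scala
// Spark-free sketch of the Iterator behavior behind the question.
// fakeCopyIn is a hypothetical stand-in for copyIn; plain Strings stand in for RDD rows.
object LazyIteratorDemo {

  // Instead of streaming to Postgres, return the payload so it can be inspected.
  def fakeCopyIn(payload: String): String = payload

  // Mirrors the question's mapPartitions body.
  def withMap(rows: Iterator[String]): (String, Iterator[StringBuilder]) = {
    val sb = new StringBuilder()
    // Iterator.map only builds a new lazy Iterator; the lambda has not run yet.
    val mapped = rows.map(row => sb.append(row).append("\n"))
    val sent = fakeCopyIn(sb.toString) // sb is still empty here
    (sent, mapped) // the appends run only when the caller consumes `mapped`
  }

  // Mirrors the answer's foreachPartition body.
  def withForeach(rows: Iterator[String]): String = {
    val sb = new StringBuilder()
    rows.foreach(row => sb.append(row).append("\n")) // runs eagerly
    fakeCopyIn(sb.toString) // sb now holds every row
  }

  // Why foreach inside mapPartitions is not enough: the iterator is spent afterwards.
  def foreachThenReturn(rows: Iterator[String]): Iterator[String] = {
    rows.foreach(_ => ()) // consumes every element
    rows // nothing left to hand back
  }

  def main(args: Array[String]): Unit = {
    val (sentByMap, leftover) = withMap(Iterator("a", "b"))
    println(sentByMap.isEmpty) // true: "copyIn" received no data
    println(leftover.size) // 2: consuming now runs the appends, after "copyIn"
    println(withForeach(Iterator("a", "b")) == "a\nb\n") // true
    println(foreachThenReturn(Iterator("a", "b")).hasNext) // false
  }
}
```

In the real job the same reasoning applies per partition: foreachPartition is an action whose function runs once per partition for its side effects, so there is no leftover iterator to return.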