Search code examples
postgresqlapache-sparkspark-streamingapache-spark-sqlscalikejdbc

Spark Scala DataFrame Single Row conversion to JSON for PostrgeSQL Insertion


With a DataFrame called lastTail, I can iterate like this:

import scalikejdbc._
// ... 
// Do Kafka Streaming to create DataFrame lastTail
// ...

lastTail.printSchema

lastTail.foreachPartition(iter => {

// open database connection from connection pool
// with scalikeJDBC (to PostgreSQL) 

  while(iter.hasNext) {
    val item = iter.next()
    println("****")
    println(item.getClass)
    println(item.getAs("fileGid"))
    println("Schema: "+item.schema)
    println("String: "+item.toString())
    println("Seqnce: "+item.toSeq)

    // convert this item into an XXX format (like JSON)
    // write row to DB in the selected format
  }
})

This outputs "something like" (with redaction): root |-- fileGid: string (nullable = true) |-- eventStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true) |-- revisionStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true)

and (with just one iteration item - redacted, but hopefully with good enough syntax as well)

**** class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 12345 Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false)) String: [12345,[1,4,edit],[1,4,revision]] Seqnce: WrappedArray(12345, [1,4,edit], [1,4,revision])

Note: I doing the part like val metric = iter.sum on https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala, but with DataFrames instead. I am also following "Design Patterns for using foreachRDD" seen at http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning.

How can I convert this org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema (see https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala) iteration item into a something that is easily written (JSON or ...? - I'm open) into PostgreSQL. (If not JSON, please suggest how to read this value back into a DataFrame for use at another point.)


Solution

  • Well I figured out a different way to do this as a work around.

    val ltk = lastTail.select($"fileGid").rdd.map(fileGid => fileGid.toString)
    val ltv = lastTail.toJSON
    val kvPair = ltk.zip(ltv)
    

    Then I would simply iterate over the RDD instead of the DataFrame.

    kvPair.foreachPartition(iter => {
      while(iter.hasNext) {
        val item = iter.next()
        println(item.getClass)
        println(item)
      }
    })
    

    The data aside, I get class scala.Tuple2 which makes for a easier way to store KV pairs in JDBC / PostgreSQL.

    I'm sure that there could yet other ways that are not work-arounds.