MemSQL Spark connector inserting nulls from Spark to MemSQL


I have a program that reads Parquet files and writes them into a MemSQL table. I can confirm that Spark is reading the files correctly:

df.printSchema()
df.show(5)

both print the schema and the data correctly.

When I query the table, however, every value in every row is NULL. I am not sure what is going wrong here.

Here is the code that writes the Parquet files to MemSQL:

    package com.rb.scala

    import java.sql.{ DriverManager, ResultSet, Connection, Timestamp }

    import org.apache.spark._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.catalyst.expressions.RowOrdering

    import com.memsql.spark.context.MemSQLContext
    import com.memsql.spark.connector._
    import com.memsql.spark.connector.OnDupKeyBehavior._
    import com.memsql.spark.connector.dataframe._
    import com.memsql.spark.connector.rdd._

    import scala.util.control.NonFatal
    import org.apache.log4j.Logger

    object MemSQLWriter {

      def main(arg: Array[String]) {

        val logger = Logger.getLogger(this.getClass())

        if (arg.length < 1) {
          logger.error("=> wrong number of parameters")
          System.err.println("Usage: MemSQLWriter <directory containing the source files to be loaded into the database>")
          System.exit(1)
        }

        val jobName = "MemSQLWriter"
        val conf = new SparkConf().setAppName(jobName)
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val pathToFiles = arg(0)
        logger.info("=> jobName \"" + jobName + "\"")
        logger.info("=> pathToFiles \"" + pathToFiles + "\"")

        val dbHost = "xx.xx.xx.xx"
        val dbPort = 3306
        val dbName = "memsqlrdd_db"
        val user = "root"
        val password = ""
        val tableName = "target_table"
        val dbAddress = "jdbc:mysql://" + dbHost + ":" + dbPort

        // Read the source Parquet files from the directory given on the command line.
        val df = sqlContext.read.parquet(pathToFiles)

        val conn = DriverManager.getConnection(dbAddress, user, password)
        val stmt = conn.createStatement
        stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName)
        stmt.execute("USE " + dbName)
        stmt.execute("DROP TABLE IF EXISTS " + tableName)

        df.printSchema()
        df.show(5)

        // Build a CREATE TABLE statement with one VARCHAR(100) column per
        // DataFrame column, sharded on the first column.
        val columnArr = df.columns
        var createQuery: String = "CREATE TABLE " + tableName + " ("
        logger.info("=> no of columns : " + columnArr.length)
        for (column <- columnArr) {
          createQuery += column
          createQuery += " VARCHAR(100),"
        }
        createQuery += " SHARD KEY (" + columnArr(0) + "))"
        logger.info("=> create table query " + createQuery)
        stmt.execute(createQuery)

        df.select().saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password, upsertBatchSize = 1000, useKeylessShardedOptimization = true)
        stmt.close()
      }
    }

Solution

  • You are creating a table with a SHARD KEY and then setting useKeylessShardedOptimization = true, which gives undefined behavior. Set this flag to false and it should be good to go.
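
    Alternatively, if you want to keep useKeylessShardedOptimization = true, create the table without a SHARD KEY so that it is keyless sharded. A minimal sketch of the changed DDL, reusing tableName, columnArr, and stmt from your program:

        // Build the CREATE TABLE without a SHARD KEY clause, so the table is
        // keyless sharded and the keyless-sharded optimization flag applies.
        val createQuery = "CREATE TABLE " + tableName + " (" +
          columnArr.map(c => c + " VARCHAR(100)").mkString(", ") + ")"
        stmt.execute(createQuery)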

    Also, df.select() with no arguments returns a DataFrame with zero columns, which is unlikely to be what you want and could well explain the NULLs. Try just df.saveToMemSQL(...), as in the sketch below.
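
    Concretely, keeping your table (with its SHARD KEY) and the connection parameters you already have, the corrected write would look something like this, with the flag set to false per the first point:

        // Save the whole DataFrame rather than an empty projection, with the
        // keyless-sharded optimization disabled because the table has a SHARD KEY.
        df.saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password,
          upsertBatchSize = 1000, useKeylessShardedOptimization = false)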

    When verifying, run something like SELECT * FROM table WHERE col IS NOT NULL LIMIT 10 to check whether the rows really are all NULL.

    PS: there is also df.createMemSQLTableAs, which creates the table from the DataFrame's schema and loads the data in one step, i.e. the thing you want.
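
    A sketch of that one-step variant, assuming it takes the same connection parameters as saveToMemSQL (check the exact signature for your connector version):

        // Create the MemSQL table from the DataFrame's schema and load the data,
        // replacing the hand-built CREATE TABLE statement entirely.
        df.createMemSQLTableAs(dbName, tableName, dbHost, dbPort, user, password)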