I have a program that reads parquet files and writes them into a MemSQL table. I can confirm that Spark is reading the files correctly, since
df.printSchema()
df.show(5)
correctly print the schema and the data.
But when I query the table, every value in every row is NULL. I am not sure what is going wrong here.
Here is the code that writes the parquet data to MemSQL:
package com.rb.scala

import java.sql.{ DriverManager, ResultSet, Connection, Timestamp }

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.RowOrdering

import com.memsql.spark.context.MemSQLContext
import com.memsql.spark.connector._
import com.memsql.spark.connector.OnDupKeyBehavior._
import com.memsql.spark.connector.dataframe._
import com.memsql.spark.connector.rdd._

import scala.util.control.NonFatal
import org.apache.log4j.Logger

object MemSQLWriter {

  def main(arg: Array[String]) {

    val logger = Logger.getLogger(this.getClass())

    if (arg.length < 1) {
      logger.error("=> wrong parameters number")
      System.err.println("Usage: MainExample <directory containing the source files to be loaded to database>")
      System.exit(1)
    }

    val jobName = "MemSQLWriter"
    val conf = new SparkConf().setAppName(jobName)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val pathToFiles = arg(0)
    logger.info("=> jobName \"" + jobName + "\"")
    logger.info("=> pathToFiles \"" + pathToFiles + "\"")

    val dbHost = "xx.xx.xx.xx"
    val dbPort = 3306
    val dbName = "memsqlrdd_db"
    val user = "root"
    val password = ""
    val tableName = "target_table"
    val dbAddress = "jdbc:mysql://" + dbHost + ":" + dbPort

    val df = sqlContext.read.parquet("/projects/example/data/")

    val conn = DriverManager.getConnection(dbAddress, user, password)
    val stmt = conn.createStatement
    stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName)
    stmt.execute("USE " + dbName)
    stmt.execute("DROP TABLE IF EXISTS " + tableName)

    df.printSchema()
    df.show(5)

    val columnArr = df.columns
    var createQuery: String = " CREATE TABLE " + tableName + " ("
    logger.info("=> no of columns : " + columnArr.length)
    for (column <- columnArr) {
      createQuery += column
      createQuery += " VARCHAR(100),"
    }
    createQuery += " SHARD KEY (" + columnArr(0) + "))"
    logger.info("=> create table query " + createQuery)
    stmt.execute(createQuery)

    df.select().saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password, upsertBatchSize = 1000, useKeylessShardedOptimization = true)
    stmt.close()
  }
}
You are creating a table with a SHARD key and then setting useKeylessShardedOptimization = true, which will give undefined behavior. Set it to false and it should be good to go.
Also, I'm not sure what df.select().saveToMemSQL(...) does; df.select() with no arguments selects zero columns, which could itself explain the all-NULL rows. Try just df.saveToMemSQL(...).
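If it helps, the corrected write could look roughly like this. This is only a sketch that reuses the vals already defined in your code (dbName, tableName, dbHost, etc.) and the same connector version your existing saveToMemSQL call compiles against:

    // Requires the connector implicits that add saveToMemSQL to DataFrame:
    // import com.memsql.spark.connector._
    // Write the full DataFrame (no empty select()) and disable the keyless
    // sharded optimization, because the table declares an explicit SHARD KEY.
    df.saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password,
      upsertBatchSize = 1000,
      useKeylessShardedOptimization = false)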
When verifying, run something like SELECT * FROM table WHERE col IS NOT NULL LIMIT 10 to check whether you really have all NULLs.
PS: there is also df.createMemSQLTableAs, which creates the table from the DataFrame and loads the data into it in one step, i.e. it does the thing you want.
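A rough sketch of that, assuming the argument list mirrors saveToMemSQL in your connector version (the exact parameters are an assumption here, so check the connector's API):

    // Sketch only: creates the MemSQL table from the DataFrame's schema and writes
    // the rows in one step, so the hand-built CREATE TABLE string is no longer needed.
    // Parameter names and order are assumed to mirror saveToMemSQL; verify against your version.
    df.createMemSQLTableAs(dbName, tableName, dbHost, dbPort, user, password)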