mysql · apache-spark · apache-spark-sql · singlestore

Spark DataFrame InsertIntoJDBC - TableAlreadyExists Exception


Using Spark 1.4.0, I am trying to insert data from a Spark DataFrame into a MemSQL database (which should behave exactly like a MySQL database) using insertIntoJDBC(). However, I keep getting a runtime TableAlreadyExists exception.

First I create the MemSQL table like this:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);

Then I create a simple dataframe in Spark and try to insert into MemSQL like this:

val df = sc.parallelize(Array(123, 234)).toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)

java.lang.RuntimeException: Table table1 already exists.

Solution

  • This solution applies to JDBC connections in general, although the answer by @wayne is probably a better solution for MemSQL specifically.

    insertIntoJDBC appears to have been deprecated as of 1.4.0, and calling it actually invokes write.jdbc().

    write returns a DataFrameWriter object. If you want to append data to your table rather than create it, you have to change the writer's save mode to "append".

    Another issue with the example in the question above is that the DataFrame's schema didn't match the schema of the target table: the table has an id column as well as a val column, but the DataFrame only has val.

    The code below is a working example from the Spark shell. I start my spark-shell session with spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar so that the MySQL JDBC driver is on the classpath.

    import java.util.Properties

    // JDBC connection properties
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "")

    // Build a DataFrame whose schema matches the target table (id INT, val INT)
    val df = sc.parallelize(Array((1, 234), (2, 1233))).toDF("id", "val")

    // "append" inserts into the existing table instead of trying to create it
    val dfWriter = df.write.mode("append")

    dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)
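    If you prefer the typed enum over the string form, the same append behavior can be expressed with SaveMode. This is a sketch assuming the same df, prop, and JDBC URL as above, run in the same Spark shell session:

    import org.apache.spark.sql.SaveMode

    // SaveMode.Append is equivalent to .mode("append"):
    // rows are inserted into table1 and the existing table is left in place
    df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)

    Other modes behave differently against an existing table: SaveMode.Overwrite drops and recreates it, while the default, SaveMode.ErrorIfExists, raises the TableAlreadyExists error seen in the question.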