mysql, scala, apache-spark

DataFrame turns empty after saving data into MySQL in spark


I want to save data into MySQL, overwriting the duplicate rows in some fields while keeping the rows already in MySQL that the pending data does not contain. I've tried Mode.Overwrite and Mode.Append, but neither satisfies my needs. So I tried loading the existing data from MySQL to find those rows. But when I save the data into MySQL, the DataFrame I obtained turns empty.

During the process, I've tried two methods:

  1. Find the rows that do not exist in the pending data, use UNION to join the two parts, then save with Mode.Overwrite.
  2. Find the rows that do not exist in the pending data, save the pending DataFrame with Mode.Overwrite, then save the obtained DataFrame with Mode.Append.

Neither method works: the obtained DataFrame is always empty, whether while saving in method 1 or after saving with Mode.Overwrite in method 2.

The code is below:

var mysql_table = spark.sqlContext.read.format("jdbc").options(jdbc_options).load()    
val list = pre_res.select("clientMacAddr").rdd.map(x => x.toString.substring(1,18)).collect()    
val rec_diff = mysql_table.filter(x => !(list.contains(x.apply(0).toString)))  
pre_res.write.mode("overwrite").format("jdbc").options(jdbc_options).save()
rec_diff.show()
rec_diff.write.mode("append").format("jdbc").options(jdbc_options).save()

The result is like this:

+------------------+----+
|clientMacAddr|var1|
+------------------+----+
+------------------+----+

Thanks.


Solution

  • Your result is empty because Spark is lazy. It does not compute anything until you collect data to the driver (reduce, count, collect, show, ...) or write data out (write, save, ...).

    Therefore your MySQL table is only read and compared to pre_res when you call rec_diff.show(). By that time you have already overwritten the MySQL table with pre_res, so the table contains the same data as pre_res, which makes the difference empty.
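    Spark's laziness here mirrors lazy collections in plain Scala. As a rough analogy (pure Scala, no Spark involved; the names are made up for illustration), a view records the transformation but computes nothing until you materialize it:

```scala
// Pure-Scala analogy for Spark's laziness: a .view records the
// transformation; nothing runs until the result is materialized.
object LazyDemo {
  var evaluations = 0

  // Like a DataFrame transformation: lazy, nothing computed yet.
  val incremented = List(1, 2, 3).view.map { x => evaluations += 1; x + 1 }

  // Like an action (show/collect/write): forces the computation.
  def run(): List[Int] = incremented.toList
}
```

    Until run() is called, evaluations stays at 0, exactly as your JDBC read performs no query until an action such as show() runs.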

    Try showing (or collecting, or writing) your difference before overwriting the MySQL table (swap lines 4 and 5 of your code) and you'll see a difference.

    Follow up:

    What was implied is that it is a bad idea to overwrite your input with Spark. The simple reason is that Spark is lazy (always keep that in mind) and won't read anything until you write something. At that point Spark drops the table to replace it with your data and then starts reading... from the table you just emptied. The deeper reason is that Spark is designed for datasets much larger than any memory: it reads and processes your data in small batches (executor tasks) and writes the result progressively, which is incompatible with overwriting its own input.
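    The same hazard can be reproduced without Spark: build a lazy view over a mutable source, "overwrite" the source before materializing anything, and the result comes out empty (a pure-Scala sketch; the names are illustrative):

```scala
import scala.collection.mutable.ArrayBuffer

object OverwriteHazard {
  def demo(): List[Int] = {
    val source = ArrayBuffer(1, 2, 3)

    // Lazy pipeline over the source -- like a DataFrame read from a table.
    val doubled = source.view.map(_ * 2)

    // "Overwrite" the input before the pipeline has run -- like
    // mode("overwrite") truncating the table you still need to read.
    source.clear()

    // Materializing now reads the emptied source, so the result is empty.
    doubled.toList
  }
}
```

    This is the same empty-result symptom you observed: the lazy pipeline only consults its input at materialization time, after the input is gone.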

    What you need to do is write your data to a temporary location first (Parquet on HDFS, for instance, will be very efficient). Note that there is a similar thread here. What you are trying to do would be coded as follows:

    // Read the current MySQL table (lazily -- nothing is fetched yet)
    var mysql_table = spark.sqlContext.read.format("jdbc").options(jdbc_options).load()
    val list = pre_res.select("clientMacAddr").rdd.map(x => x.toString.substring(1, 18)).collect()
    val rec_diff = mysql_table.filter(x => !list.contains(x.apply(0).toString))
    // Materialize the diff to a temporary location before touching the source table
    rec_diff.write.parquet("somewhere")
    // Re-reading from the temporary copy breaks the lineage back to the MySQL table
    val saved_rec_diff = spark.sqlContext.read.parquet("somewhere")
    saved_rec_diff.show()
    saved_rec_diff.write.mode("append").format("jdbc").options(jdbc_options).save()