Tags: apache-spark, spark-structured-streaming

How to write two streaming DataFrames into two different MySQL tables in Spark Structured Streaming?


I am using Spark version 2.3.2.

I have written Spark Structured Streaming code to insert data from streaming DataFrames into two different MySQL tables.

Let's say there are two streaming DataFrames: DF1 and DF2.

I have written two queries (query1, query2) using the ForeachWriter API to write each stream into its own MySQL table, i.e. DF1 into MySQL table A and DF2 into MySQL table B.

When I run the Spark job, it runs query1 first and then query2, so it writes to table A but not to table B.

If I change my code to run query2 first and then query1, it writes to table B but not to table A.

So I understand that only the first query that starts ever gets to write into its table.

Note: I have tried using a different MySQL user/database for each table, but no luck.

Can anyone please advise how to make this work?

My code is below:

import java.sql._
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

class JDBCSink1(url: String, user: String, pwd: String) extends ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "com.mysql.jdbc.Driver"
  var connection: Connection = _
  var statement: Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    val insertSql = """ INSERT INTO tableA(col1,col2,col3) VALUES(?,?,?); """
    val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
    preparedStmt.setString(1, value(0).toString)
    preparedStmt.setString(2, value(1).toString)
    preparedStmt.setString(3, value(2).toString)
    preparedStmt.execute
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}



class JDBCSink2(url: String, user: String, pwd: String) extends ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "com.mysql.jdbc.Driver"
  var connection: Connection = _
  var statement: Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    val insertSql = """ INSERT INTO tableB(col1,col2) VALUES(?,?); """
    val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
    preparedStmt.setString(1, value(0).toString)
    preparedStmt.setString(2, value(1).toString)
    preparedStmt.execute
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}



val url1="jdbc:mysql://hostname:3306/db1"
val url2="jdbc:mysql://hostname:3306/db2"

val user1 ="usr1"
val user2="usr2"
val pwd = "password"

val Writer1 = new JDBCSink1(url1,user1, pwd)

val Writer2 = new JDBCSink2(url2,user2, pwd)


val query2 =
  streamDF2
    .writeStream
    .foreach(Writer2)
    .outputMode("append")
    .trigger(ProcessingTime("35 seconds"))
    .start().awaitTermination()



val query1 =
  streamDF1
    .writeStream
    .foreach(Writer1)
    .outputMode("append")
    .trigger(ProcessingTime("30 seconds"))
    .start().awaitTermination()

Solution

  • The awaitTermination call on the first query you start blocks the second query from ever starting. If you want to have two output streams, you need to start both before waiting for their termination:

    val query2 =
      streamDF2
        .writeStream
        .foreach(Writer2)
        .outputMode("append")
        .trigger(ProcessingTime("35 seconds"))
        .start()
    
    val query1 =
      streamDF1
        .writeStream
        .foreach(Writer1)
        .outputMode("append")
        .trigger(ProcessingTime("30 seconds"))
        .start()
    
    query1.awaitTermination()
    query2.awaitTermination()
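
    Alternatively, instead of waiting on each query individually, you can block on the SparkSession's StreamingQueryManager. A minimal sketch, assuming the two queries have already been started as above; note that awaitAnyTermination returns as soon as any one of the active queries stops or fails:

    // Start both queries first (as above), then block until any active
    // query on this SparkSession terminates.
    spark.streams.awaitAnyTermination()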
    

    Edit:

    Spark also allows you to schedule and allocate resources to the different streaming queries, as described in Scheduling within an application. You can configure your pools based on:

    • schedulingMode: can be FIFO or FAIR
    • weight: "This controls the pool’s share of the cluster relative to other pools. By default, all pools have a weight of 1. If you give a specific pool a weight of 2, for example, it will get 2x more resources as other active pools."
    • minShare: "Apart from an overall weight, each pool can be given a minimum shares (as a number of CPU cores) that the administrator would like it to have."

    The pool configurations can be set by creating an XML file, similar to conf/fairscheduler.xml.template, and either putting a file named fairscheduler.xml on the classpath, or setting the spark.scheduler.allocation.file property in your SparkConf:

    conf.set("spark.scheduler.allocation.file", "/path/to/file")
    

    Assuming the fair scheduler is enabled (spark.scheduler.mode set to FAIR), you can then assign each query to its own pool by setting the spark.scheduler.pool local property before starting it:

    // Tell Spark to run the first query in pool1 ...
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
    val query1 = streamDF1.writeStream.[...].start()

    // ... and the second query in pool2
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
    val query2 = streamDF2.writeStream.[...].start()