Tags: apache-spark, apache-spark-sql, akka-http

Spark Opening multiple threads for a single job while trying to run parallel jobs


We have a use case where we need to run parallel Spark SQL queries on a single Spark session via a REST API (Akka HTTP).

Application Conf

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // fixed-pool-size is available in Akka 2.4.2+
    fixed-pool-size = 4
  }
  throughput = 100
} 
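
This dispatcher is looked up by name in QueryService below. As a minimal sketch of the intended pattern (assuming an ActorSystem named system and the SparkSession named session), a blocking Spark call can be wrapped in a Future that runs on this fixed-size pool instead of the default dispatcher:

import scala.concurrent.{ExecutionContext, Future}

// Resolve the dedicated pool defined in application.conf
implicit val blockingEc: ExecutionContext = system.dispatchers.lookup("my-blocking-dispatcher")

// Illustrative only: run a blocking Spark call on the fixed-size pool
val sample: Future[Unit] = Future(session.sql("select 1").show())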

Spark Service

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import org.apache.spark.sql.execution.ui.CustomSqlListener
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.parsing.json.JSON

trait SparkService {

  val session = SparkSession
                .builder()
                .config("spark.scheduler.mode", "FAIR")
                .appName("QueryCancellation")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate()

  var queryJobMapStart = Map[String, String]()
  var queryStatusMap = Map[String,String]()
  session.sparkContext.setLogLevel("ERROR")
  session.sparkContext.setCallSite("Reading the file")
  val dataDF = session.read
      .format("csv")
      .option("inferSchema","true")
      .option("header","true")
      .load("C:\\dev\\QueryCancellation\\src\\main\\resources\\Baby_Names__Beginning_2007.csv")



  dataDF.createOrReplaceTempView("data_tbl")


  //dataDF.printSchema()

  val customListener = new CustomSqlListener(session.sparkContext.getConf,queryJobMapStart,queryStatusMap)
  // addSparkListener returns Unit, so there is no handle to keep
  session.sparkContext.addSparkListener(customListener)


  def runQuery(query: String, queryId: String)(implicit ec: ExecutionContext) = {
    // println("queryId: " + queryId + " query: " + query)
    session.sparkContext.setLocalProperty("callSite.short", queryId)
    session.sparkContext.setLocalProperty("callSite.long", query)
    session.sql(query).show(2)
    // Thread.sleep(60000)
    // Future(data)
  }

}

object SparkService extends SparkService 
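
Note that runQuery takes an implicit ExecutionContext but never uses it, and the Future call is commented out, so the query runs synchronously on the caller's thread. A minimal sketch of an asynchronous variant (hypothetical name runQueryAsync, assuming the same session and the blocking dispatcher from application.conf) could look like this:

import org.apache.spark.sql.Row
import scala.concurrent.{ExecutionContext, Future}

def runQueryAsync(query: String, queryId: String)(implicit ec: ExecutionContext): Future[Array[Row]] =
  Future {
    // Tag the job so it can be identified in the Spark UI and listeners
    session.sparkContext.setLocalProperty("callSite.short", queryId)
    session.sparkContext.setLocalProperty("callSite.long", query)
    // take(2) collects two rows instead of printing them, so a result can be returned
    session.sql(query).take(2)
  }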

Query Service

import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorSystem
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
trait QueryService extends SparkService {
  implicit val system: ActorSystem
  implicit val materializer : ActorMaterializer
  // implicit val sparkSession: SparkSession
  // val datasetMap = new ConcurrentHashMap[String, Dataset[Row]]()
  implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
  val route: Route =
    pathSingleSlash {
      get {
        complete {
          "welcome to rest service"
        }
      }
    } ~
      path("runQuery" / "county"/Segment) { county  =>
        get {
          complete{
            var res= ""
            val documentId = "user ::" + UUID.randomUUID().toString
            val queryId = System.nanoTime().toString
            val stmt = "select a.sex,count(*) from data_tbl a,data_tbl b where b.county=a.county and a.country= '"+county+"' group by a.sex"
            val result = runQuery(stmt,queryId)
            /* var entity = queryResult match {
               case Some(result) =>s"Query : $stmt  is submitted. Query id is $result. User id is $documentId"
               case None => s"Query : $stmt could not be submitted. User id is $documentId"
             }*/
            /*result.onComplete{
              case Success(value) => println(s"Query completed")
              case Failure(e) =>  None
            }*/
            var entity = s"Query : $stmt  is submitted. Query id is $queryId. User id is $documentId"
            entity
          }
        }
      } ~
      path("getStatus" / """[\w[0-9]-_]+""".r) { id =>
        get {
          complete {
            val statusResult = getStatus(id)
            var res = statusResult match {
              case Some(result) =>  s"Status for query id : $id is $result"
              case None =>  s"Could not find the status of the query id : $id"
            }
            res
          }
        }
      } ~
      path("killQuery" / """[\w[0-9]-_]+""".r) { id =>
        get {
          complete {
            val statusResult = killQuery(id)
            s"Query id $id is cancelled."
          }
        }
      }
}
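
The getStatus and killQuery helpers used by the routes above are not shown in the question. As a hedged sketch, they could be implemented with Spark's job-group API, assuming runQuery first calls session.sparkContext.setJobGroup(queryId, query, interruptOnCancel = true) so that each query's jobs are grouped under its queryId:

// Hypothetical helpers based on SparkContext's status tracker and job groups
def getStatus(queryId: String): Option[String] = {
  val tracker = session.sparkContext.statusTracker
  val jobIds = tracker.getJobIdsForGroup(queryId)
  if (jobIds.isEmpty) None
  else Some(jobIds.flatMap(tracker.getJobInfo(_)).map(_.status().toString).mkString(", "))
}

def killQuery(queryId: String): Unit =
  session.sparkContext.cancelJobGroup(queryId)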

Query Server

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

import scala.concurrent.Future
//import scala.concurrent.ExecutionContext.Implicits.global

class QueryServer (implicit val system:ActorSystem ,
                   implicit val materializer: ActorMaterializer) extends QueryService {

  def startServer(address : String, port: Int) = {
      Http().bindAndHandle(route,address,port)
  }

}

object QueryServer extends App {

  implicit val actorSystem = ActorSystem("query-server")
  implicit val materializer = ActorMaterializer()
  val server = new QueryServer()
  server.startServer("localhost", 8080)
  println("running server at localhost 8080")

}

When I try to run a query on Spark SQL via http://localhost:8080/runQuery/county/'KINGS', multiple job IDs are created, and most of them are marked as skipped.

Below is a screenshot of the Spark UI. I cannot understand why the highlighted jobs are being created.

[Spark UI screenshot: multiple job IDs created for a single query, most marked as skipped]

Below is the console log, which shows that the query was executed only once:

"running server at localhost 8080
173859599588358->2
****************************************************************************************
****************************************************************************************
Job id 2 is completed
--------------------------------------------------------
173859599588358->3
****************************************************************************************
****************************************************************************************
Job id 3 is completed
--------------------------------------------------------
173859599588358->4
****************************************************************************************
****************************************************************************************
Job id 4 is completed
--------------------------------------------------------
173859599588358->5
****************************************************************************************
****************************************************************************************
Job id 5 is completed
--------------------------------------------------------
173859599588358->6
****************************************************************************************
****************************************************************************************
Job id 6 is completed
--------------------------------------------------------
173859599588358->7
****************************************************************************************
****************************************************************************************
Job id 7 is completed
--------------------------------------------------------
+---+--------+
|sex|count(1)|
+---+--------+
|  F|12476769|
|  M|12095080|
+---+--------+

Spark version: 2.2.1


Solution

  • It looks like the Spark Catalyst optimizer is optimizing the query: more than one DAG is created, and Spark presumably picks the best execution plan. If you inspect the execution plan, there may be more than one. I don't think this is related to Akka HTTP. Try running the same code in the spark-shell to verify this.
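
A quick way to check this in the spark-shell (assuming data_tbl is registered the same way as in SparkService) is to print the plans Catalyst produces for the same statement before calling show:

// spark-shell sketch, using the same SQL string that the runQuery route builds
val county = "KINGS"
val stmt = "select a.sex,count(*) from data_tbl a,data_tbl b where b.county=a.county and a.country= '" + county + "' group by a.sex"
val df = spark.sql(stmt)
df.explain(true) // prints the parsed, analyzed, and optimized logical plans plus the physical plan
df.show(2)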