scala apache-spark apache-kafka spark-streaming long-running-processes

Using Kafka to communicate between long running Spark jobs


I am new to Apache Spark and have a need to run several long-running processes (jobs) on my Spark cluster at the same time. Often, these individual processes (each of which is its own job) will need to communicate with each other. Tentatively, I'm looking at using Kafka to be the broker in between these processes. So the high-level job-to-job communication would look like:

  1. Job #1 does some work and publishes a message to a Kafka topic
  2. Job #2 is set up as a streaming receiver (using a StreamingContext) to that same Kafka topic, and as soon as the message is published to the topic, Job #2 consumes it
  3. Job #2 can now do some work, based on the message it consumed
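Step 1 could be sketched roughly as follows. This is only an illustration, assuming the standard Kafka Java client (kafka-clients) is on the classpath; the object name Job1Publisher, the helper names, and the broker address are hypothetical and not part of the original setup.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper for Job #1: publishes one string message to a topic.
object Job1Publisher {

  // Producer settings for plain string keys and values.
  // The broker list is supplied by the caller (placeholder in the usage note).
  def producerProps(brokers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  // Opens a producer, sends a single record without a key, and closes it.
  def publish(brokers: String, topic: String, payload: String): Unit = {
    val producer = new KafkaProducer[String, String](producerProps(brokers))
    try {
      producer.send(new ProducerRecord[String, String](topic, payload))
      producer.flush() // make sure the record has left the client buffer
    } finally {
      producer.close()
    }
  }
}
```

Usage would look like Job1Publisher.publish("my-kafka-ip:9092", "someTopic", "work item 42"). Opening a producer per message keeps the sketch short; a long-running job would more likely create one producer and reuse it.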

From what I can tell, streaming contexts are blocking listeners that run on the Spark Driver node. This means that once I start the streaming consumer like so:

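import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

def createKafkaStream(ssc: StreamingContext,
        kafkaTopics: String, brokers: String): DStream[(String, String)] = {
    // some configs here (elided); the two values below are assumed minimal
    // settings derived from the arguments so that the snippet compiles
    val props = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = kafkaTopics.split(",").map(_.trim).toSet
    KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder](ssc, props, topicsSet)
}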

def consumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
        rdd.collect().foreach { msg =>
            // Now do some work as soon as we receive a message from the topic
        }
    })

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()

...that there are now 2 implications:

  1. The Driver is now blocking and listening for work to consume from Kafka; and
  2. When work (messages) is received, it is sent to any available Worker Nodes to actually be executed upon

So first, if anything that I've said above is incorrect or misleading, please begin by correcting me! Assuming I'm more or less correct, then I'm simply wondering if there is a more scalable or performant way to accomplish this, given my criteria. Again, I have two long-running jobs (Job #1 and Job #2) that are running on my Spark nodes, and one of them needs to be able to 'send work to' the other one. Any ideas?


Solution

  • From what I can tell, streaming contexts are blocking listeners that run on the Spark Driver node.

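    A StreamingContext (singular) isn't a blocking listener. Its job is to create the graph of execution for your streaming job. The call that blocks is ssc.awaitTermination(), which only keeps the driver's main thread waiting while the batches run.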

    When you create the StreamingContext with Seconds(10), you set a 10-second batch interval: Spark groups the records that arrived during each interval into one batch. What happens from there depends on which Kafka abstraction you're using: the receiver-based approach via KafkaUtils.createStream, or the receiver-less approach via KafkaUtils.createDirectStream.

    In both approaches, broadly speaking, data is consumed from Kafka and then processed in parallel across the Spark workers.
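    To make the difference between the two approaches concrete, here is a minimal sketch of both calls side by side. It assumes the Kafka 0.8 integration for Spark Streaming (the org.apache.spark.streaming.kafka package used in the question); the object name KafkaStreams, its method names, and the parameter names are hypothetical.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical wrapper showing the two ways of reading the same topic.
object KafkaStreams {

  // Receiver-based: a long-running receiver consumes the topic through the
  // high-level consumer, coordinated via ZooKeeper.
  // The map value is the number of consumer threads for that topic.
  def receiverStream(ssc: StreamingContext, zkQuorum: String,
                     groupId: String, topic: String): DStream[(String, String)] =
    KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1))

  // Settings for the receiver-less stream: it talks to the brokers directly.
  def directParams(brokers: String): Map[String, String] =
    Map("metadata.broker.list" -> brokers)

  // Receiver-less: each batch reads a range of offsets per Kafka partition.
  def directStream(ssc: StreamingContext, brokers: String,
                   topic: String): DStream[(String, String)] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, directParams(brokers), Set(topic))
}
```

    Both methods return a stream of (key, value) pairs, so the downstream processing code does not need to change when switching from one to the other.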

  • then I'm simply wondering if there is a more scalable or performant way to accomplish this

    This approach is highly scalable. When using the receiver-less approach, each Kafka partition maps to a Spark partition in a given RDD. You can increase parallelism either by increasing the number of partitions in Kafka or by repartitioning the data inside Spark (using DStream.repartition). I suggest testing this setup to determine whether it suits your performance requirements.
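    As a rough sketch of the repartitioning option, the snippet below spreads each batch over a chosen number of partitions and processes the records on the executors. The object name ParallelProcessing, the describe and handle functions, and the numPartitions parameter are hypothetical placeholders for the actual work. Note that it uses foreachPartition rather than the rdd.collect() from the question, since collect() pulls every record back to the driver.

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical processing step for Job #2.
object ParallelProcessing {

  // Placeholder for the real per-message work.
  def describe(value: String): String = s"processing: $value"

  def handle(value: String): Unit = println(describe(value))

  // Redistributes each batch across numPartitions partitions, then handles
  // the records partition by partition on the executors.
  def process(stream: DStream[(String, String)], numPartitions: Int): Unit =
    stream
      .repartition(numPartitions)
      .foreachRDD { rdd =>
        rdd.foreachPartition { records =>
          records.foreach { case (_, value) => handle(value) }
        }
      }
}
```

    Repartitioning shuffles the data, so it is only likely to pay off when the per-record work is heavy compared with the cost of moving the records; adding Kafka partitions avoids that shuffle.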