Search code examples
scalaapache-kafkaspark-streamingakka-stream

Akka Stream from within a Spark Job to write into kafka


Willing to be the most efficient in writing data back into kafka, i am interested in using Akka Stream to write my RDD partition back into Kafka.

The problem is that i need a way to create an actor system per executor and not per partition which would be ridiculous. One may end up with 8 actorSystems on one node on one JVM. However having a Stream per partition is fine.

Has anyone already done that ?

My understanding, an actor system can't be serialized, hence can't be sent has broadcast variable which would be per executor.

If one has had the experience around figuring a solution to that and tested please would you share ?

Else i can always fall back to https://index.scala-lang.org/benfradet/spark-kafka-writer/spark-kafka-0-10-writer/0.3.0?target=_2.11 but i am not sure it is the most efficient way.


Solution

  • You can always define a global lazy val with an actor system:

    object Execution {
      implicit lazy val actorSystem: ActorSystem = ActorSystem()
      implicit lazy val materializer: Materializer = ActorMaterializer()
    }
    

    Then you just import it in any of the classes where you want to use Akka Streams:

    import Execution._
    
    val stream: DStream[...] = ...
    
    stream.foreachRDD { rdd =>
      ...
      rdd.foreachPartition { records =>
        val (queue, done) = Source.queue(...)
          .via(Producer.flow(...))
          .toMat(Sink.ignore)(Keep.both)
          .run()  // implicitly pulls `Execution.materializer` from scope,
                  // which in turn will initialize `Execution.actorSystem`
    
        ... // push records to the queue
    
        // wait until the stream is completed
        Await.result(done, 10.minutes)
      }
    }
    

    The above is kind of pseudocode but I think it should convey the general idea.

    This way the system is going to be initialized on every executor JVM only once when it is needed. Additionally you can make the actor system "daemonic" in order for it to shut down automatically when the JVM finishes:

    object Execution {
      private lazy val config = ConfigFactory.parseString("akka.daemonic = on")
        .withFallback(ConfigFactory.load())
      implicit lazy val actorSystem: ActorSystem = ActorSystem("system", config)
      implicit lazy val materializer: Materializer = ActorMaterializer()
    }
    

    We're doing this in our Spark jobs and it works flawlessly.

    This works without any kind of broadcast variables, and, naturally, can be used in all kinds of Spark jobs, streaming or otherwise. Because the system is defined in a singleton object, it is guaranteed to be initialized only once per JVM instance (modulo various classloader shenanigans, but it doesn't really matter in the context of Spark), therefore even if some of the partitions get placed onto the same JVM (maybe in different threads), it will only initialize the actor system one time. lazy val ensures the thread-safety of the initialization, and ActorSystem is thread-safe, so this won't cause problems in this regard as well.