I am starting to use Spark Streaming to process a real-time data feed. My scenario is this: I have an Akka actor receiver using "with ActorHelper", then my Spark job does some mappings and transformations, and finally I want to send the result to another actor.
My issue is the last part. When I try to send to the other actor, Spark raises an exception:
15/02/20 16:43:16 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'
The way I am creating this last actor is the following:
val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")
And then using it like this:
result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))
I am not sure where or how to apply the advice "Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'". Do I need to set anything special through configuration? Or create my actor differently?
I found that if I collect before I send to the actor it works like a charm:
result.foreachRDD(rdd => rdd.collect().foreach(producer ! _))
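For comparison, here is an untested sketch of a variant that keeps the send on the executors instead of collecting to the driver. The idea is to ship only a path string inside the closure and look the actor up on the executor, so no ActorRef has to be deserialized there. The helper name sendToActor and the parameter lastActorPath are hypothetical and do not appear in the code above. The sketch also assumes a Spark 1.x build where SparkEnv.get.actorSystem is available, that the target actor is reachable remotely at that path, and that the records are serializable.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: sends every record of the stream to an actor that is
// resolved by path on the executor, rather than capturing an ActorRef
// created on the driver.
def sendToActor[T](result: DStream[T], lastActorPath: String): Unit =
  result.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // This block runs on the executor. Only lastActorPath (a String) is
      // serialized with the closure.
      val selection = SparkEnv.get.actorSystem.actorSelection(lastActorPath)
      // Send each record individually instead of the partition iterator.
      partition.foreach(record => selection ! record)
    }
  }
```

It would be called as sendToActor(result, path), where path is the full remote address of MyLastActor. The exact address depends on how the driver's actor system is configured, so it is left as a parameter here.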