I'm playing with Akka Streams and have been trying to do some enrichment and processing of events polled from a MongoDB collection. I'm having some doubts, however, about the best approach to implement my event enrichers, which may need to connect to an external data source.
The mapFuture seems to be a good fit, but I'm having some issues:
class EventEnricherActor extends Actor with ActorLogging {
// ...
def receive = {
case e: Event =>
sender ! augmentEvent(e)
}
}
And my app:
val enricherActor = actorSystem.actorOf(Props[EventEnricherActor])
Flow(mongodbConsumer).
mapFuture(msg => enricherActor ? msg).
onComplete(materializer) { _ => actorSystem.shutdown()}
However, I'm stuck at this error:
java.lang.ClassCastException: messages.Event cannot be cast to scala.runtime.Nothing$
on the call to mapFuture.
What am I missing?
Any better ideas to deal with these enrichments?
Update Stack trace:
[ERROR] [08/12/2014 11:48:07.314] [actor-system-akka.actor.default-dispatcher-2] [akka://actor-system/user/flow-1-2-transform] failure during processing
java.lang.ClassCastException: messages.Event cannot be cast to scala.runtime.Nothing$
at apps.Consume$$anonfun$1$$anonfun$apply$1.apply(Consume.scala:47)
at akka.stream.impl.MapFutureProcessorImpl$$anonfun$1.apply$mcV$sp(MapFutureProcessorImpl.scala:125)
at akka.stream.impl.Pump$$anonfun$pump$1.apply$mcV$sp(Transfer.scala:163)
at akka.stream.impl.Pump$$anonfun$pump$1.apply(Transfer.scala:163)
at akka.stream.impl.Pump$$anonfun$pump$1.apply(Transfer.scala:163)
at akka.stream.impl.ActorBasedFlowMaterializer$.withCtx(ActorBasedFlowMaterializer.scala:133)
at akka.stream.impl.Pump$class.pump(Transfer.scala:163)
at akka.stream.impl.ActorProcessorImpl.pump(ActorProcessor.scala:238)
at akka.stream.impl.BatchingInputBuffer.enqueueInputElement(ActorProcessor.scala:93)
at akka.stream.impl.BatchingInputBuffer$$anonfun$upstreamRunning$1.applyOrElse(ActorProcessor.scala:140)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at akka.stream.impl.SubReceive.apply(Transfer.scala:18)
at akka.stream.impl.SubReceive.apply(Transfer.scala:14)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:14)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:165)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.stream.impl.ActorProcessorImpl.aroundReceive(ActorProcessor.scala:238)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Thanks
My mistake, mongodbConsumer (an ActorProducer) was untyped!
So this:
val mongodbConsumer = ActorProducer[Event](system.actorOf(MongodbConsumerActor.props(db)))
Instead of:
val mongodbConsumer = ActorProducer(system.actorOf(MongodbConsumerActor.props(db)))