java cassandra akka lagom

akka.pattern.AskTimeoutException is thrown while persisting an event


I just started with Lagom & Akka. I am following the design described in Domain Modelling with Akka Persistence Typed.

I am trying to create a brand new instance of an entity (EntityState). But the event is not getting persisted, and I am getting the following error:

00:54:27.862 [error] com.example.impl.entity.EntityClass [persistencePhase=running-cmd, akkaAddress=akka://[email protected]:60685, akkaSource=akka://XXX/system/sharding/StateClass/186/ID1, sourceActorSystem=XXX, persistenceId=StateClass|ID1] - Supervisor StopSupervisor saw failure: null
java.lang.NullPointerException: null
    at akka.persistence.typed.javadsl.EventSourcedBehavior.$anonfun$apply$4(EventSourcedBehavior.scala:195)
    at akka.persistence.typed.internal.Running$RunningState.applyEvent(Running.scala:78)
    at akka.persistence.typed.internal.Running$HandlingCommands.applyEffects(Running.scala:153)
    at akka.persistence.typed.internal.Running$HandlingCommands.onCommand(Running.scala:123)
    at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:105)
    at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:100)
    at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:83) 

I have a Create command, which invokes onCreate(), and eventually attempts to persist an EntityCreated event.

Service implementation method:

@Override
public ServiceCall<CreateMessage, StateView> createState(){
    return message ->
            entityRef(message.getName())
                    .<EntityClass.Accepted>ask(replyTo -> new EntityClass.Create(message, replyTo), askTimeout)
                    .thenApply(accepted -> toStateView(accepted.getSummary()));
}

Command handler:

private ReplyEffect<Event, StateClass> onCreate(StateClass state, Create cmd) {
    return Effect()
            .persist(new EntityCreated(cmd.getDetails().getName(), Instant.now()))
            .thenReply(cmd.replyTo, e -> new Accepted(EntityClass.toSummary(e)));
}

I am able to confirm the following:

  • exception is thrown during persist()
  • the event is not present in Cassandra

Your help is appreciated. Thank you in advance!


Solution

  • It seems the real cause of the exception was that I had not added logic for handling the event. In helloEvents(), I needed to add logic similar to the following:

    if (eventAndOffset.first() instanceof HelloEvent.GreetingMessageChanged) {
        HelloEvent.GreetingMessageChanged messageChanged = (HelloEvent.GreetingMessageChanged) eventAndOffset.first();
        eventToPublish = new GreetingMessageChanged(messageChanged.getName(), messageChanged.getMessage());
    }
    

    In addition, in the aggregate's eventHandler(), I needed to add logic similar to the following:

    builder.forAnyState()
        .onEvent(GreetingMessageChanged.class, (state, evt) ->
            // We simply update the current state to use the greeting message from
            // the event.
            state.withMessage(evt.message));
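
    To illustrate why each persisted event type needs a matching event handler, here is a minimal plain-Java sketch of class-based event dispatch. It does not use Akka: Handlers, State, and EntityRenamed are hypothetical stand-ins, and Akka's own EventHandlerBuilder may report a missing handler differently (the question shows a NullPointerException, not the exception thrown here).

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class EventHandlerSketch {

    /** Marker interface for events (stand-in for the question's Event type). */
    interface Event {}

    /** Mirrors the EntityCreated event persisted in onCreate(). */
    static final class EntityCreated implements Event {
        final String name;
        final Instant createdAt;

        EntityCreated(String name, Instant createdAt) {
            this.name = name;
            this.createdAt = createdAt;
        }
    }

    /** Hypothetical event type that deliberately has no handler registered. */
    static final class EntityRenamed implements Event {
        final String newName;

        EntityRenamed(String newName) {
            this.newName = newName;
        }
    }

    /** Immutable state (simplified stand-in for StateClass). */
    static final class State {
        static final State EMPTY = new State(null);
        final String name;

        State(String name) {
            this.name = name;
        }

        State withName(String newName) {
            return new State(newName);
        }
    }

    /** Simplified dispatcher keyed by the exact event class. Not Akka's API. */
    static final class Handlers {
        private final Map<Class<?>, BiFunction<State, Event, State>> byType = new HashMap<>();

        <E extends Event> Handlers onEvent(Class<E> type, BiFunction<State, E, State> handler) {
            // Wrap the typed handler so it can be stored under the common Event type.
            byType.put(type, (state, event) -> handler.apply(state, type.cast(event)));
            return this;
        }

        State apply(State state, Event event) {
            BiFunction<State, Event, State> handler = byType.get(event.getClass());
            if (handler == null) {
                // Without a handler the new state cannot be computed, so fail loudly.
                throw new IllegalStateException(
                    "No event handler registered for " + event.getClass().getSimpleName());
            }
            return handler.apply(state, event);
        }
    }

    public static void main(String[] args) {
        Handlers handlers = new Handlers()
            .onEvent(EntityCreated.class, (state, evt) -> state.withName(evt.name));

        // Handled event: the state is updated from the event.
        State created = handlers.apply(State.EMPTY, new EntityCreated("ID1", Instant.now()));
        System.out.println("State after EntityCreated: " + created.name);

        // Unhandled event: applying it fails because no handler was registered.
        try {
            handlers.apply(created, new EntityRenamed("ID2"));
        } catch (IllegalStateException e) {
            System.out.println("Unhandled event: " + e.getMessage());
        }
    }
}
```

    In the real aggregate, the equivalent step would be registering an onEvent(...) entry for EntityCreated in eventHandler(), in the same way as the GreetingMessageChanged example above.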