Tags: java, lagom, akka-persistence

How to fetch all persisted entities


I am completely new to the Lagom framework. I was reading the documentation today and started modifying its hello world example.

However, I am unable to find a way to fetch all persisted entities (i.e. all persisted greetings in this example).

This is how the default example fetches a person's greeting:

@Override
public ServiceCall<GreetingMessage, Done> useGreeting(String id) {
  return request -> {
    // Look up the hello world entity for the given ID.
    PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloWorld.class, id);
    // Tell the entity to use the greeting message specified.
    return ref.ask(new UseGreetingMessage(request.message));
  };
}

Now, instead of looking up an entity by a given ID, I want to fetch all entities, e.g. with something like persistentEntityRegistry.getIds(), so that I could then fetch them one by one by ID. However, no such method seems to exist on the entity registry. Is there another way to do this?


Solution

  • It is possible to get all of the entity IDs by using the underlying Akka Persistence framework directly to run an allPersistenceIds or currentPersistenceIds query.

    You can see an example of this in use in the Lagom Online Auction example application, in UserServiceImpl.java:

    public class UserServiceImpl implements UserService {
        //...
        private final CurrentPersistenceIdsQuery currentIdsQuery;
        private final Materializer mat;
    
        @Inject
        public UserServiceImpl(PersistentEntityRegistry registry, ActorSystem system, Materializer mat) {
            //...
            this.mat = mat;
            this.currentIdsQuery =
                    PersistenceQuery.get(system)
                        .getReadJournalFor(
                            CassandraReadJournal.class,
                            CassandraReadJournal.Identifier()
                        );
            //...
        }
    
        //...
        @Override
        public ServiceCall<NotUsed, PSequence<User>> getUsers() {
            // Note this should never make production....
            return req -> currentIdsQuery.currentPersistenceIds()
                    .filter(id -> id.startsWith("UserEntity"))
                    .mapAsync(4, id ->
                        // Strip the 10-character "UserEntity" prefix to recover the entity ID.
                        entityRef(id.substring(10))
                            .ask(UserCommand.GetUser.INSTANCE))
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .runWith(Sink.seq(), mat)
                    .thenApply(TreePVector::from);
        }
        //...
    }
    

    This approach, while possible, is rarely a good idea. You may have noticed the comment in the example code: "this should never make production". There is no way to perform aggregate commands using this approach: you are limited to sending commands to each entity one by one. This can cause spikes in memory consumption and traffic between nodes in your service cluster. It also isn't possible to filter this list of IDs by any criteria of the entity state, as you might be used to from row-oriented SQL data models.

    It is almost always more appropriate to define a read-side model for your data. This takes the form of a separate "read-side" data store that is purpose-built for the types of queries your application needs, and an event processor that is automatically invoked as your entities emit events and that updates the read-side data store to reflect those changes.
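
    To make the read-side idea concrete, here is a minimal conceptual sketch in plain Java. It does not use any Lagom or Akka API: the GreetingChanged event, the GreetingReadSide store and all method names are hypothetical and only illustrate how an event processor can maintain a view that answers "fetch all" and state-based queries without contacting any entity.

    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Conceptual sketch only: plain Java, no Lagom or Akka APIs.
    final class GreetingReadSideSketch {

        // Hypothetical event emitted by an entity when its greeting changes.
        static final class GreetingChanged {
            final String entityId;
            final String message;

            GreetingChanged(String entityId, String message) {
                this.entityId = entityId;
                this.message = message;
            }
        }

        // Hypothetical read-side store: one entry per entity, keyed by entity ID.
        static final class GreetingReadSide {
            private final Map<String, String> greetingsById = new LinkedHashMap<>();

            // Event processor: called for each emitted event to keep the view current.
            void handle(GreetingChanged event) {
                greetingsById.put(event.entityId, event.message);
            }

            // "Fetch all" is a single query against the view.
            Map<String, String> allGreetings() {
                return new LinkedHashMap<>(greetingsById);
            }

            // Filtering by entity state, which a plain list of IDs cannot support.
            List<String> idsWithGreetingStartingWith(String prefix) {
                List<String> ids = new ArrayList<>();
                for (Map.Entry<String, String> entry : greetingsById.entrySet()) {
                    if (entry.getValue().startsWith(prefix)) {
                        ids.add(entry.getKey());
                    }
                }
                return ids;
            }
        }

        public static void main(String[] args) {
            GreetingReadSide view = new GreetingReadSide();
            view.handle(new GreetingChanged("alice", "Hello"));
            view.handle(new GreetingChanged("bob", "Hi"));
            view.handle(new GreetingChanged("alice", "Howdy"));

            System.out.println(view.allGreetings());                     // {alice=Howdy, bob=Hi}
            System.out.println(view.idsWithGreetingStartingWith("How")); // [alice]
        }
    }
    ```

    In a real service the view would live in a database table rather than an in-memory map, and the framework, not your own code, would feed events to the processor.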

    The Lagom framework helps to ensure eventual consistency in your application by managing your read-side event processors, tracking their position in the event log, and automatically resuming them after a restart or failure. This type of resilience is otherwise tricky to implement for aggregate operations.
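
    The offset tracking mentioned above can be illustrated with another plain-Java sketch. Again, nothing here is Lagom's actual implementation: OffsetStore, Processor and run are hypothetical names, and the in-memory offset store stands in for something that would be durable in practice. The point is only that a processor which records its position can be replaced after a failure and continue where the previous one stopped.

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Conceptual sketch only: plain Java, no Lagom or Akka APIs.
    final class OffsetTrackingSketch {

        // Hypothetical offset store. A real one would be durable and survive restarts.
        static final class OffsetStore {
            private int nextOffset = 0;

            int load() {
                return nextOffset;
            }

            void save(int offset) {
                nextOffset = offset;
            }
        }

        // Hypothetical event processor that appends each event to a read-side view.
        static final class Processor {
            private final List<String> eventLog;
            private final OffsetStore offsets;
            private final List<String> view;

            Processor(List<String> eventLog, OffsetStore offsets, List<String> view) {
                this.eventLog = eventLog;
                this.offsets = offsets;
                this.view = view;
            }

            // Handles at most maxEvents events, starting from the stored offset,
            // and returns how many events were handled.
            int run(int maxEvents) {
                int handled = 0;
                while (handled < maxEvents && offsets.load() < eventLog.size()) {
                    int offset = offsets.load();
                    view.add(eventLog.get(offset)); // update the read side
                    offsets.save(offset + 1);       // record progress
                    handled++;
                }
                return handled;
            }
        }

        public static void main(String[] args) {
            List<String> log = Arrays.asList("e0", "e1", "e2", "e3", "e4");
            OffsetStore offsets = new OffsetStore();
            List<String> view = new ArrayList<>();

            // The first processor handles two events and is then assumed to fail.
            new Processor(log, offsets, view).run(2);
            // A replacement processor resumes from the stored offset.
            new Processor(log, offsets, view).run(Integer.MAX_VALUE);

            System.out.println(view);           // [e0, e1, e2, e3, e4]
            System.out.println(offsets.load()); // 5
        }
    }
    ```

    The sketch updates the view and the offset in two separate steps. A real processor would need to make those two writes atomic, or make its event handling idempotent, so that a crash between them neither loses nor duplicates an update.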

    (This answer is adapted from a related discussion in the Lagom Framework Google Group.)