Search code examples
javamicroserviceslagomdgraph

How can we use lagom's Read-side processor with Dgraph?


I am a newbie to lagom and dgraph. And I got stuck to how to use lagom's read-side processor with Dgraph. Just to give you an idea following is the code which uses Cassandra with lagom.

import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;
public class FriendServiceImpl implements FriendService {

private final CassandraSession cassandraSession;

@Inject
public FriendServiceImpl(CassandraSession cassandraSession) {
    this.cassandraSession = cassandraSession;
}

//Implement your service method here

}

Solution

  • Lagom does not provide out-of-the-box support for Dgraph. If you have to use Lagom's Read-Side processor with Dgraph, then you have to use Lagom's Generic Read Side support. Like this:

    /**
     * Read side processor for Dgraph.
     */
    public class FriendEventProcessor extends ReadSideProcessor<FriendEvent> {
        private static void createModel() {
            //TODO: Initialize schema in Dgraph
        }
    
        @Override
        public ReadSideProcessor.ReadSideHandler<FriendEvent> buildHandler() {
            return new ReadSideHandler<FriendEvent>() {
                private final Done doneInstance = Done.getInstance();
    
                @Override
                public CompletionStage<Done> globalPrepare() {
                    createModel();
                    return CompletableFuture.completedFuture(doneInstance);
                }
    
                @Override
                public CompletionStage<Offset> prepare(final AggregateEventTag<FriendEvent> tag) {
                    return CompletableFuture.completedFuture(Offset.NONE);
                }
    
                @Override
                public Flow<Pair<FriendEvent, Offset>, Done, ?> handle() {
                    return Flow.<Pair<FriendEvent, Offset>>create()
                            .mapAsync(1, eventAndOffset -> {
                                        if (eventAndOffset.first() instanceof FriendCreated) {
                                            //TODO: Add Friend in Dgraph;
                                        }
    
                                        return CompletableFuture.completedFuture(doneInstance);
                                    }
                            );
                }
            };
        }
    
        @Override
        public PSequence<AggregateEventTag<FriendEvent>> aggregateTags() {
            return FriendEvent.TAG.allTags();
        }
    }
    

    For FriendEvent.TAG.allTags(), you have to add following code in FriendEvent interface:

    int NUM_SHARDS = 20;
    
      AggregateEventShards<FriendEvent> TAG =
              AggregateEventTag.sharded(FriendEvent.class, NUM_SHARDS);
    
      @Override
      default AggregateEventShards<FriendEvent> aggregateTag() {
        return TAG;
      }
    

    I hope this helps!