I am a newbie to lagom and dgraph. And I got stuck to how to use lagom's read-side processor with Dgraph. Just to give you an idea following is the code which uses Cassandra with lagom.
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;
public class FriendServiceImpl implements FriendService {
private final CassandraSession cassandraSession;
@Inject
public FriendServiceImpl(CassandraSession cassandraSession) {
this.cassandraSession = cassandraSession;
}
//Implement your service method here
}
Lagom does not provide out-of-the-box support for Dgraph. If you have to use Lagom's Read-Side processor with Dgraph, then you have to use Lagom's Generic Read Side support. Like this:
/**
* Read side processor for Dgraph.
*/
public class FriendEventProcessor extends ReadSideProcessor<FriendEvent> {
private static void createModel() {
//TODO: Initialize schema in Dgraph
}
@Override
public ReadSideProcessor.ReadSideHandler<FriendEvent> buildHandler() {
return new ReadSideHandler<FriendEvent>() {
private final Done doneInstance = Done.getInstance();
@Override
public CompletionStage<Done> globalPrepare() {
createModel();
return CompletableFuture.completedFuture(doneInstance);
}
@Override
public CompletionStage<Offset> prepare(final AggregateEventTag<FriendEvent> tag) {
return CompletableFuture.completedFuture(Offset.NONE);
}
@Override
public Flow<Pair<FriendEvent, Offset>, Done, ?> handle() {
return Flow.<Pair<FriendEvent, Offset>>create()
.mapAsync(1, eventAndOffset -> {
if (eventAndOffset.first() instanceof FriendCreated) {
//TODO: Add Friend in Dgraph;
}
return CompletableFuture.completedFuture(doneInstance);
}
);
}
};
}
@Override
public PSequence<AggregateEventTag<FriendEvent>> aggregateTags() {
return FriendEvent.TAG.allTags();
}
}
For FriendEvent.TAG.allTags()
, you have to add following code in FriendEvent
interface:
int NUM_SHARDS = 20;
AggregateEventShards<FriendEvent> TAG =
AggregateEventTag.sharded(FriendEvent.class, NUM_SHARDS);
@Override
default AggregateEventShards<FriendEvent> aggregateTag() {
return TAG;
}
I hope this helps!