Search code examples
javacassandraakkaakka-persistence

How to reuse CqlSession of Akka persistence plugin in Custom serializer


In Akka i'm creating a custom serializer for my app that sometimes go to cassandra database to fetch something.

To do this a repository is created and then is registered via Akka Extension to actor system. Then in custom Serializer constructor this repository is fetched from actor system.

The thing is this repository needs to connect to cassandra, which requires a CqlSession. for now I created a new CqlSession with alpakka cassandra to do the job but it is said that for each application Only one CqlSession should be used. Now i have 2 CqlSession. One used by akka persistence cassandra and one by this repository used in custom serializer. How can I reuse Akka persistence cassandra CqlSession in this repo so only 1 CqlSession be active in whole application.

Custom Serializer:

public class CustomSerializer extends AsyncSerializerWithStringManifestCS {

   private final CustomSerializerRepo customSerializerRepo;
   public CustomSerializer(ExtendedActorSystem system) {
        super(system);
        this.system = system;
        customSerializerRepo= customSerializerRepoExtension.get(Adapter.toTyped(system)).customSerializerRepo();       
    }

   @Override
    public CompletionStage<byte[]> toBinaryAsyncCS(Object unencryptedPayload) {
        // Uses this repo
    }

    @Override
    public CompletionStage<Object> fromBinaryAsyncCS(byte[] bytes, String manifest) {
       // ........
    }
}

Repository:

public class CustomSerializerRepo {
   
   private final CassandraSession cassandraSession;
   
   public CustomSerializerRepo (ActorSystem<?> actorSystem) {
       var cassandraSession = CassandraSessionRegistry.get(actorSystem).sessionFor(CassandraSessionSettings.create())
   }

   // go to cassandra and perform operations.
}

Solution

  • In custom serializer we have ActorSystem. Akka cassandra persistence plugin under the hood is using alpakka cassandra. We can get CqlSession created by Alpakka Cassandra via CassandraSessionRegistry extension.

    var cqlSession = CassandraSessionRegistry
                            .get(system)
                            .sessionFor(CassandraSessionSettings.create())
                            .underlying()
                            .toCompletableFuture()
                            .get()
    

    This is the same CqlSession used by akka persistence, hence reusing same session for whole application.