java, cassandra, datastax, apache-flink, datastax-java-driver

PoolingOptions and Cassandra - Java


I'm using the DataStax Java driver to write some Apache Flink data streams to Cassandra, which acts as a sink. When I run my application, it fails at runtime after a few seconds with an error saying that the queue is full. I found that the default limit is 256, which is probably too low for my load, so I raised it through PoolingOptions by setting maxRequestsPerConnection, as suggested here: http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/.

Unfortunately, with the following code I get this error when I launch it:

The implementation of the ClusterBuilder is not serializable. 
The object probably contains or references non serializable fields.

My code:

PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
    .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
    .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);


ClusterBuilder cassandraBuilder = new ClusterBuilder() {

    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint(CASSANDRA_ADDRESS)
                      .withPort(CASSANDRA_PORT)
                      .withPoolingOptions(poolingOptions)
                      .build();
    }
};

sinkBuilderNormalStream
    .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
        + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
        + " VALUES (?, ?, ?, ?, ?, ?);")
    .setClusterBuilder(cassandraBuilder)
    .build();
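To see why this builder is rejected, here is a stdlib-only sketch that reproduces the same kind of failure with plain Java serialization (Flink's own check may differ in detail). `FakePoolingOptions` and `SerializableBuilder` are hypothetical stand-ins for the driver's `PoolingOptions` (assumed not to implement `Serializable`) and Flink's `ClusterBuilder` (which does); they are not the real classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializabilityCheck {

    // Hypothetical stand-in for the driver's PoolingOptions: deliberately NOT Serializable.
    static class FakePoolingOptions {
        int maxRequestsPerConnection = 32768;
    }

    // Hypothetical stand-in for Flink's ClusterBuilder, which implements Serializable.
    static abstract class SerializableBuilder implements Serializable {
        abstract String build();
    }

    // Returns true if obj can be Java-serialized, i.e. the kind of check a
    // framework must pass before shipping the object to other JVMs.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            System.out.println("Not serializable: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors the question's structure: the options object is created outside
    // the anonymous builder and captured by it as a hidden field.
    static SerializableBuilder capturingBuilder() {
        FakePoolingOptions options = new FakePoolingOptions();
        return new SerializableBuilder() {
            @Override
            String build() {
                return "maxRequests=" + options.maxRequestsPerConnection;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturingBuilder()));
    }
}
```

Running `main` should report that `FakePoolingOptions` is not serializable and then print `false`: the captured local variable becomes a synthetic field of the anonymous class, and that field is what breaks serialization.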

How can I deal with it?


Solution

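  • Create the PoolingOptions inside ClusterBuilder#buildCluster instead of outside it. Flink's ClusterBuilder is Serializable because it is shipped to the parallel sink instances, but the driver's PoolingOptions is not, so an anonymous ClusterBuilder that references a PoolingOptions object from the enclosing scope cannot be serialized. If you build the options inside buildCluster and pass them to builder.withPoolingOptions(...), they are only created when the cluster itself is built, after the builder has been deserialized.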
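In the real job, this amounts to moving `new PoolingOptions()` and the `setMaxRequestsPerConnection` calls into `buildCluster`. The sketch below shows the same pattern with hypothetical stand-ins (`FakePoolingOptions` for the driver's `PoolingOptions`, `SerializableBuilder` for Flink's `ClusterBuilder`; not the real classes): the builder captures only a plain `int`, survives a serialization round trip, and creates the non-serializable options object afterwards.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class BuildInsideFix {

    // Hypothetical stand-in for the driver's PoolingOptions: deliberately NOT Serializable.
    static class FakePoolingOptions {
        private int maxRequestsPerConnection = 256;

        FakePoolingOptions setMaxRequestsPerConnection(int value) {
            this.maxRequestsPerConnection = value;
            return this;
        }

        int getMaxRequestsPerConnection() {
            return maxRequestsPerConnection;
        }
    }

    // Hypothetical stand-in for Flink's ClusterBuilder, which implements Serializable.
    static abstract class SerializableBuilder implements Serializable {
        abstract FakePoolingOptions buildOptions();
    }

    // The anonymous builder captures only an int; the non-serializable options
    // object is created inside buildOptions(), i.e. after deserialization.
    static SerializableBuilder fixedBuilder(int maxRequestsPerConnection) {
        return new SerializableBuilder() {
            @Override
            FakePoolingOptions buildOptions() {
                return new FakePoolingOptions().setMaxRequestsPerConnection(maxRequestsPerConnection);
            }
        };
    }

    // Serializes and deserializes obj, roughly what happens when a builder is
    // shipped to another JVM.
    static Object roundTrip(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SerializableBuilder copy = (SerializableBuilder) roundTrip(fixedBuilder(32768));
        // The options are built on the deserialized copy, so this prints 32768.
        System.out.println(copy.buildOptions().getMaxRequestsPerConnection());
    }
}
```

The same rule applies to anything else the builder references, such as the contact point and port: keep them as constants or plain serializable values, and create driver objects only inside the build method.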