I’m using the DataStax driver to use Cassandra as a sink for some data streams with Apache Flink. After a few seconds of running, my application fails at runtime with an error saying that the queue has become full. I found that the default value is 256, which is probably too low for my load, so I raised the limit by setting maxRequestsPerConnection through PoolingOptions, as suggested here: http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/.
Unfortunately, when I launch the job with the following code, I get this error:
The implementation of the ClusterBuilder is not serializable.
The object probably contains or references non serializable fields.
My code:
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
    .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
    .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);

ClusterBuilder cassandraBuilder = new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint(CASSANDRA_ADDRESS)
            .withPort(CASSANDRA_PORT)
            .withPoolingOptions(poolingOptions)
            .build();
    }
};

sinkBuilderNormalStream
    .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
        + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
        + " VALUES (?, ?, ?, ?, ?, ?);")
    .setClusterBuilder(cassandraBuilder)
    .build();
How can I deal with it?
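You have to create the PoolingOptions inside ClusterBuilder#buildCluster. Flink has to serialize the ClusterBuilder in order to ship it to the task managers, and PoolingOptions does not implement Serializable. Because your anonymous ClusterBuilder references the poolingOptions variable from the enclosing scope, that object is captured as a field of the builder, so the serializability check fails. If you build the options inside buildCluster, they are only created where the Cluster is actually constructed and are never part of the serialized builder. Keep in mind that an anonymous class declared inside an instance (non-static) method also holds a reference to the enclosing object, which would then have to be serializable as well.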
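The effect of capturing a non-serializable object can be reproduced with plain Java serialization, without Flink or the DataStax driver. The sketch below assumes that Flink's check behaves like an ordinary ObjectOutputStream.writeObject call on the builder; the classes Options, Builder and CapturedFieldDemo are hypothetical stand-ins for PoolingOptions, ClusterBuilder and your job class, not real Flink or driver types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CapturedFieldDemo {

    // Hypothetical stand-in for PoolingOptions: it does NOT implement Serializable.
    static class Options {
        int maxRequests;

        Options setMaxRequests(int n) {
            this.maxRequests = n;
            return this;
        }
    }

    // Hypothetical stand-in for Flink's ClusterBuilder, which has to be Serializable.
    interface Builder extends Serializable {
        Options build();
    }

    // Mirrors the question: the options are created outside and captured by the anonymous class.
    static Builder capturing() {
        Options options = new Options().setMaxRequests(32768);
        return new Builder() {
            @Override
            public Options build() {
                // The captured local becomes a synthetic field of this anonymous class.
                return options;
            }
        };
    }

    // Mirrors the fix: the options are created inside build(), so nothing is captured.
    static Builder local() {
        return new Builder() {
            @Override
            public Options build() {
                return new Options().setMaxRequests(32768);
            }
        };
    }

    // Returns true if Java serialization succeeds, false on NotSerializableException.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("capturing: " + isSerializable(capturing())); // prints "capturing: false"
        System.out.println("local: " + isSerializable(local()));         // prints "local: true"
    }
}
```

Both anonymous classes are created in static methods so that neither captures an enclosing instance; only the captured Options field makes the difference. In the actual job, the equivalent change is to move new PoolingOptions() and both setMaxRequestsPerConnection calls into the body of buildCluster and pass that local object to withPoolingOptions.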