I am trying to write a simple Java program that generates some data (just a POJO) and publishes it to a Kafka topic. A subscriber fetches the data from this topic and should write it into a Cassandra DB.
Producing and fetching work fine, but when it comes to writing the data into the Cassandra DB, something puzzles me.
When I try to write the data, I always have to open a new connection to the DB. That looks very unpleasant.
@Override
public void run() {
    setRunning(true);
    try {
        konsument.subscribe(Collections.singletonList(ServerKonfiguration.TOPIC));
        while (running) {
            ConsumerRecords<Long, SensorDaten> sensorDaten = konsument.poll(Long.MAX_VALUE);
            sensorDaten.forEach(
                datum -> {
                    CassandraConnector cassandraConnector = new CassandraConnector();
                    cassandraConnector.schreibeSensorDaten(datum.key(), datum.value());
                    System.out.printf(
                        "Consumer Record:(%d, %s, %d, %d)\n",
                        datum.key(), datum.value(), datum.partition(), datum.offset());
                });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        konsument.close();
    }
}
The code snippet above works, but as I mentioned, I have to create a new connection for every write.
When I initialize the cassandraConnector outside the loop, one write succeeds, and then I get "No hosts available" exceptions.
The CassandraConnector class:
public class CassandraConnector {
    private final String KEYSPACE = "ba2";
    private final String SERVER_IP = "127.0.0.1";
    private Cluster cluster;
    private Session session;

    public CassandraConnector() {
        cluster = Cluster.builder().addContactPoint(SERVER_IP).build();
        session = cluster.connect(KEYSPACE);
    }

    public void schreibeSensorDaten(Long key, SensorDaten datum) {
        try {
            session.execute(
                "INSERT INTO.....
No, you need to reuse the cluster/session instances: they are quite expensive to initialize, so create them once and share them across all writes.
It's also better to use prepared statements for data insertion. After you create a session, do something like:
PreparedStatement pStmt = session.prepare("INSERT INTO ... VALUES (?, ?)");
and then, in the loop:
session.execute(pStmt.bind(datum.key(), datum.value()));
Regarding the "No hosts available" error, please check the logs on the Cassandra side.
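To make the reuse advice concrete, here is a minimal sketch of the lifecycle: open the connector once before the loop, write through it for every record, and close it once afterwards. It deliberately does not use the Cassandra driver so that it runs on its own. `ConnectorReuseSketch`, `Connector`, `write` and `writeAll` are hypothetical stand-ins, and the comments only indicate where the driver calls already shown above (`Cluster.builder()`, `cluster.connect(...)`, `session.prepare(...)`, `session.execute(pStmt.bind(...))`) would presumably go.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConnectorReuseSketch {

    /** Hypothetical stand-in for CassandraConnector (not driver API). */
    static class Connector implements AutoCloseable {
        // Counts how often the expensive setup ran.
        static int connectionsOpened = 0;

        private final List<String> rows = new ArrayList<>();
        private boolean closed = false;

        Connector() {
            // Real code would build the Cluster, connect the Session
            // and prepare the INSERT statement here, exactly once.
            connectionsOpened++;
        }

        void write(long key, String value) {
            if (closed) {
                throw new IllegalStateException("connector already closed");
            }
            // Real code: session.execute(pStmt.bind(key, value));
            rows.add(key + "=" + value);
        }

        int rowCount() {
            return rows.size();
        }

        @Override
        public void close() {
            // Real code would close the session and the cluster here.
            closed = true;
        }
    }

    /** Writes all records through one shared connector; returns the rows written. */
    static int writeAll(List<String> records) {
        try (Connector connector = new Connector()) { // opened once, before the loop
            long key = 0;
            for (String record : records) {
                connector.write(key++, record);
            }
            return connector.rowCount();
        } // closed once, after the loop
    }

    public static void main(String[] args) {
        int written = writeAll(Arrays.asList("a", "b", "c"));
        System.out.println(written + " rows written, "
                + Connector.connectionsOpened + " connection(s) opened");
    }
}
```

In the consumer from the question, the equivalent would be to create the CassandraConnector before the while loop (or as a field) and close it in the same finally block that closes konsument.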