java, database, cassandra, connection

Do I have to open a new DB connection for every write in Cassandra?


I am trying to write a simple Java program that generates some data (just a POJO) and publishes it to a Kafka topic. A subscriber fetches the data from this topic and should write it into a Cassandra DB.

Producing and fetching work fine, but when it comes to writing the data into the Cassandra DB, there is something that puzzles me.

Every time I write the data, I have to open a new connection to the DB. That looks very unpleasant.

 @Override
  public void run() {

    setRunning(true);


    try {
      konsument.subscribe(Collections.singletonList(ServerKonfiguration.TOPIC));

      while (running) {

        ConsumerRecords<Long, SensorDaten> sensorDaten = konsument.poll(Long.MAX_VALUE);
        sensorDaten.forEach(
                datum -> {
                  CassandraConnector cassandraConnector = new CassandraConnector();

                  cassandraConnector.schreibeSensorDaten(datum.key(), datum.value());
                  System.out.printf(
                          "Consumer Record:(%d, %s, %d, %d)\n",
                          datum.key(), datum.value(), datum.partition(), datum.offset());
                });
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      konsument.close();
    }
  }

The code snippet above works, but as I mentioned, I have to create a new connection for every write.

When I initialize the cassandraConnector outside the loop, I get one successful write and then "No hosts available" exceptions.

The CassandraConnector class:

 public class CassandraConnector {

  private final String KEYSPACE = "ba2";
  private final String SERVER_IP = "127.0.0.1";
  private Cluster cluster;
  private Session session;

  public CassandraConnector() {
    cluster = Cluster.builder().addContactPoint(SERVER_IP).build();
    session = cluster.connect(KEYSPACE);
  }

  public void schreibeSensorDaten(Long key, SensorDaten datum) {

    try {

      session.execute(
          "INSERT INTO.....

Solution

  • No. You should reuse the Cluster and Session instances: they are expensive to initialize, so create them once and share them across all writes.

    It's also better to use prepared statements for data insertion. After you create a session, do something like:

    PreparedStatement pStmt = session.prepare("INSERT INTO ... VALUES (?, ?)");
    

    and then, inside the loop:

    session.execute(pStmt.bind(datum.key(), datum.value()));
    

    Regarding the error, please check the logs on the Cassandra side.
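
The lifecycle recommended above (open once, write many times, close once) can be sketched as follows. To keep the example runnable without a Cassandra node or the driver on the classpath, `SensorWriter`, `CountingWriter` and `ReuseConnectionSketch` are hypothetical stand-ins, not driver classes. The comments mark where the real `Cluster`, `Session` and `PreparedStatement` calls from the answer would presumably go.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReuseConnectionSketch {
    // Writes every value through the same writer; the caller owns its lifecycle.
    static void writeAll(SensorWriter writer, List<String> values) {
        long key = 0;
        for (String value : values) {
            writer.write(key++, value);
        }
    }

    public static void main(String[] args) {
        // One writer for the whole run, closed once by try-with-resources.
        try (CountingWriter writer = new CountingWriter()) {
            writeAll(writer, Arrays.asList("a", "b", "c"));
            System.out.println(writer.rows.size() + " rows, "
                    + CountingWriter.connectionsOpened + " connection(s)");
        }
    }
}

// Hypothetical stand-in for a connector that owns one Cluster/Session pair.
interface SensorWriter extends AutoCloseable {
    void write(long key, String value);

    @Override
    void close(); // narrowed so callers need not handle a checked exception
}

// In-memory implementation (assumption, for illustration only): it records
// rows and counts how often a "connection" is opened.
final class CountingWriter implements SensorWriter {
    static int connectionsOpened = 0;
    final List<String> rows = new ArrayList<>();
    private boolean closed = false;

    CountingWriter() {
        // Real code: build the Cluster, connect the Session, prepare the INSERT once.
        connectionsOpened++;
    }

    @Override
    public void write(long key, String value) {
        if (closed) {
            throw new IllegalStateException("writer is closed");
        }
        // Real code: session.execute(pStmt.bind(key, value));
        rows.add(key + ":" + value);
    }

    @Override
    public void close() {
        // Real code: session.close(); cluster.close();
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

In the consumer from the question, this would correspond to creating the connector once before the `while (running)` loop and closing it in the `finally` block next to `konsument.close()`.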