
Cassandra Java API, Datastax


Is anyone using AsyncCassandraTemplate for batch operations on a list of custom objects?

I need to do this, but it seems that passing an Iterable is no longer supported.


Solution

  • You can do this with the DataStax Java driver directly. If you use Maven, add these dependencies to your pom.xml:

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.3.2</version>
    </dependency>
    
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-mapping</artifactId>
        <version>3.3.2</version>
    </dependency>
    
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-extras</artifactId>
        <version>3.3.2</version>
    </dependency>
    

    Then create an entity class:

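    import com.datastax.driver.mapping.annotations.ClusteringColumn;
    import com.datastax.driver.mapping.annotations.Column;
    import com.datastax.driver.mapping.annotations.PartitionKey;
    import com.datastax.driver.mapping.annotations.Table;
    
    // Maps this class to the table test.message
    @Table(name = "message", keyspace = "test")
    public class Message {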
        @PartitionKey
        @Column(name = "message_id")
        private String messageId;
    
        @ClusteringColumn
        private String date;
    
        private String title;
    
        public String getMessageId() {
            return messageId;
        }
    
        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }
    
        public String getDate() {
            return date;
        }
    
        public void setDate(String date) {
            this.date = date;
        }
    
        public String getTitle() {
            return title;
        }
    
        public void setTitle(String title) {
            this.title = title;
        }
    }
    

    Then build a cluster, create some objects, generate a save statement for each one, and add the statements to a batch statement that is executed asynchronously:

    public void executeBatchStatement() {
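        // Cluster and Session are expensive to create; in a real application build them once and reuse them
        Cluster cluster = makeCluster();
        Session session = cluster.connect();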
    
        MappingManager mappingManager = new MappingManager(session);
        Mapper<Message> messageMapper = mappingManager.mapper(Message.class);
    
        Message messageObj1 = new Message();
        Message messageObj2 = new Message();
        Message messageObj3 = new Message();
        // populate these objects
    
        // Each Statement represents the query that saves one object;
        // saveNullFields(false) leaves null fields out of the insert
        Statement messageStatement1 = messageMapper.saveQuery(messageObj1, Mapper.Option.saveNullFields(false));
        Statement messageStatement2 = messageMapper.saveQuery(messageObj2, Mapper.Option.saveNullFields(false));
        Statement messageStatement3 = messageMapper.saveQuery(messageObj3, Mapper.Option.saveNullFields(false));
    
        BatchStatement messageBatchStatement = new BatchStatement();
        messageBatchStatement.add(messageStatement1);
        messageBatchStatement.add(messageStatement2);
        messageBatchStatement.add(messageStatement3);
    
        session.executeAsync(messageBatchStatement); // execute asynchronously
    }
    
    private Cluster makeCluster() {
        return Cluster.builder()
                .addContactPoint("localhost")
                .withPort(9042)
                .build();
    } 
    

    If you want to handle the result of the execution, or react to success or failure, attach a callback to the returned future instead:

    ResultSetFuture future = session.executeAsync(messageBatchStatement);
    Futures.addCallback(future,
        new FutureCallback<ResultSet>() {
            @Override public void onSuccess(ResultSet result) {
                // handle success
            }
    
            @Override public void onFailure(Throwable t) {
                // handle error
            }
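        },
        // Explicit executor: the callback runs on the thread that completes the future, so keep it non-blocking
        MoreExecutors.directExecutor()
    );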
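
The batch example above adds three hand-built objects, while the question asks about a whole list. One possible approach, sketched here under assumptions, is to split the list into fixed-size chunks and build one batch per chunk so that no single batch grows too large. The class name `BatchChunks`, the method `partition`, and the chunk size are hypothetical and not part of the driver; the helper uses only the JDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the DataStax driver.
final class BatchChunks {
    private BatchChunks() {}

    /** Splits items into consecutive sublists of at most maxSize elements. */
    static <T> List<List<T>> partition(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxSize) {
            int end = Math.min(start + maxSize, items.size());
            // Copy the view so each chunk is independent of the source list
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

For each chunk you would then call `messageMapper.saveQuery(...)` on every element, add the resulting statements to a new `BatchStatement`, and pass it to `session.executeAsync(...)` as in the code above. A suitable chunk size depends on your row size and cluster settings, so treat any value as something to tune.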