spring-boot cassandra spring-batch spring-kafka batch-processing

Save data to Cassandra from Kafka using Spring Batch with high throughput


I need to consume data from Kafka and store it in Cassandra DB using Spring Batch. Let's say that the data is an object with a String id Primary key and int status code. The volume of messages is in millions of messages per minute. How do I efficiently and rapidly store the data into Cassandra DB?

Currently I am using a Kafka listener to consume the data, place it in a queue, and poll it from an ItemReader.

ConcurrentLinkedQueue<Message> dataFromKafka = new ConcurrentLinkedQueue<>();

Kafka Deserializer

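import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.stereotype.Component;

@Component
public class MessageDeserializer implements Deserializer<Message> {

    ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();

    // Constructor name must match the class name
    public MessageDeserializer() {
    }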

    @Override
    public void configure(java.util.Map<java.lang.String,?> configs, boolean isKey) {
        // No additional configuration required
    }

    @Override
    public Message deserialize(java.lang.String topic, byte[] data) {
        try {
            if (data == null) {
                return null;
            } else {
                return objectMapper.readValue(data, Message.class);
            }
        } catch (Exception e) {
            throw new org.apache.kafka.common.errors.SerializationException("Error deserializing Message", e);
        }
    }
}

I have noticed that increasing the concurrency increases the time to complete the step. The topic has 5 partitions with 2 replicas.

@KafkaListener(
        id = "topic-kafka-listener",
        groupId = "topic-batch",
        containerFactory = "kafkaListenerContainerFactory",
        concurrency = "1",
        topics = "topic"
)
public void receive(@NotNull @Payload List<Message> messages) {
    dataFromKafka.addAll(messages);
}

@StepScope
public ItemReader<Message> itemReader() {
    return () -> dataFromKafka.poll();
}

@StepScope
public ItemWriter<Message> itemWriter() {
    return (chunk) -> messageRepository.saveAll(chunk.getItems());
}

public TaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(8);
    threadPoolTaskExecutor.setMaxPoolSize(16);
    threadPoolTaskExecutor.setQueueCapacity(100);
    threadPoolTaskExecutor.setPrestartAllCoreThreads(true);
    threadPoolTaskExecutor.initialize();
    return threadPoolTaskExecutor;
}

@Bean
public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("job", jobRepository)
         .start(step1(jobRepository, transactionManager))
         .build();
}


public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("step1", jobRepository)
            .<Message, Message>chunk(10000, transactionManager)
            .taskExecutor(threadPoolTaskExecutor())
            .reader(itemReader())
            .writer(itemWriter())
            .build();
}

I am using this to run the job periodically and to measure the resources used.

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Runnable runnable = () -> {
    try {
        JobParameters jobParameter = new JobParametersBuilder()
                    .addLocalTime("time", java.time.LocalTime.now())
                    .toJobParameters();

        jobLauncher.run(job, jobParameter);

        double cpuUsage = osBean.getProcessCpuLoad() * 100;
        double usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed() / 1048576.0;

        logger.info("CPU Usage: " + cpuUsage + " %");
        logger.info("Used Memory: " + usedMemory + " MB");
   } catch (Exception e) {
        e.printStackTrace();
   }
};

executorService.scheduleWithFixedDelay(runnable, 0, 5, TimeUnit.SECONDS);

I have also tried to pause the Kafka listener, split data into batches and place them into the queue and resume Kafka listener. The item reader then polls for a list of messages instead of polling one by one. But this resulted in dropped messages.

Hardware info:

  • Ryzen 7 4800 H Octa Core
  • 16GB RAM
  • SSD

Currently this code takes 2 minutes to store 1 million messages into Cassandra. I suspect the choke point is the queue. What is the best way to split the data and give it to the threads instead of polling one by one? What other methods should I try to make it faster?


Solution

  • "I have noticed that increasing the concurrency increases the time to complete the step"

    Ah yeah, your hardware probably isn't powerful enough to handle the setup you currently have...

    There are lots of moving parts here, so I will just throw a couple of thoughts at you to see if anything might help!


    Your chunk size is 10000. Have you experimented with higher or lower values to see what kind of effect this has?
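
Related to chunk size, and to the question's point about handing data over in batches instead of polling one item at a time: below is a minimal sketch, assuming the listener's queue is swapped for a `BlockingQueue` such as `LinkedBlockingQueue` (the question uses `ConcurrentLinkedQueue`, which has no bulk drain operation). `BatchDrainer` and `nextBatch` are hypothetical names for illustration and are not part of Spring Batch or Spring Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

final class BatchDrainer {

    private BatchDrainer() {
    }

    // Removes up to maxBatchSize elements from the queue in a single call.
    // Returns an empty list when nothing is queued (this method never blocks).
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatchSize);
        return batch;
    }
}
```

Each call hands the caller a whole batch, so the per-item `poll()` overhead goes away. How this would plug into an `ItemReader` (for example, a reader that emits lists) is left open here and would need testing against your setup.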


    You may be able to leverage CompletableFuture (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) for your item-writing code. Something like:

    public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("step1", jobRepository)
                .<Message, Message>chunk(10000, transactionManager)
                .taskExecutor(threadPoolTaskExecutor())
                .reader(itemReader())
                .writer((chunk) -> itemWriter(chunk))
                .build();
    }

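    // Takes the Spring Batch Chunk (not a List) so that getItems() is available.
    // Note: the caller above discards the returned future, so the step does not wait for the write to finish.
    public CompletableFuture<Void> itemWriter(Chunk<? extends Message> chunk) {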
        return CompletableFuture.runAsync(() -> messageRepository.saveAll(chunk.getItems()));
    }
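
If the write has to be finished before the step moves on, the futures need to be awaited. Here is a minimal JDK-only sketch of that idea: split the chunk into sub-batches, write them concurrently, then wait for all of them. `ParallelChunkWriter`, `partition`, `writeAll`, and the `saveBatch` callback are hypothetical names; `saveBatch` stands in for something like `messageRepository::saveAll`, and the batch size and pool are assumptions you would tune.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

final class ParallelChunkWriter {

    private ParallelChunkWriter() {
    }

    // Splits a list into consecutive sub-lists of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the slice so each task owns its own list
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    // Submits one task per sub-batch to the pool and blocks until all have completed.
    // If any task fails, join() throws a CompletionException wrapping the cause.
    static <T> void writeAll(List<T> chunk, int batchSize, ExecutorService pool,
                             Consumer<List<T>> saveBatch) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<T> batch : partition(chunk, batchSize)) {
            futures.add(CompletableFuture.runAsync(() -> saveBatch.accept(batch), pool));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
}
```

Passing a dedicated executor keeps blocking database calls off the common fork-join pool, which `runAsync` uses when no executor is given.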


    Similar to the above, you could try parallel streams (available since Java 8): https://www.baeldung.com/java-when-to-use-parallel-stream
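
A rough sketch of the parallel-stream variant, under the same assumptions as before: `ParallelStreamWriter` and `saveBatch` are hypothetical names, with `saveBatch` standing in for the repository call. Parallel streams run on the common fork-join pool, which is sized for CPU-bound work, so whether this helps with blocking database writes is something to measure and not assume.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

final class ParallelStreamWriter {

    private ParallelStreamWriter() {
    }

    // Processes the chunk as sub-batches of at most batchSize items, in parallel.
    // forEach is a terminal operation, so this returns only after every batch is handled.
    static <T> void writeAll(List<T> chunk, int batchSize, Consumer<List<T>> saveBatch) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batchCount = (chunk.size() + batchSize - 1) / batchSize; // ceiling division
        IntStream.range(0, batchCount)
                .parallel()
                // subList is a read-only view here; the chunk must not be modified meanwhile
                .mapToObj(i -> chunk.subList(i * batchSize,
                        Math.min((i + 1) * batchSize, chunk.size())))
                .forEach(saveBatch);
    }
}
```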


    Hoping these ideas spur something that'll help!