I need to consume data from Kafka and store it in Cassandra DB using Spring Batch. Let's say that the data is an object with a String id Primary key and int status code. The volume of messages is in millions of messages per minute. How do I efficiently and rapidly store the data into Cassandra DB?
Currently I am using Kafka Listener to consume data place it in a queue and poll data from ItemReader.
ConcurrentLinkedQueue<DeviceStatusTracker> dataFromKafka = new ConcurrentLinkedQueue<>();
Kafka Deserializer
@Component
public class MessageDeserializer implements Deserializer<Message> {
ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();
public DeviceStatusTrackerDeserializer() {
}
@Override
public void configure(java.util.Map<java.lang.String,?> configs, boolean isKey) {
// No additional configuration required
}
@Override
public Message deserialize(java.lang.String topic, byte[] data) {
try {
if (data == null) {
return null;
} else {
return objectMapper.readValue(data, Message.class);
}
} catch (Exception e) {
throw new org.apache.kafka.common.errors.SerializationException("Error serializing Message", e);
}
}
}
I have noticed that increasing the concurrency increases the time to complete the step. The topic has 5 partitions with 2 replicas.
@KafkaListener(
id = "topic-kafka-listener",
groupId = "topic-batch",
containerFactory = "kafkaListenerContainerFactory",
concurrency = "1",
topics = "topic"
)
public void receive(@NotNull @Payload List<Message> messages) {
dataFromKafka.addAll(messages);
}
@StepScope
public ItemReader<Message> itemReader() {
return () -> dataFromKafka.poll();
}
@StepScope
public ItemWriter<Message> itemWriter() {
return (chunk) -> messageRepository.saveAll(chunk.getItems());
}
public TaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(8);
threadPoolTaskExecutor.setMaxPoolSize(16);
threadPoolTaskExecutor.setQueueCapacity(100);
threadPoolTaskExecutor.setPrestartAllCoreThreads(true);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Bean
public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("job", jobRepository)
.start(step1(jobRepository, transactionManager))
.build();
}
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Message, Message>chunk(10000, transactionManager)
.taskExecutor(threadPoolTaskExecutor())
.reader(itemReader())
.writer(itemWriter())
.build();
}
I am using this to run the Job Periodically and to find out the resources used.
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Runnable runnable = () -> {
try {
JobParameters jobParameter = new JobParametersBuilder()
.addLocalTime("time", LocalDateTime.now().atZone(ZoneId.systemDefault()).toLocalTime())
.toJobParameters();
jobLauncher.run(job, jobParameter);
double cpuUsage = osBean.getProcessCpuLoad() * 100;
double usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed() / 1048576.0;
logger.info("CPU Usage: " + cpuUsage + " %");
logger.info("Used Memory: " + usedMemory + " MB");
} catch (Exception e) {
e.printStackTrace();
}
};
executorService.scheduleWithFixedDelay(runnable, 0, 5, TimeUnit.SECONDS);
I have also tried to pause the Kafka listener, split data into batches and place them into the queue and resume Kafka listener. The item reader then polls for a list of messages instead of polling one by one. But this resulted in dropped messages.
Hardware info:
Currently this code takes 2 minutes to store 1 million messages into cassandra. I suspect the choke point is the queue. What is the best way to split the data and give it to the threads instead of polling one by one? What other methods should I try to make it faster?
I have noticed that increasing the concurrency increases the time to complete the step
Ah yeah, your hardware probably isn't powerful enough to handle the current set up you have...
There are lots of moving parts here, so I will just throw a couple of thoughts at you to see if anything might help!
Your chunk size is 10000, have you experimented with higher or lower values to see what kind of affect his has?
You may be able to leverage https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
And use this for your item writing code. Something like:
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Message, Message>chunk(10000, transactionManager)
.taskExecutor(threadPoolTaskExecutor())
.reader(itemReader())
.writer((chunk) -> itemWriter(chunk))
.build();
}
public CompletableFuture<Void> itemWriter(List<Message> chunk) {
return CompletableFuture.runAsync(() -> messageRepository.saveAll(chunk.getItems()));
}
Similar to the above, maybe leverage streams if the version of Java you are using supports it: https://www.baeldung.com/java-when-to-use-parallel-stream
Hoping these ideas spur something that'll help!