Tags: java, oracle-database, apache-kafka, kafka-consumer-api, spring-kafka

Insert Kafka consumer data into an Oracle database with Java Spring


I created a Java microservice with Spring that consumes messages from Kafka topics and should then insert the data into an Oracle database. The service already consumes messages from the topics successfully; the problem is writing the consumed data (JSON format) to the database.

Here are my Kafka topics:

[image: kafka topics]

Successfully consumed message:

[image: consumed message]

I already tried a plain Java PreparedStatement, but it didn't work.

My Java consumer class:

package com.kafka.consumertllog.engine;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerEngine {
    @KafkaListener(id = "${consumer.client.id}", topics ="${kafka.consumer.clean.topic}", containerFactory="kafkaListenerContainerFactoryString")
    public void consume(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment){
        System.out.println("Consumed message : " + consumerRecord.value().toString());
        acknowledgment.acknowledge();
    }
}

My Java Kafka listener factory class:

package com.kafka.consumertllog.factory;

// Importing required classes
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaListenerFactory {

    @Value("${kafka.consumer.broker}")
    private String bootstrapServers;

    @Value("${kafka.consumer.id.group}")
    private String idGroupConsumer;

    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${enable.auto.commit}")
    private String enableAutoCommit;

    @Value("${concurrent.consumer.kafka}")
    private int concurrentConsumer;
    
    /**
     * Consumer factory configuration for listening to String messages with manual offset commits.
     */

    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactoryString")
    public ConsumerFactory<String, String> consumerFactoryString() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, idGroupConsumer);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
    }

    @Bean(name = "kafkaListenerContainerFactoryString")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryString() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryString());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(concurrentConsumer);
        return factory;
    }
}

Solution

  • Writing to a database from a plain Kafka consumer is not recommended. You have to handle batching, retries, and mapping records to table columns yourself, along with other issues such as deciding when to run an UPDATE, INSERT, or DELETE query.

    All of this is handled for you when you use the Kafka Connect JDBC sink connector.

    If you want to use Spring, start by adding a Spring Data dependency instead of using a JDBC PreparedStatement directly. Also avoid writing one event at a time to the database from the KafkaListener: that is slow for both the consumer and the database client, and the frequent writes put unnecessary load on the database.

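    Also, your @Value property names are not the ones Spring Boot expects. Its Kafka auto-configuration only reads properties that start with spring.kafka., so they should all use that prefix; custom names like yours only take effect because you build the consumer factory yourself. See https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka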
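
To illustrate the advice about not writing one event at a time, here is a minimal sketch of the buffering idea in plain Java. It is not Spring Kafka or Spring Data code: BatchingWriter and its writer callback are hypothetical names introduced for illustration, and the callback stands in for whatever performs the real batched insert (for example a Spring Data saveAll call or a JDBC batch).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper: collects consumed record values and hands them
 * to a writer callback in batches instead of one at a time.
 */
final class BatchingWriter {
    private final int batchSize;
    private final Consumer<List<String>> writer; // placeholder for the real database write
    private final List<String> buffer = new ArrayList<>();

    BatchingWriter(int batchSize, Consumer<List<String>> writer) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.writer = writer;
    }

    /** Buffers one record value and flushes once the buffer reaches batchSize. */
    synchronized void add(String value) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes whatever is currently buffered as a single batch. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        writer.accept(new ArrayList<>(buffer)); // hand the writer its own copy
        buffer.clear(); // reached only if the writer did not throw, so a failed batch stays buffered
    }

    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        BatchingWriter batching = new BatchingWriter(2, written::add);

        batching.add("{\"id\":1}");
        batching.add("{\"id\":2}"); // buffer is full here, first batch is written
        batching.add("{\"id\":3}");
        batching.flush();           // writes the remaining record

        System.out.println(written.size() + " batches written"); // prints "2 batches written"
    }
}
```

In a real listener, offsets should only be acknowledged after the flush that contains those records has succeeded; otherwise a crash between the acknowledgment and the flush could lose records. Spring Kafka can also deliver records to a listener in batches, which may make a hand-rolled buffer like this unnecessary.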