java apache-kafka kafka-consumer-api kafka-producer-api opensearch

Is it possible to send messages from Kafka to OpenSearch?


I'm writing a service in Java as a learning exercise. I wrote my REST app and it works fine. Then I added Kafka, which sends a message to a consumer every time I make a request through Insomnia, and that works too. The problem now is that I have to store the messages from the consumer in OpenSearch running inside Docker (using only Java, without Spring Boot, etc.). Is there a way to do it? I can't find anything.

This is my consumer:

    String servers = "localhost:9092";
    String groupId = "id";
    String topic = "mytopic";

    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumer.subscribe(Arrays.asList(topic));

    Logger logger = LoggerFactory.getLogger(Main.class.getName());

    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000));
        for (ConsumerRecord<String, String> record : records) {
            logger.info("Key:" + record.key() + ", Value:" + record.value());
            logger.info("Partition:" + record.partition() + ", Offset:" + record.offset());
        }
    }

And this is my producer:

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

    ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", null, message);
    kafkaProducer.send(record);
    kafkaProducer.flush();
    kafkaProducer.close();

EDIT: I forgot to mention that I already have OpenSearch running in Docker and have already created an index. I just need to insert the messages, and I'm going mad.


Solution

  • A Kafka Connect connector for OpenSearch already exists. You do not need to write code for this (nor should you).

    https://github.com/aiven/opensearch-connector-for-apache-kafka

    Kafka Connect is a Java Framework...

    Regarding Docker, see https://docs.confluent.io/platform/current/connect/extending.html

    Otherwise, OpenSearch has a REST API, so you would write a Kafka consumer process that writes the data to it rather than just printing it to the console.
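
If you do go the plain-Java route, the REST option could look roughly like the sketch below. It uses only the JDK's built-in `java.net.http` client (Java 11+) and posts each message to the index's `_doc` endpoint. Several things here are assumptions rather than anything from the question: the class name `OpenSearchIndexer`, the index name `myindex`, the document field names (`key`, `message`, `partition`, `offset`), and an OpenSearch container reachable over plain HTTP at `http://localhost:9200` with the security plugin disabled. With the default secured setup you would additionally need HTTPS and credentials.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: wraps one Kafka record in a JSON document and
// indexes it through the OpenSearch REST API (POST /<index>/_doc).
public class OpenSearchIndexer {
    private final HttpClient http = HttpClient.newHttpClient();
    private final String baseUrl; // assumed, e.g. "http://localhost:9200"
    private final String index;   // assumed, e.g. "myindex" (must already exist or be auto-created)

    public OpenSearchIndexer(String baseUrl, String index) {
        this.baseUrl = baseUrl;
        this.index = index;
    }

    // Turns a Java string into a JSON string literal, or JSON null.
    static String quote(String s) {
        if (s == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Builds the document body; the field names are illustrative only.
    public static String toJsonDocument(String key, String value, int partition, long offset) {
        return "{\"key\":" + quote(key)
                + ",\"message\":" + quote(value)
                + ",\"partition\":" + partition
                + ",\"offset\":" + offset + "}";
    }

    // POST without an id lets OpenSearch generate the document id.
    public HttpRequest buildIndexRequest(String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_doc"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Sends one record and fails loudly on any non-2xx response.
    public void index(String key, String value, int partition, long offset)
            throws IOException, InterruptedException {
        HttpRequest request = buildIndexRequest(toJsonDocument(key, value, partition, offset));
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() < 200 || response.statusCode() >= 300) {
            throw new IOException("Indexing failed: " + response.statusCode() + " " + response.body());
        }
    }
}
```

In the consumer loop from the question, you would create one `new OpenSearchIndexer("http://localhost:9200", "myindex")` before the `while` and call `indexer.index(record.key(), record.value(), record.partition(), record.offset())` where the `logger.info` calls are now. This sends one HTTP request per message, which is fine for learning. For real volumes the `_bulk` endpoint or the Kafka Connect connector above is the better fit.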