I'm writing a service in Java for study purposes, as I'm still learning. I wrote my REST app and it's working fine; then I added Kafka, which sends a message to a consumer every time I make a request through Insomnia, and that works too. Now the problem is that I have to store the message from the consumer into OpenSearch running inside Docker, using only Java (no Spring Boot etc.). Is there a way to do it? I can't find anything.
This is my consumer:
String servers = "localhost:9092";
String groupId = "id";
String topic = "mytopic";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList(topic));
Logger logger = LoggerFactory.getLogger(Main.class.getName());
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000));
    for (ConsumerRecord<String, String> record : records) {
        logger.info("Key:" + record.key() + ", Value:" + record.value());
        logger.info("Partition:" + record.partition() + ", Offset:" + record.offset());
    }
}
And this is my producer:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", null, message);
kafkaProducer.send(record);
kafkaProducer.flush();
kafkaProducer.close();
EDIT: I forgot to mention that I already have OpenSearch up in Docker and have already created an index; I just have to insert the message. I'm going mad.
There is a Kafka Connect sink connector for OpenSearch; you do not need to write code for this (nor should you):
https://github.com/aiven/opensearch-connector-for-apache-kafka
Kafka Connect is a Java Framework...
Regarding Docker, see https://docs.confluent.io/platform/current/connect/extending.html
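As an illustration, once a Connect worker is running you create a connector instance through its REST API (port 8083 by default). The connector class name comes from the Aiven repo above; the connector name, topic, OpenSearch URL, and converter settings below are assumptions matched to your setup, so adjust them as needed:

```shell
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "opensearch-sink",
    "config": {
      "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
      "topics": "mytopic",
      "connection.url": "http://localhost:9200",
      "key.ignore": "true",
      "schema.ignore": "true",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }'
```

With this in place, every record produced to mytopic is written into OpenSearch for you, and your consumer code is no longer needed for persistence.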
Otherwise, OpenSearch has a REST API, so you'd extend your Kafka consumer process to write each record to it rather than just logging to the console.
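For that plain-Java route, here is a minimal sketch using the JDK's built-in java.net.http.HttpClient (Java 11+). The index name myindex, the http://localhost:9200 address, and the assumption that the security plugin is disabled are all placeholders you must adapt to your Docker setup:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class OpenSearchIndexer {
    // Assumptions: OpenSearch is reachable over plain HTTP on localhost:9200
    // (security plugin disabled) and the index "myindex" already exists.
    private static final String OPENSEARCH_URL = "http://localhost:9200";
    private static final String INDEX = "myindex";

    private final HttpClient client = HttpClient.newHttpClient();

    // Wraps the raw Kafka message in a one-field JSON document.
    static String buildDocument(String message) {
        // Minimal escaping only; use a JSON library (Jackson, Gson) for real data.
        String escaped = message.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"message\": \"" + escaped + "\"}";
    }

    // POSTs the document to /<index>/_doc and returns the HTTP status code
    // (201 when OpenSearch creates the document).
    public int indexMessage(String message) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(OPENSEARCH_URL + "/" + INDEX + "/_doc"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(buildDocument(message)))
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

In your consumer loop you would then call indexer.indexMessage(record.value()) for each record instead of (or in addition to) logging it.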