Context: I coded a Kafka Consumer which receives a simple message and I want to insert it to MongoDb using com.mongodb.reactivestreams.client.MongoClient. Althought I understand my issue is all about how use properly MongoClient let me inform my stack: my stack is Micronaut + MongoDb reactive + Kotlin.
Disclaimer: if someone provide answer in java I may be able to translate it to Kotlin. You can ignore the Kafka part bellow since it is working as expected.
Here is my code
package com.mybank.consumer
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import com.mongodb.reactivestreams.client.MongoDatabase
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import org.bson.Document
import org.reactivestreams.Publisher
import javax.inject.Inject
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {
@Inject
//@Named("another")
var mongoClient: MongoClient? = null
@Topic("debit")
fun receive(@KafkaKey key: String, name: String) {
println("Account - $name by $key")
var mongoDb : MongoDatabase? = mongoClient?.getDatabase("account")
var mongoCollection: MongoCollection<Document>? = mongoDb?.getCollection("account_collection")
var mongoDocument: Publisher<Document>? = mongoCollection?.find()?.first()
print(mongoDocument.toString())
//println(mongoClient?.getDatabase("account")?.getCollection("account_collection")?.find()?.first())
//val mongoClientClient: MongoDatabase = mongoClient.getDatabase("account")
//println(mongoClient.getDatabase("account").getCollection("account_collection").find({ "size.h": { $lt: 15 } })
//println(mongoClient.getDatabase("account").getCollection("account_collection").find("1").toString())
}
}
Well, the code above was the closest I got. It is not prompting any error. It is printing
com.mongodb.reactivestreams.client.internal.Publishers$$Lambda$618/0x0000000800525840@437ec11
I guess this prove the code is connecting properly to database but I was expecting to print the first document.
There are three documents:
My final goal is to insert the message I have received from Kafka Listener to MongoDb. Any clue will be appreciated.
The whole code can be found in git hub
*** edited after Susan's question
Here is what is printed with
var mongoDocument = mongoCollection?.find()?.first()
print(mongoDocument.toString())
Looks like you are using reactive streams for mongodb. Is there a reason you are using reactive streams?
The result you are getting is of type "Publisher". You will need to use the method subscribe(), in order to get the document.
See the documentation on Publisher.
http://www.howsoftworks.net/reacstre/1.0.2/Publisher
If you dont want to use reactive: Great example on how/what to use for mongodb in Kotlin.
https://kb.objectrocket.com/mongo-db/retrieve-mongodb-document-using-kotlin-1180
--- Similar StackOverlow using MongoDB, Reactive Streams, Publisher.
how save document to MongoDb with com.mongodb.reactivestreams.client
=============== Edited ==============
Publisher<Document> publisher = collection.find().first();
subscriber = new PrintDocumentSubscriber();
publisher.subscribe(subscriber); //publisher.subscribe(subscriber)
subscriber.await();
The example will print the following document:
{ "_id" : { "$oid" : "551582c558c7b4fbacf16735" },
"name" : "MongoDB", "type" : "database", "count" : 1,
}
If you want nonblocking, do it this way:
publisher.subscribe(new PrintDocumentSubscriber()); //without await
http://mongodb.github.io/mongo-java-driver-reactivestreams/1.6/getting-started/quick-tour/