Context: I coded a Kafka consumer that receives a simple message, and I want to insert it into MongoDB using com.mongodb.reactivestreams.client.MongoClient. Although I understand my issue is all about how to use MongoClient properly, for completeness my stack is Micronaut + MongoDB reactive + Kotlin.

Disclaimer: if someone provides an answer in Java, I may be able to translate it to Kotlin. You can ignore the Kafka part below since it is working as expected.

Here is my code

package com.mybank.consumer

import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import com.mongodb.reactivestreams.client.MongoDatabase
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import org.bson.Document
import org.reactivestreams.Publisher
import javax.inject.Inject


@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {

    @Inject
    //@Named("another")
    var mongoClient: MongoClient? = null


    @Topic("debit")
    fun receive(@KafkaKey key: String, name: String) {


        println("Account - $name by $key")

        
        var mongoDb : MongoDatabase? = mongoClient?.getDatabase("account")
        var mongoCollection: MongoCollection<Document>? = mongoDb?.getCollection("account_collection")
        var mongoDocument: Publisher<Document>? = mongoCollection?.find()?.first()
        print(mongoDocument.toString())

        //println(mongoClient?.getDatabase("account")?.getCollection("account_collection")?.find()?.first())
        //val mongoClientClient: MongoDatabase  = mongoClient.getDatabase("account")
        //println(mongoClient.getDatabase("account").getCollection("account_collection").find({ "size.h": { $lt: 15 } })
        //println(mongoClient.getDatabase("account").getCollection("account_collection").find("1").toString())


    }
}

Well, the code above is the closest I got. It does not raise any error. It prints

com.mongodb.reactivestreams.client.internal.Publishers$$Lambda$618/0x0000000800525840@437ec11

I guess this proves the code is connecting properly to the database, but I was expecting it to print the first document.

There are three documents:

[Screenshot: mongodb-express view]

My final goal is to insert the message I have received from the Kafka listener into MongoDB. Any clue will be appreciated.

The whole code can be found on GitHub.

*** edited after Susan's question

Here is what is printed with

var mongoDocument = mongoCollection?.find()?.first()
print(mongoDocument.toString())

[Screenshot: print result]

  • "but I was expecting to print the first document" - I think your code is written to print the publisher, not the document. Commented Nov 16, 2020 at 16:24
  • What does this print? var mongoDocument = mongoCollection?.find()?.first() Commented Nov 24, 2020 at 21:52
  • @SusanMustafa I added the print result above. Commented Nov 25, 2020 at 1:20

1 Answer

Looks like you are using reactive streams for mongodb. Is there a reason you are using reactive streams?

The result you are getting is of type Publisher, so print() shows the publisher object itself, not a document. You need to call subscribe() on it in order to receive the document.

See the documentation on Publisher.

http://www.howsoftworks.net/reacstre/1.0.2/Publisher
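To make the difference between printing a publisher and subscribing to it concrete, here is a minimal sketch. It is an illustration under assumptions, not the driver's code: it uses the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones, and the names SubscribeSketch, firstDocument, FirstItemSubscriber and fetchFirst are hypothetical. firstDocument() only stands in for collection.find().first().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

class SubscribeSketch {

    // Hypothetical stand-in for collection.find().first():
    // nothing is emitted until a subscriber calls request().
    static Flow.Publisher<String> firstDocument() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done = false;

            @Override
            public void request(long n) {
                if (!done && n > 0) {
                    done = true;
                    subscriber.onNext("{ \"name\" : \"MongoDB\" }");
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Stores the first item and lets the caller wait until the stream ends.
    static final class FirstItemSubscriber<T> implements Flow.Subscriber<T> {
        private final AtomicReference<T> item = new AtomicReference<>();
        private final CountDownLatch latch = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1); // ask for exactly one item
        }

        @Override
        public void onNext(T value) {
            item.compareAndSet(null, value);
        }

        @Override
        public void onError(Throwable error) {
            latch.countDown(); // the item stays null on failure
        }

        @Override
        public void onComplete() {
            latch.countDown();
        }

        // Blocks the calling thread until onComplete or onError has fired.
        T await() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return item.get();
        }
    }

    static String fetchFirst() {
        FirstItemSubscriber<String> subscriber = new FirstItemSubscriber<>();
        firstDocument().subscribe(subscriber);
        return subscriber.await();
    }

    public static void main(String[] args) {
        System.out.println(firstDocument()); // an object reference, not the document
        System.out.println(fetchFirst());    // { "name" : "MongoDB" }
    }
}
```

With the real driver, a subscriber of the same shape would presumably implement org.reactivestreams.Subscriber<Document> and be passed to find().first().subscribe(...).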

If you don't want to use reactive streams, here is a great example of how and what to use for MongoDB in Kotlin:

https://kb.objectrocket.com/mongo-db/retrieve-mongodb-document-using-kotlin-1180

--- Similar Stack Overflow question using MongoDB, Reactive Streams, Publisher:

how save document to MongoDb with com.mongodb.reactivestreams.client

=============== Edited ==============

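Publisher<Document> publisher = collection.find().first();

// PrintDocumentSubscriber is a helper from the driver's quick tour (SubscriberHelpers)
PrintDocumentSubscriber subscriber = new PrintDocumentSubscriber();
publisher.subscribe(subscriber);
subscriber.await(); // blocks until the publisher completes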

The example will print the following document:

{ "_id" : { "$oid" : "551582c558c7b4fbacf16735" },
  "name" : "MongoDB", "type" : "database", "count" : 1
}

If you want nonblocking, do it this way:

publisher.subscribe(new PrintDocumentSubscriber());  //without await
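As a rough illustration of the non-blocking style, the sketch below hands each item to a callback instead of waiting on the subscriber. It is an assumption-laden example, not the driver's API: it relies on the JDK's java.util.concurrent.Flow interfaces (which mirror org.reactivestreams), and CallbackSketch, CallbackSubscriber, just and collect are hypothetical names. The stand-in publisher emits synchronously, whereas a real driver would typically invoke the callbacks later on one of its own threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

class CallbackSketch {

    // Forwards every item to a callback; the caller never waits on it.
    static final class CallbackSubscriber<T> implements Flow.Subscriber<T> {
        private final Consumer<T> onItem;
        private final Consumer<Throwable> onFailure;

        CallbackSubscriber(Consumer<T> onItem, Consumer<Throwable> onFailure) {
            this.onItem = onItem;
            this.onFailure = onFailure;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
        }

        @Override
        public void onNext(T item) {
            onItem.accept(item);
        }

        @Override
        public void onError(Throwable error) {
            onFailure.accept(error);
        }

        @Override
        public void onComplete() {
            // nothing to do once the stream ends
        }
    }

    // Hypothetical stand-in publisher: emits the given items when requested.
    static <T> Flow.Publisher<T> just(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean finished = false;

            @Override
            public void request(long n) {
                while (!finished && n-- > 0 && next < items.size()) {
                    subscriber.onNext(items.get(next++));
                }
                if (!finished && next >= items.size()) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    // Collects whatever the callback receives, so the behaviour can be checked.
    static List<String> collect() {
        List<String> seen = new ArrayList<>();
        just(List.of("doc-1", "doc-2"))
                .subscribe(new CallbackSubscriber<String>(seen::add, Throwable::printStackTrace));
        return seen;
    }

    public static void main(String[] args) {
        just(List.of("doc-1", "doc-2")).subscribe(new CallbackSubscriber<String>(
                item -> System.out.println("received: " + item),
                Throwable::printStackTrace));
    }
}
```

The same idea should carry over to a write: the publisher returned by an insert does nothing until something subscribes to it, so a subscriber like this one is what triggers the operation and observes its result.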

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.4/javadoc/tour/SubscriberHelpers.PrintDocumentSubscriber.html

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.6/getting-started/quick-tour/

6 Comments

Yes, there is a reason. I want to build a reactive stack that is non-blocking from end to end, relying on Micronaut. As an analogy, if I were using Spring WebFlux/Netty, I would use spring-boot-starter-data-mongodb-reactive. Actually, I have coded two solutions this year based on Spring WebFlux/Spring Data Reactive using ElasticSearch. I am far from being an expert in the reactive paradigm but, as far as I understand, we lose the benefits of a reactive design if the database driver is blocking. Since I want to use Micronaut and avoid Spring, I am trying com.mongodb.reactivestreams.client.MongoClient
@JimC you know more than me on reactive.streams :), I hope you found the links of some help. You definitely need to subscribe to your publisher.
If you can edit your answer adding a suggestion on how to subscribe to my publisher I would be grateful. Using Spring Data it seems easier, probably because it hides some specific reactive characteristics and makes it seem like common CRUD (discussing why I don't simply jump back to Spring is beyond this question)
done @JimC The mongodb github documentation is very helpful. Read it slowly, and I hope everything works out for you
@JimC sorry I actually didn’t. I hope if everything worked out, you would put an answer to help future devs