Search code examples
javamongodbspring-bootspring-integrationspring-integration-dsl

MongoDB Inbound Adapter updateExpression and Transaction sync DSL example


Similar to this question from 5 years ago, I'm trying to define a mongodb source that will mark a record as PROCESSING when polled. When the flow is done, I want to mark the record DONE. A lot has changed since then, namely the addition of the update() expression on the polling message source. I'm looking for an example of this in DSL.

I read through these docs:

Most of the examples in this documentation (and pretty much everywhere else in the spring docs) is in the XML form. I have not used XML to configure Spring since Obama was President. I am looking for a DSL example of an IntegrationFlow that would poll a mongo inbound adapter (which marks it PROCESSING), handles it, and then marks it DONE.

The inbound adapter's update expression would likely be the thing that sets it to PROCESSING. Once it's marked PROCESSING, it's pretty much safe from being picked up by any other poller.

Would an additional handler (outbound adapter?) be all that's needed to mark it DONE?

Something like:

IntegrationFlow.from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                                 .update("{'status' : 'PROCESSING'}"))
               .handle(//do something with it)
               .handle(//update to DONE)

I'm not quite sure if I would have to manually new up a MongoDbStoringMessageHandler to update the record, or if I should just manually do it on the mongoTemplate. The MongoDb helper does not include anything for building an outbound adapter. I double checked what the documentation had and, wouldn't you know, it only has the XML form. My only other thought from looking at it was that the outbound adapter is only meant for taking outbound messages and inserting them as new records.

I'm also not sure what I gain by using the inbound adapter over a MessageSource instance that just does mongoTemplate.findAndModify()

The documentation goes on to muddy the waters further by saying "yeah you can use an updateExpression but you probably still want to use transactions anyway". So does the update() not get me what I need? I took a look at what it would take to add transaction sync to my flow in the migraine and tinnitus inducing documentation on Transaction Synchronization. All the examples are XML.

This will likely not run on a single thread in a production environment for performance and scalability reasons.

Here's my attempt at putting together something using the outboundGateway with a callback:

IntegrationFlow
            .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}").update("{'status': 'PROCESSING'}"), 
                    p -> p.poller(pm -> pm.fixedDelay(1000L)))
            .handle(MongoDb.outboundGateway(mongoTemplate)
                    .collectionCallback((c, m) -> 
                        c.findOneAndUpdate(Filters.eq("_id", ((Payload) m.getPayload()).getId()), 
                            Updates.set("status", "DONE"))))
            .get();

Solution

  • Yes. I agree that docs and respective Java DSL still needs some love to, at least, realign it with whatever we have there for MongoDB Outbound Gateway.

    As we said that clearly in the docs, applying transaction synchronization doesn't make MongoDB transactional per se. Technically what we say there about an update is fully the same what you would do (but more complex) with transaction synchronization.

    If you do everything in a single polling thread, then you might not need that extra DONE step, but rather have an update to this DONE directly. In this case the next polling cycle won't start until you done with the current.

    Instead of direct MongoDbTemplate API, you may consider to use a MongoDbOutboundGateway with its collectionCallback feature where, well, you would use MongoCollection.findOneAndUpdate().

    We are just short-handed here to do everything for you. On the other hand, you know, this is an Open Source, so feel free to come back to us with any contribution you find reasonable.