
How to enrich the payload with an object from MongoDB (camel-mongodb)


I'm trying to pull an object from MongoDB, add it to my current payload, and save the result in another database:

@Override
public void configure() throws Exception
{
    from(kafkaEndpoint)
            .convertBodyTo(DBObject.class)
            .enrich("mongodb:mongoDb?database=myDbName1&collection=UserColl&operation=findOneByQuery",
                    (original, external) -> {
                        DBObject originalBody = original.getIn().getBody(DBObject.class);
                        DBObject externalBody = external.getIn().getBody(DBObject.class);

                        Map<String, DBObject> map = new HashMap<String, DBObject>();
                        map.put("original", originalBody);
                        map.put("external", externalBody);

                        original.getIn().setBody(map);
                        return original;
                    })
            .to("mongodb:mongoDb?database=myDbName2&collection=UserColl&operation=insert");
}

The problem is that enrich takes its query from the In body, which holds my original object...

So how can I pass a query ({"entity.id": ""}) to enrich("mongodb:...") and still preserve the original object so I can merge it with the result?

Thanks.


Solution

Point enrich at a direct: route that first replaces the body with the query and then calls MongoDB. The enricher sends a copy of the exchange to that route, so the original body is still available in the aggregation strategy. Both routes must be defined inside configure():

@Override
public void configure() throws Exception
{
    from(kafkaEndpoint)
            .convertBodyTo(DBObject.class)
            // call the helper route instead of the MongoDB endpoint directly
            .enrich("direct:findOneByQuery",
                    (original, external) -> {
                        DBObject originalBody = original.getIn().getBody(DBObject.class);
                        DBObject externalBody = external.getIn().getBody(DBObject.class);

                        Map<String, DBObject> map = new HashMap<String, DBObject>();
                        map.put("original", originalBody);
                        map.put("external", externalBody);

                        original.getIn().setBody(map);
                        return original;
                    })
            .to("mongodb:mongoDb?database=myDbName2&collection=UserColl&operation=insert");

    // helper route: turn the incoming object into a query, then run the lookup
    from("direct:findOneByQuery")
            .process(exchange -> {
                DBObject body = exchange.getIn().getBody(DBObject.class);
                DBObject query = BasicDBObjectBuilder.start()
                        .append("entity._id", body.get("_id"))
                        .get();

                exchange.getIn().setBody(query);
            })
            .to("mongodb:mongoDb?database=myDbName1&collection=UserColl&operation=findOneByQuery");
}
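
The two data steps in the routes above (building the query from the incoming object, then merging the lookup result with the original) can be tried out without Camel or a MongoDB driver. The following is only a minimal sketch under stated assumptions: plain java.util.Map stands in for DBObject, and the class and method names (EnrichSketch, buildQuery, merge) are hypothetical and not part of camel-mongodb. Unlike the aggregation strategy above, merge leaves out the "external" key when the lookup returned nothing; that is a design choice of this sketch, not something the original answer does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java sketch of the two data steps; Map stands in for DBObject (assumption). */
final class EnrichSketch {

    /** Builds the lookup query {"entity._id": <body._id>} from the original payload. */
    static Map<String, Object> buildQuery(Map<String, Object> body) {
        Map<String, Object> query = new LinkedHashMap<>();
        query.put("entity._id", body.get("_id"));
        return query;
    }

    /** Combines both documents; the "external" key is skipped when the lookup result is null. */
    static Map<String, Object> merge(Map<String, Object> original, Map<String, Object> external) {
        Map<String, Object> merged = new LinkedHashMap<>();
        merged.put("original", original);
        if (external != null) {
            merged.put("external", external);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical payload, used only to exercise the two helpers.
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("_id", 42);
        body.put("name", "example");

        System.out.println(buildQuery(body));
        System.out.println(merge(body, null));
    }
}
```

In the real routes, the logic of buildQuery would sit in the processor of direct:findOneByQuery, and the logic of merge in the aggregation strategy passed to enrich.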