I'm trying to pull an object from MongoDB, add it to my current payload, and save the result in another database:
@Override
public void configure() throws Exception
{
    from(kafkaEndpoint)
        .convertBodyTo(DBObject.class)
        .enrich("mongodb:mongoDb?database=myDbName1&collection=UserColl&operation=findOneByQuery",
            (original, external) -> {
                DBObject originalBody = original.getIn().getBody(DBObject.class);
                DBObject externalBody = external.getIn().getBody(DBObject.class);
                Map<String, DBObject> map = new HashMap<String, DBObject>();
                map.put("original", originalBody);
                map.put("external", externalBody);
                original.getIn().setBody(map);
                return original;
            })
        .to("mongodb:mongoDb?database=myDbName2&collection=UserColl&operation=insert");
}
The problem is that enrich takes the query from the In body, which holds my original object.
So how can I pass a query ({"entity.id": ""}) to enrich("mongodb:...") and still preserve the original object so I can merge it with the results?
Thanks.
@Override
public void configure() throws Exception
{
    from(kafkaEndpoint)
        .convertBodyTo(DBObject.class)
        // Enrich from a sub-route instead of calling MongoDB directly
        .enrich("direct:findOneByQuery",
            (original, external) -> {
                DBObject originalBody = original.getIn().getBody(DBObject.class);
                DBObject externalBody = external.getIn().getBody(DBObject.class);
                Map<String, DBObject> map = new HashMap<String, DBObject>();
                map.put("original", originalBody);
                map.put("external", externalBody);
                original.getIn().setBody(map);
                return original;
            })
        .to("mongodb:mongoDb?database=myDbName2&collection=UserColl&operation=insert");

    // Sub-route: turn the incoming object into a query, then run the lookup
    from("direct:findOneByQuery")
        .process(new Processor()
        {
            @Override
            public void process(Exchange exchange) throws Exception
            {
                DBObject body = exchange.getIn().getBody(DBObject.class);
                DBObject query = BasicDBObjectBuilder.start()
                    .append("entity._id", body.get("_id"))
                    .get();
                // Replace the body with the query only inside this sub-route
                exchange.getIn().setBody(query);
            }
        })
        .to("mongodb:mongoDb?database=myDbName1&collection=UserColl&operation=findOneByQuery");
}
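To see why the original object survives, it may help to look at the data flow without Camel at all. As far as I understand it, enrich hands a copy of the exchange to the resource endpoint, so overwriting the body with the query inside direct:findOneByQuery does not touch the exchange that is later passed to the aggregation lambda as "original". Below is a minimal plain-Java sketch of that flow. The class EnrichSketch, its methods, and the lookup function are hypothetical stand-ins (plain maps instead of DBObject, a lambda instead of the MongoDB call), not Camel or MongoDB APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class EnrichSketch {

    // Mirrors the processor in direct:findOneByQuery: derive a query from the original document.
    public static Map<String, Object> buildQuery(Map<String, Object> original) {
        Map<String, Object> query = new HashMap<>();
        query.put("entity._id", original.get("_id"));
        return query;
    }

    // Mirrors the aggregation lambda: keep both documents side by side.
    public static Map<String, Object> merge(Map<String, Object> original,
                                            Map<String, Object> external) {
        Map<String, Object> merged = new HashMap<>();
        merged.put("original", original);
        merged.put("external", external);
        return merged;
    }

    // The whole enrich step: the query is built separately, so the original is never overwritten.
    public static Map<String, Object> enrich(
            Map<String, Object> original,
            Function<Map<String, Object>, Map<String, Object>> lookup) {
        Map<String, Object> external = lookup.apply(buildQuery(original));
        return merge(original, external);
    }

    public static void main(String[] args) {
        Map<String, Object> original = new HashMap<>();
        original.put("_id", 42);

        // Hypothetical stand-in for the findOneByQuery call: echoes the id it was asked for.
        Function<Map<String, Object>, Map<String, Object>> lookup = query -> {
            Map<String, Object> found = new HashMap<>();
            found.put("matchedOn", query.get("entity._id"));
            return found;
        };

        Map<String, Object> merged = enrich(original, lookup);
        System.out.println(merged.get("original"));
        System.out.println(merged.get("external"));
    }
}
```

The point of the design is that the query lives only in the sub-route's copy of the message, while the merge step still sees the untouched original.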