I have this Apache Camel route in which I call a bean to transform the incoming string, but when I send the message on with .to(Endpoints.SEDA_SEND_PRICING_LIFE_CYCLE_MESSAGE), it doesn't carry the transformed value. It sends the original message received at the beginning of the route. My bean method returns an object (I have tried returning a String as well).
        from(azureServicebus(AZvalue)
                .connectionString(connectionString)
                .receiverAsyncClient(serviceBusReceiverAsyncClient)
                .serviceBusReceiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                .serviceBusType(ServiceBusType.topic)
                .prefetchCount(100)
                .consumerOperation(ServiceBusConsumerOperationDefinition.receiveMessages)
                //.maxAutoLockRenewDuration(Duration.ofMinutes(10))
            )
            .messageHistory()
            // Route Name
            .routeId(Endpoints.SEDA_PROCESS_SB_MESSAGE_ENDPOINT)
            // multicast may not need
            .multicast()
                .parallelProcessing() // create parallel threads
                .bean(PricingLifeCyclebService.class, "paraMap")
                .log("Final :- ${body}")
                .to(Endpoints.SEDA_SEND_PRICING_LIFE_CYCLE_MESSAGE)
            .end();
    }
For reference, below is my bean class:
    package com.sams.pricing.lifecycle.processor.services;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.sams.pricing.lifecycle.processor.dtos.LifecycleDTO;
    import com.sams.pricing.lifecycle.processor.mapper.LifecycleDtoMapper;
    import com.sams.pricing.lifecycle.processor.model.LifeCycleTopicC;
    import com.sams.pricing.lifecycle.processor.queueDtos.ParaDTO;
    import com.sams.pricing.lifecycle.processor.util.LifeCycleMapper;
    import com.sams.pricing.lifecycle.processor.util.LifeCycleUtility;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    @Component
    public class PricingLifeCyclebService {

        @Autowired private LifecycleDtoMapper paraToLifecleDTOMapper;
        @Autowired private LifeCycleMapper lifecycleTopicmapper;

        public LifeCycleTopicC paraMap(String object) throws JsonProcessingException {
            final var paraMapper = LifeCycleUtility.getObjectMapper();
            final var paraDTO = paraMapper.readValue(object, ParaDTO.class);
            LifecycleDTO lifecycleDTO = paraToLifecleDTOMapper.mapToLifecycleDTO(paraDTO);
            LifeCycleTopicC obj = lifecycleTopicmapper.toLifeCycleTopicC(lifecycleDTO);
            log.info("Event={}, Body={}", "ConvertParatoTopic", obj);
            return obj;
        }
    }
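This happens because you're transforming the body inside the multicast block, when you should be transforming it before the multicast block. Every step listed inside multicast receives its own copy of the original message, so the bean's result never reaches the .to() that follows it.
In the example below, the same message received by the direct:example route is sent simultaneously to endpoint 1, endpoint 2 and endpoint 3.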
    from("direct:example")
        .routeId("example")
        .multicast().parallelProcessing()
            .bean(HelloBean.class, "greet") // endpoint 1
            .log("Final :- ${body}")        // endpoint 2
            .to("direct:printBody")         // endpoint 3
        .end();
Here's how you should use multicast instead. This transforms the message using the bean and then sends the transformed message to the endpoints direct:multicastA, direct:multicastB and direct:multicastC simultaneously.
    from("direct:example3")
        .routeId("example3")
        // Transform the body first, outside the multicast block
        .bean(HelloBean.class, "greet")
        .log("Final :- ${body}")
        // Each destination below receives a copy of the transformed body
        .multicast().parallelProcessing()
            .to("direct:multicastA")
            .to("direct:multicastB")
            .to("direct:multicastC")
        .end();

    from("direct:multicastA")
        .delay(1000)
        .log("A: ${body}");

    from("direct:multicastB")
        .delay(1000)
        .log("B: ${body}");

    from("direct:multicastC")
        .delay(1000)
        .log("C: ${body}");
Multicast is an alternative to the default pipeline behavior. The difference is that a pipeline calls the endpoints in sequence, passing each one's output on to the next, whereas multicast sends a copy of the same message to every listed endpoint (inside a multicast block, .bean and .log each count as an endpoint, just like direct, seda and timer).
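To make the difference concrete, here is a minimal plain-Java sketch of the two behaviors. It does not use the Camel API at all: the class PipelineVsMulticast, its two methods and the "price" payload are hypothetical names made up for illustration, and the steps run sequentially for simplicity, whereas Camel with parallelProcessing() would run them concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Illustrative model only: NOT Camel code, just the message-flow semantics.
public class PipelineVsMulticast {

    // Pipeline: each step receives the output of the previous step.
    public static String pipeline(String body, List<UnaryOperator<String>> steps) {
        String current = body;
        for (UnaryOperator<String> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    // Multicast: every step receives the ORIGINAL body;
    // the result of one step is never seen by the others.
    public static List<String> multicast(String body, List<UnaryOperator<String>> steps) {
        List<String> results = new ArrayList<>();
        for (UnaryOperator<String> step : steps) {
            results.add(step.apply(body));
        }
        return results;
    }

    public static void main(String[] args) {
        UnaryOperator<String> transform = s -> s.toUpperCase(); // stands in for .bean(...)
        UnaryOperator<String> send = s -> "sent:" + s;          // stands in for .to(...)
        List<UnaryOperator<String>> steps = List.of(transform, send);

        System.out.println(pipeline("price", steps));  // prints sent:PRICE
        System.out.println(multicast("price", steps)); // prints [PRICE, sent:price]
    }
}
```

In the multicast result, the "send" step still sees the untransformed body, which mirrors what the route in the question does. Moving the transformation in front of the multicast block corresponds to running it as a pipeline step first.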