
Apache Camel route: how to forward the transformed value


I have this Apache Camel route. I instantiate a bean and transform the incoming string, but when I send the message to .to(Endpoints.SEDA_SEND_PRICING_LIFE_CYCLE_MESSAGE), it does not receive the transformed value; it receives the original message from the beginning of the route. My bean method returns an object (I have also tried returning a String).

 from(azureServicebus(AZvalue)
         .connectionString(connectionString)
         .receiverAsyncClient(serviceBusReceiverAsyncClient)
         .serviceBusReceiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
         .serviceBusType(ServiceBusType.topic)
         .prefetchCount(100)
         .consumerOperation(ServiceBusConsumerOperationDefinition.receiveMessages)
         //.maxAutoLockRenewDuration(Duration.ofMinutes(10))
     )
     .messageHistory()
     // Route Name
     .routeId(Endpoints.SEDA_PROCESS_SB_MESSAGE_ENDPOINT)
     // multicast may not need
     .multicast()
     .parallelProcessing() // create parallel threads
     .bean(PricingLifeCyclebService.class, "paraMap")
     .log("Final :- ${body}")
     .to(Endpoints.SEDA_SEND_PRICING_LIFE_CYCLE_MESSAGE)
     .end();
 }

For reference, below is my bean class:

package com.sams.pricing.lifecycle.processor.services;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.sams.pricing.lifecycle.processor.dtos.LifecycleDTO;
import com.sams.pricing.lifecycle.processor.mapper.LifecycleDtoMapper;
import com.sams.pricing.lifecycle.processor.model.LifeCycleTopicC;
import com.sams.pricing.lifecycle.processor.queueDtos.ParaDTO;
import com.sams.pricing.lifecycle.processor.util.LifeCycleMapper;
import com.sams.pricing.lifecycle.processor.util.LifeCycleUtility;

import lombok.extern.slf4j.Slf4j;

@Slf4j 
@Component
public class PricingLifeCyclebService {

    @Autowired private LifecycleDtoMapper paraToLifecleDTOMapper;

    @Autowired private LifeCycleMapper lifecycleTopicmapper;

    public LifeCycleTopicC paraMap(String object) throws JsonProcessingException {
        final var paraMapper = LifeCycleUtility.getObjectMapper();
        final var paraDTO = paraMapper.readValue(object, ParaDTO.class);

        LifecycleDTO lifecycleDTO = paraToLifecleDTOMapper.mapToLifecycleDTO(paraDTO);

        LifeCycleTopicC obj = lifecycleTopicmapper.toLifeCycleTopicC(lifecycleDTO);

        log.info("Event={}, Body={}", "ConvertParatoTopic", obj);
        return obj;
    }

}

Solution

  • This happens because you transform the body inside the multicast block. Transform it before the multicast block instead.

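    In the example below, the same message received by the direct:example route is sent simultaneously to endpoint 1, endpoint 2 and endpoint 3. Each of them gets a copy of the original message, so the value returned by the bean never reaches the log or direct:printBody.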

    from("direct:example")
        .routeId("example")
        .multicast().parallelProcessing()
            .bean(HelloBean.class, "greet" ) // endpoint 1
            .log("Final :- ${body}") // endpoint 2
            .to("direct:printBody") // endpoint 3
        .end();
    

    Here is how you should use multicast instead. This route transforms the message with the bean first, then sends the transformed message to the endpoints direct:multicastA, direct:multicastB and direct:multicastC simultaneously.

    from("direct:example3")
        .routeId("example3")
        .bean(HelloBean.class, "greet" ) 
        .log("Final :- ${body}") 
        .multicast().parallelProcessing()
            .to("direct:multicastA") 
            .to("direct:multicastB") 
            .to("direct:multicastC") 
        .end();
    
    from("direct:multicastA")
        .delay(1000)
        .log("A: ${body}")
    ;
    
    from("direct:multicastB")
        .delay(1000)
        .log("B: ${body}")
    ;
    
    from("direct:multicastC")
        .delay(1000)
        .log("C: ${body}")
    ;
    

    Multicast is an alternative to the default pipeline behavior. A pipeline calls its steps in sequence and passes each step's output to the next one, whereas multicast sends a copy of the same message to every step listed in the block. Inside a multicast, .bean and .log each count as a separate destination, just like a .to() call with a direct or seda endpoint.
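
The difference between pipeline and multicast can be illustrated with a small toy model in plain Java. This is only a sketch of the message flow, not Camel's implementation and not Camel API: the class MulticastVsPipeline and its pipeline and multicast methods are hypothetical names made up for this illustration, and a step is modeled as a simple function on the message body.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Toy model of the two routing styles. Hypothetical code, NOT Camel's implementation. */
final class MulticastVsPipeline {

    /** Pipeline: each step receives the output of the previous step. */
    static List<String> pipeline(String input, List<UnaryOperator<String>> steps) {
        List<String> seen = new ArrayList<>();
        String body = input;
        for (UnaryOperator<String> step : steps) {
            seen.add(body);          // record what this step received
            body = step.apply(body); // the result feeds the next step
        }
        return seen;
    }

    /** Multicast: every branch receives its own copy of the original input. */
    static List<String> multicast(String input, List<UnaryOperator<String>> branches) {
        List<String> seen = new ArrayList<>();
        for (UnaryOperator<String> branch : branches) {
            seen.add(input);     // every branch starts from the original body
            branch.apply(input); // the result is not passed to sibling branches
        }
        return seen;
    }

    public static void main(String[] args) {
        // Stands in for .bean(...): changes the body.
        UnaryOperator<String> transform = body -> "transformed(" + body + ")";
        // Stands in for .to(...): leaves the body unchanged.
        UnaryOperator<String> send = body -> body;
        List<UnaryOperator<String>> steps = List.of(transform, send);

        // prints "pipeline:  [original, transformed(original)]"
        System.out.println("pipeline:  " + pipeline("original", steps));
        // prints "multicast: [original, original]"
        System.out.println("multicast: " + multicast("original", steps));
    }
}
```

Each returned list shows what every step received. In the pipeline case the second step (the stand-in for .to) sees the transformed body, while in the multicast case it still sees the original one, which matches the behavior described in the question.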