Message Buffering Camunda rabbitmq spring-boot


I have a setup of three Spring Boot projects, each acting as a separate participant. Each project deploys one BPMN model, and the models communicate with each other in a loop-based pattern.

Although I am using a message broker as a queue, I cannot find a way to correlate messages to Camunda when the receive task is not ready yet. For example, process 1 sends a message to process 2, but process 2 has not yet reached its receive task, so I get a MismatchingMessageCorrelationException: the message was sent too early and the corresponding receive task was not yet waiting for it.

I thought a message queue would help, so I decided to use a RabbitMQ broker. But the Java setup simply takes a message from the queue and sends it to Camunda right away. I do not want it sent to Camunda immediately; instead, I would like the queued messages to be sent only once the process is on a receive task.

So I could define a start listener on each receive task that calls a Java delegate class, which in turn tells the RabbitMQ queue that the process is now ready; the queue would then send the next message, but not before. Does anybody have a clue how to solve this?

In the configuration class I needed to set up a bean with MessageListenerAdapter:

@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
    return new MessageListenerAdapter(receiver, "receiveMessage");
}

The Receiver class:

@Component
public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    @Autowired
    CamundaMessageProcessor messageProcessor;

    public void receiveMessage(String message) {
        try {
            Response response = messageProcessor.processMessage(message);
        }catch(Exception e){
            // @TODO: Fix up error catching with the service
            
        }finally{
            latch.countDown();
        }

    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

And the MessageProcessor class:

@Service
public class CamundaMessageProcessor{

    @Autowired
    private ProcessEngine engine;

    @Autowired
    private ObjectMapper objectMapper;

    public Response processMessage(String message){
        CorrelationMessageDto messageDto = null;
        try {
            System.out.println("MESSAGE: " + message);
            messageDto = objectMapper.readValue(message, CorrelationMessageDto.class);
            
        } catch (JsonMappingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (JsonProcessingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        MessageRestServiceImpl service = new MessageRestServiceImpl(engine.getName(), objectMapper);
        Response response = service.deliverMessage(messageDto);
        System.out.println("Response deliverMessage " + response.toString());
        return response;
    }
}

This is my sample GitHub project: https://github.com/SebastianAT/camunda_springboot_rabbitmq

Somehow I expected Camunda to buffer my message as long as the process is not on a receive task, but it seems that this does not work with Camunda 7.

Thanks in advance. Best Regards Sebastian


Solution

  • Camunda 8 can buffer your message, please see: https://docs.camunda.io/docs/components/concepts/messages/

    If you want to call out when ready, then you don't need messaging. Telling a queue when a receiver is ready is not how such architectures normally work. If you can call a service when the stage is reached, why not let the one process write to a DB and the other process fetch the information when it needs it?

    With a message broker, you would normally attempt a delivery and, if needed, retry after a certain delay, and potentially retry again after a longer delay. I found this example for RabbitMQ: https://github.com/fefferoni/rabbitmq-delayed-retry-experiment/ I cannot comment on the quality of the example, but the design/diagram looks reasonable. RabbitMQ also seems to support delayed queues.
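
The attempt-then-retry-with-a-longer-delay idea can be sketched in plain Java as follows. This is only an illustration, not the linked project's design: the class `DelayedRetry`, the method `deliverWithRetry` and the doubling of the delay are assumptions made for the example. In the setup from the question, the `delivery` predicate would presumably wrap the Camunda correlation call and return false when correlation fails. Sleeping in the listener thread is a simplification; a broker-side delayed queue would avoid blocking the consumer.

```java
import java.time.Duration;
import java.util.function.Predicate;

// Hypothetical helper: retries a delivery with a growing delay between attempts.
public class DelayedRetry {

    /**
     * Tries to deliver the message up to maxAttempts times.
     * The delay starts at initialDelay and doubles after each failed attempt.
     *
     * @return the attempt number that succeeded, or -1 if every attempt failed
     *         or the thread was interrupted while waiting
     */
    public static <T> int deliverWithRetry(T message,
                                           Predicate<T> delivery,
                                           int maxAttempts,
                                           Duration initialDelay) {
        long delayMillis = initialDelay.toMillis();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // The predicate returns true when the receiver accepted the message.
            if (delivery.test(message)) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up.
                    Thread.currentThread().interrupt();
                    return -1;
                }
                delayMillis *= 2; // wait longer before the next attempt
            }
        }
        return -1;
    }
}
```

A caller that gets -1 back would still have to decide what to do with the message, for example move it to a dead-letter queue or log it for manual handling.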