EIP/Apache Camel - How to process message concurrently, but atomically per group?


I have the following situation:

  • There are a fixed number of groups.
  • There is a TCP stream of incoming messages. Each message is related to exactly one group.

I start the Camel route as follows:

public class MyMessage implements Runnable {
    public void run() {
        // omitted here
    }
}

from("netty:tcp://localhost:7777?textline=true&sync=false")
   ... // omitted here: parse message to pojo MyMessage, set header "group-identifier"
   .to("seda:process");

This Camel route consumes the TCP stream, parses the payload of each incoming message into a MyMessage POJO, and sets the group-identifier header on the exchange which corresponds with a message...

Now I want to consume seda:process as follows:

  • Messages belonging to the same group cannot be executed concurrently.
  • Messages belonging to different groups can be executed concurrently.
  • Each message should be executed by calling run(). I want to provide/define an ExecutorService for this, so I can control the number of threads.

Which enterprise integration patterns can I apply here? How can I map these concepts to Camel?

I learnt that ActiveMQ has the concept of message groups (http://activemq.apache.org/message-groups.html). This might provide a way to make sure that two messages of the same group are never executed at the same time. However, I am not sure whether introducing ActiveMQ just for this is overkill. Can this also be achieved with 'core' Camel/Java?


Solution

  • It is quite easy to do this in ActiveMQ. The following code snippet simulates executing messages as required:

    • Messages belonging to the same group are executed sequentially.
    • Messages belonging to different groups are executed concurrently.

    This relies on ActiveMQ message groups as explained on http://activemq.apache.org/message-groups.html.

    final CamelContext context = new DefaultCamelContext();
    
    // Embedded, non-persistent broker reached over the in-VM transport.
    context.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false"));
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            // Up to five consumers share the queue; ActiveMQ assigns each
            // JMSXGroupID to a single consumer at a time.
            from("activemq:queue:q?concurrentConsumers=5")
                    .process(exchange -> {
                        System.out.println(Thread.currentThread() + " - " + exchange.getIn().getBody());
                        // Simulate slow processing so the per-group ordering is visible.
                        Thread.sleep(5000);
                    });
        }
    });
    context.start();
    
    // Send 1000 messages spread over five groups (0..4).
    for (int i = 0; i < 1000; ++i) {
        context.createFluentProducerTemplate()
                .withBody("This is a message from group : " + (i % 5))
                .withHeader("JMSXGroupID", "" + (i % 5))
                .to("activemq:queue:q")
                .send();
    }
    

    That said, I am (still) wondering if this could be done with pure EIPs/Camel-core.
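
On that last point, one possible plain-Java approach (a minimal sketch, not a Camel API) is to wrap the ExecutorService in a small helper that keeps a queue of waiting tasks per group and only hands the next task of a group to the pool once the previous one has finished. The class name GroupSerialExecutor and its methods are hypothetical, and failures of the underlying pool (for example a rejected task) are not handled here.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/**
 * Hypothetical helper: tasks of the same group run one at a time in
 * submission order, tasks of different groups may run in parallel on
 * the delegate executor.
 */
class GroupSerialExecutor {
    private final Executor delegate;
    // A key is present exactly while its group has a task running or scheduled;
    // the queue holds the tasks waiting behind it.
    private final Map<String, Queue<Runnable>> pending = new HashMap<>();

    GroupSerialExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    public synchronized void execute(String group, Runnable task) {
        Queue<Runnable> waiting = pending.get(group);
        if (waiting != null) {
            // The group is busy: park the task until its predecessor finishes.
            waiting.add(task);
        } else {
            // The group is idle: mark it busy and hand the task to the pool.
            pending.put(group, new ArrayDeque<>());
            delegate.execute(() -> runAndContinue(group, task));
        }
    }

    private void runAndContinue(String group, Runnable task) {
        try {
            task.run();
        } finally {
            // Continue with the group even if the task threw.
            scheduleNext(group);
        }
    }

    private synchronized void scheduleNext(String group) {
        Runnable next = pending.get(group).poll();
        if (next == null) {
            // Nothing is waiting: mark the group idle again.
            pending.remove(group);
        } else {
            delegate.execute(() -> runAndContinue(group, next));
        }
    }
}
```

Inside a processor consuming seda:process, this could be called as serial.execute(exchange.getIn().getHeader("group-identifier", String.class), exchange.getIn().getBody(MyMessage.class)), assuming the body is the MyMessage Runnable from the question and serial wraps the ExecutorService that controls the number of threads.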