Search code examples
asynchronousapache-camelshutdown

How to quickly stop seda in camel


I have a camel route with a splitter (using streaming) that sends messages to a seda queue to be processed. When I'm trying to stop the application gently, the seda queue doesn't stop immediately, it is processing all the messages before finally shutting down. What can I do to stop it right away?

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;

public class MySedaShutdownTest extends RouteBuilder  {

@Override
public void configure() throws Exception {
    onException(Exception.class)
        .process(new Processor() {

            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("exception");
            }

        });

    from("timer:myTimer?repeatCount=1")
        .split(ExpressionBuilder.beanExpression(new MySplitter(), "myIterator"))
                .streaming()
                .to("seda:mySeda");

    from("seda:mySeda")
        .throttle(1)
        .process(new Processor() {

            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("processing: " + exchange.getIn().getBody()
                        + "; app status: " + exchange.getContext().getStatus());

            }

        });
}

public static class MySplitter {

    public Iterator<String> myIterator() {
        List<String> values = new ArrayList<String>();
        for (int i = 0; i < 10; i++) {
            values.add("string nr : " + i);
        }
        System.out.println("in myIterator");
        return values.iterator();
    }
}

public static void main(String[] a) throws Exception {
    final Main main = new Main();
    new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                System.out.println("invoking shutdown");
                main.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }).start();
    System.out.println("starting app");
    main.enableHangupSupport();
    main.addRouteBuilder(new MySedaShutdownTest());
    main.run();
}

}


Solution

  • There is a purgeQueue method on the SedaEndpoint. So you can get the endpoint and call this method. You can also access it from JMX.

    A bit related we have this ticket for improvement

    And I logged a ticket for this