Search code examples
javaspringrabbitmqflumespring-rabbit

Using Spring AMQP onMessage() method within Flume Lifecycle


I need to listen to a Rabbit Queue in the Flume Custom Source which I have developed.This requirement may seem awkward in Flume.But this is how its needed. As I am using Spring AMQP to listen to the queue for simplicity,I am just not able to understand how to invoke the OnMessage() method within the Flume lifecycle Start() method,So that the messages can be posted onto the Flume channel. I have looked at the Spring MessageListenerAdapter concept but I have not been able to find any example to implement the same.


Solution

  • onMessage() is a part of MessageListener pattern. It is some active component, which is initiated by the external system (from big height). And it works each time by that remote command, so you can't use it as a passive componet to be initiated by the user call.

    Since you have "Flume lifecycle Start()" from other side and SimpleMessageListenerContainer has the same from its side, I'd say you have to correlate their lifecycles to work in tandem.

    From here you should to provide for the SimpleMessageListenerContainer some inline MessageListener implementation, which invokes the desired method to "post onto the Flume channel".

    HTH

    UPDATE

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    ....
    container.setMessageListener(new MessageListener() {
    
       public void onMessage(Message message) {
           sendMessageToFlumeChannel(message);
       }
    
    });
    

    Where the sendMessageToFlumeChannel is a method of the holding class.

    Of course it can be any POJO instead of MessageListener implementation, but the main goal to delegate listener resul to some method.