Tags: java, rabbitmq, spring-cloud-dataflow

How to stop a Spring Cloud Data Flow Source application from repeatedly returning the same message


I'm subscribing to a queue on a RabbitMQ server and consuming its messages to retrieve the body and one particular header. This is done in a Spring Cloud Data Flow Source application. The issue is that once it starts, it keeps executing and returning the last message. What do I need to add so that the application only executes when a new message arrives on the queue it is listening to?

Here is my Supplier Code:

@Log4j2
@EnableConfigurationProperties({ RabbitMQProperties.class })
@Configuration
public class ReceiveMessageConfiguration {
    private final static String QUEUE_NAME = "UniversalId";
    String payload;
    Connection connection;
    Channel channel;


    @Bean
    public Supplier<String> receiverMessage(RabbitMQProperties rabbitMQProperties) throws Exception {
        return () -> {
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                factory.setRequestedHeartbeat(0);

                connection = factory.newConnection();
                channel = connection.createChannel();

                channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    // Get headers from properties
                    AMQP.BasicProperties properties = delivery.getProperties();
                    Map<String, Object> headers = properties.getHeaders();

                    // Extract and print payload and header
                    for (Map.Entry<String, Object> header : headers.entrySet()) {
                        if (header.getKey().toString().equals("UniversalId")) {
                            // log.info("ID needed: " + header.getValue());
                        }
                    }
                    payload = message;
                };
                channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
                });
                log.info(payload);
                if (payload != null) {
                    return payload;
                }

            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                try {
                    channel.close();
                    connection.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
            return new String("No message");

        };
    }
}

My properties file is like this:

spring.cloud.stream.function.bindings.receiverMessage-in-0=input
spring.cloud.stream.function.bindings.receiverMessage-out-0=output
spring.cloud.stream.bindings.input.destination=receiverMessage-input
spring.cloud.stream.bindings.output.destination=receiverMessage-output

Update: Here is an example of the console messages:

2022-12-21T16:25:19.725-06:00  INFO 21600 --- [           main] m.c.n.r.RabbitMqCustomSourceApplication  : Started RabbitMqCustomSourceApplication in 7.888 seconds (process running for 9.482)
2022-12-21T16:25:20.432-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : null
2022-12-21T16:25:21.478-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : null
2022-12-21T16:25:22.523-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : null
2022-12-21T16:25:23.568-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : null
2022-12-21T16:25:24.604-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : null
2022-12-21T16:25:25.913-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : null
2022-12-21T16:25:27.056-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : null
2022-12-21T16:25:28.124-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : null
2022-12-21T16:25:29.167-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : [
    {
        "MailRequest": {
            "action": "fourth",
            "mails": []
        }
    }
]
2022-12-21T16:25:30.233-06:00  INFO 21600 --- [   scheduling-1] m.c.n.r.s.ReceiveMessageConfiguration    : [
    {
        "MailRequest": {
            "action": "fourth",
            "mails": []
        }
    }
]

I already tried switching from the RabbitMQ API to the AMQP API, but the behaviour is the same.


Solution

  • First, I want to thank @onobc for his/her guidance. To close this question: to extract a specific header together with the payload, I used the default Transform processor application with a SpEL expression like this: --expression="new String(headers.get('LoggerID') + payload)", where LoggerID is the name of the header that you want to extract.
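
For anyone who still wants a hand-written Supplier, here is my reading of why the original one repeated itself, plus a minimal sketch of one way around it. The payload field is never cleared, and the Supplier appears to be polled roughly once per second (the log timestamps suggest this), so every poll returns whatever the callback stored last. The sketch below uses plain Java only, with no RabbitMQ or Spring classes. HandOffSupplierSketch, pending, and onDelivery are hypothetical names; onDelivery stands in for the body of a DeliverCallback. It also assumes the framework skips sending when a polled Supplier returns null, which is worth checking against the Spring Cloud Stream documentation for your version.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

public class HandOffSupplierSketch {

    // Hypothetical buffer between the consumer callback and the polled Supplier.
    // Unlike a plain field, an entry is removed once it has been handed out.
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();

    // Stand-in for the body of a DeliverCallback: store each delivery exactly once.
    public void onDelivery(String body) {
        pending.add(body);
    }

    // Each call removes at most one message; null means "nothing new since last poll".
    public Supplier<String> receiverMessage() {
        return pending::poll;
    }

    public static void main(String[] args) {
        HandOffSupplierSketch sketch = new HandOffSupplierSketch();
        Supplier<String> supplier = sketch.receiverMessage();

        sketch.onDelivery("first");
        System.out.println(supplier.get()); // the stored message
        System.out.println(supplier.get()); // null, because the message was already consumed
    }
}
```

In a real application the connection and consumer would be created once (not inside every Supplier call) and the callback would feed the queue. The prebuilt RabbitMQ source plus the Transform processor described above avoids all of this, which is why I went that way.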