Search code examples
rabbitmqakkaakka-streamalpakka

Getting failure callback for Producer in rabbitmq when back pressure kicks in


I wanted to find out the failed messages for my rabbitmq producers using some call back api.I have configured rabbitmq with [{rabbit, [{vm_memory_high_watermark, 0.001}]}]. and tried pushing lot of messages but all the messages are getting accepted and TimeoutException is coming later on and messages not getting send to Queueenter code here, Please tell me how to capture it.

Code for sending message:

// #create-sink - producer
        final Sink<ByteString, CompletionStage<Done>> amqpSink =
            AmqpSink.createSimple(
                AmqpSinkSettings.create(connectionProvider)
                    .withRoutingKey(AkkaConstants.queueName)
                    .withDeclaration(queueDeclaration));


        // #run-sink
        //final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
        //Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer);

        String filePath = "D:\\subrata\\code\\akkaAmqpTest-master\\akkaAmqpTest-master\\logs2\\dummy.txt";
        Path path = Paths.get(filePath);

        // List containing 78198 individual message
        List<String> contents = Files.readAllLines(path);
        System.out.println("********** file reading done ....");
        int times = 5;

        // Send 78198*times message to Queue [From console i can see 400000 number of messages being sent]
        for(int i=0;i<times;i++) {
            Source.from(contents).map(ByteString::fromString).runWith(amqpSink, materializer);
        }
        System.out.println("************* sending to queue is done");

Solution

  • Unfortunately currently that is not supported out of the box. Ideally the producer would be modeled as a Flow which would send all incoming messages to the AMQP broker and would emit the same message with a result weather it has been successfully sent to the broker or not. There is a ticket to track this possible improvement on the Alpakka issue tracker.