Tags: akka, akka-stream, akka-http, akka-cluster, alpakka

Alpakka AMQP : How to detect declaration exception?


I have an AMQP Source and an AMQP Sink with declarations:

List<Declaration> declarations = new ArrayList<Declaration>() {{
      add(QueueDeclaration.create(sourceExchangeName));
      add(BindingDeclaration.create(sourceExchangeName, sourceExchangeName).withRoutingKey(sourceRoutingKey));
    }};

amqpSource = AmqpSource
    .committableSource(
        NamedQueueSourceSettings.create(connectionProvider, sourceExchangeName)
            .withDeclarations(declarations),
        bufferSize);

AmqpWriteSettings amqpWriteSettings = AmqpWriteSettings.create(connectionProvider)
    .withExchange("DEST_XCHANGE")
    .withRoutingKey("ROUTE123")
    .withDeclaration(ExchangeDeclaration.create(destinationExchangeName,
        BuiltinExchangeType.DIRECT.getType()));

amqpSink = AmqpSink.create(amqpWriteSettings);

And then I have a flow:

amqpSource.map(doSomething).async().map(doSomethingElse).async().to(amqpSink)

After I started the app, messages sent to the source queue were not being consumed. I later found out that this was due to errors that occurred during the declarations (it worked fine when I removed the .withDeclarations(..) calls from the Source and Sink settings).

So my questions:

  1. How to detect if the AMQP Source and Sink are up and running fine?
  2. How to ignore declaration exceptions?
  3. If any exception occurs, how can I know and make the system fail?

Solution

  • To answer 1 and 3: the AmqpSink materializes a CompletionStage<Done>, which you have to keep and register callbacks on in order to observe failure and completion of the stream. Note that .to(amqpSink) keeps only the source's materialized value, so the flow in the question would need .toMat(amqpSink, Keep.right()) to get hold of that CompletionStage. The docs sample (https://doc.akka.io/docs/alpakka/current/amqp.html#with-sink) blocks on the completion stage, probably because the sample is taken from one of the Alpakka tests; blocking like that is not good in production code. Prefer the usual CompletionStage callback/transformation methods instead (see for example this introduction).

    The CompletionStage fails when an error happens, either while the stream is being materialized and started or while elements are being processed. Otherwise it completes once the source reaches its end and every element has gone through your flow into the sink. So at startup, if the stage does not fail fairly quickly, the stream is running.

    For question 2, I am not sure whether it is possible to ignore declaration exceptions; it could be that they always fail the connection.
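
For questions 1 and 3, the callback approach can be sketched roughly as follows. This is a minimal illustration using only the JDK, not Alpakka code: the CompletableFuture stands in for the CompletionStage<Done> that the sink would materialize (assuming the stream is run with something like .toMat(amqpSink, Keep.right()).run(...)), and the names StreamCompletionWatcher, Outcome and watch are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

class StreamCompletionWatcher {

    // Hypothetical states of a stream as seen through its completion stage.
    enum Outcome { RUNNING, COMPLETED, FAILED }

    // Registers a non-blocking callback on the stage and records the outcome.
    // In a real application the failure branch is where you would log the error
    // and trigger a shutdown (for example, terminating the actor system).
    static AtomicReference<Outcome> watch(CompletionStage<?> streamDone) {
        AtomicReference<Outcome> outcome = new AtomicReference<>(Outcome.RUNNING);
        streamDone.whenComplete((done, failure) -> {
            if (failure != null) {
                System.err.println("Stream failed: " + failure.getMessage());
                outcome.set(Outcome.FAILED);
            } else {
                System.out.println("Stream completed normally");
                outcome.set(Outcome.COMPLETED);
            }
        });
        return outcome;
    }

    public static void main(String[] args) {
        // Stand-in (assumption) for the CompletionStage<Done> materialized by the sink.
        CompletableFuture<String> streamDone = new CompletableFuture<>();
        AtomicReference<Outcome> outcome = watch(streamDone);

        // Nothing has failed yet, so the stream is considered running.
        System.out.println("Before failure: " + outcome.get());

        // Simulate a failure at startup, such as a declaration error.
        streamDone.completeExceptionally(new IllegalStateException("declaration failed"));
        System.out.println("After failure: " + outcome.get());
    }
}
```

The callback runs without blocking any thread, which is the main difference from the docs sample. With the real stream, you would pass the materialized CompletionStage<Done> to watch(...) in place of the stand-in future.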