I have a AMQP Source and AMQP Sink with Declarations:
List<Declaration> declarations = new ArrayList<Declaration>() {{
add(QueueDeclaration.create(sourceExchangeName));
add(BindingDeclaration.create(sourceExchangeName, sourceExchangeName).withRoutingKey(sourceRoutingKey));
}};
amqpSource = AmqpSource
.committableSource(
NamedQueueSourceSettings.create(connectionProvider, sourceExchangeName)
.withDeclarations(declarations),
bufferSize);
AmqpWriteSettings amqpWriteSettings = AmqpWriteSettings.create(connectionProvider)
.withExchange("DEST_XCHANGE")
.withRoutingKey("ROUTE123")
.withDeclaration(ExchangeDeclaration.create(destinationExchangeName,
BuiltinExchangeType.DIRECT.getType()));
amqpSink = AmqpSink.create(amqpWriteSettings);
And then I have a flow..
amqpSource.map(doSomething).async().map(doSomethingElse).async().to(amqpSink)
Now, after i started the app, the messages were sent to source queue was not getting consumed. I later found out that this was due to errors that occurred during declarations. (i.e., it worked fine when I removed the .withDeclarations(..) in the Source and Sink settings.
So my questions:
To answer 1 and 3, the AmqpSink
materializes a CompletionStage<Done>
that you would have to keep, and handle (register some callback functions on) to observe failure and completion of the stream. In the docs sample we block on that completion stage which is not good in production code (https://doc.akka.io/docs/alpakka/current/amqp.html#with-sink), that's probably because the sample is included in one of the Alpakka tests. Prefer the usual CompletionStage
callback/transformation methods instead (see for example this introduction).
The CompletionStage
will fail when an error happens, when the stream is materialized/starting up or during the processing of elements, alternatively complete once the source reaches the end and each element has gone through your flow into the sink. That means that for starting up the stream, if it does not pretty quickly fail it is running.
For question 2 not sure if it is possible to ignore the declaration exceptions, it could be that those always fail the connection.