I'm using the Alpakka AMQP library (https://developer.lightbend.com/docs/alpakka/current/amqp.html) to process RabbitMQ messages in a reactive stream and dump them into Kafka.
We are using Avro and Schema Registry, so malformed messages fail validation and need to be handled on a case-by-case basis. The behavior we want is that the app dies and is restarted 30 seconds to 1 minute later, and in the interim we can pull the message off the queue using the UI to see what's wrong with it.
For some reason, my nack() doesn't seem to release the message back to the queue: it stays in the Unacked state and cannot be viewed until it is released. How would one do this? Here is a snippet of my code:
Flow[CommittableIncomingMessage]
  .map { msg =>
    rabbitReceivedCounter.increment()
    reconciliationGauge.increment()
    val json = msg.message.bytes.utf8String
    val record = Try(converter.convert(json)) match {
      case Success(result) => result
      case Failure(e) => e match {
        case e: JsonToAvroConversionException ⇒
          val errorMsg = s"Failed to serialize to avro: ${e.getMessage} - Field: ${e.getFieldName} - Value: ${e.getValue}"
          failurePublisher.publishError(errorMsg, StatusCodes.BadRequest.intValue)
          kafkaFailureCounter.increment()
          Await.result(msg.nack(), Duration.Undefined)
          throw e
nack() returns a Future (a scala.concurrent.Future, since Await accepts it), so just for testing I wrapped it in an Await to make sure the problem isn't that I'm throwing the error before the signal can make it to Rabbit.
When an exception is thrown in a stream, the element that caused the error is typically lost. One idea is to offload the Avro conversion to an actor that returns both the result of the conversion and the original message: this approach would allow you to ack or nack a message depending on whether it was convertible to Avro.
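To make the underlying idea concrete without any Akka dependencies, here is a minimal sketch of "return the failure as a value, paired with the original message" instead of throwing. Everything in it is a hypothetical stand-in and not part of Alpakka: Msg plays the role of CommittableIncomingMessage, and convert (which just parses an Int) plays the role of the Avro converter.

```scala
import scala.util.Try

object KeepMessageSketch {
  // Hypothetical stand-in for CommittableIncomingMessage
  final case class Msg(body: String)

  // Hypothetical converter: parsing an Int stands in for the Avro conversion
  def convert(json: String): Int = json.trim.toInt

  // Does not throw on a bad payload: the failure is returned as a value,
  // next to the message that caused it
  def convertKeepingMessage(msg: Msg): (Msg, Either[Throwable, Int]) =
    (msg, Try(convert(msg.body)).toEither)

  val results: List[(Msg, Either[Throwable, Int])] =
    List(Msg("1"), Msg("oops"), Msg("3")).map(convertKeepingMessage)

  // Messages you would ack and pass downstream
  val converted: List[Int] = results.collect { case (_, Right(value)) => value }

  // Messages you would nack; each one is still in hand
  val rejected: List[Msg] = results.collect { case (msg, Left(_)) => msg }
}
```

Because the failing message is still available after the conversion step, the decision to ack or nack can be made on it afterwards, rather than inside a block that is about to throw.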
For example, the actor could look something like the following:
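import akka.actor.Actor
import scala.util.control.NonFatal

case class AvroResult(avro: Option[Avro], msg: CommittableIncomingMessage)
// ^ change Avro to whatever type you're using

class ConverterActor extends Actor {
  val converter = ???

  def receive = {
    case msg: CommittableIncomingMessage =>
      try {
        val json = msg.message.bytes.utf8String
        val avro = converter.convert(json)
        // conversion succeeded: reply with the result and the original message
        sender() ! AvroResult(Some(avro), msg)
      } catch {
        // NonFatal avoids swallowing fatal errors such as OutOfMemoryError
        case NonFatal(_) =>
          sender() ! AvroResult(None, msg)
      }
  }
}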
Then you could interrogate this actor in your stream:
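import akka.actor.Props
import akka.stream.scaladsl.Source
import akka.util.Timeout
import scala.concurrent.duration._

implicit val askTimeout: Timeout = 5.seconds // required by ask; 5 seconds is an arbitrary choice
import system.dispatcher // ExecutionContext for the Future.map calls below

val converterActor = system.actorOf(Props[ConverterActor])

val source: Source[Avro, _] = amqpSource
  .ask[AvroResult](converterActor)
  .mapAsync(1) {
    case AvroResult(Some(avro), msg) => // ack
      msg.ack().map(_ => Some(avro))
    case AvroResult(None, msg) => // nack
      msg.nack().map(_ => None)
  }
  .collect {
    case Some(avro) => avro
  }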
The above Source emits downstream the Avro-converted messages that are acknowledged with ack. The messages that are not convertible are rejected with nack and are not passed downstream.