Search code examples
scalarabbitmqakkaakka-streamalpakka

How to release a message back to RabbitMQ so it's available for another consumer?


I'm using the Alpakka AMQP library (https://developer.lightbend.com/docs/alpakka/current/amqp.html) to process RabbitMQ messages in a reactive stream and dump them into Kafka.

We are using Avro and Schema Registry, so malformed messages fail validation and need to be handled on a case by case basis. The behavior we want is that the app dies, and is restarted 30 seconds - 1 minute later, and in the interim we are able to pull the message off the queue using the UI to see what's wrong with it.

For some reason, my nack() doesn't seem to be releasing the message back, and it stays in the Unacked state and is unable to be viewed until released. How would one do this? I'll include a snippet of my code:

Flow[CommittableIncomingMessage]
      .map { msg =>
        rabbitReceivedCounter.increment()
        reconciliationGauge.increment()

        val json = msg.message.bytes.utf8String

        val record = Try(converter.convert(json)) match {
          case Success(result) => result
          case Failure(e) => e match {
            case e: JsonToAvroConversionException ⇒
              val errorMsg = s"Failed to serialize to avro: ${e.getMessage} - Field: ${e.getFieldName} - Value: ${e.getValue}"
              failurePublisher.publishError(errorMsg, StatusCodes.BadRequest.intValue)
              kafkaFailureCounter.increment()
              Await.result(msg.nack(), Duration.Undefined)
              throw e

The nack() is a java.util.Future, so just for testing I threw an Await around it to make sure the problem isn't that I'm throwing the error before the signal can make it to Rabbit.


Solution

  • When an exception is thrown in a stream, the element that caused the error is typically lost. One idea is to offload the Avro conversion to an actor that returns both the result of the conversion and the original message: this approach would allow you to ack or nack a message depending on whether it was convertible to Avro.

    For example, the actor could look something like the following:

    case class AvroResult(avro: Option[Avro], msg: CommittableIncomingMessage)
                                    // ^ change this to whatever type you're using
    
    class ConverterActor extends Actor {
      val converter = ???
    
      def receive = {
        case msg: CommittableIncomingMessage =>
          try {
            val json = msg.message.bytes.utf8String
            val avro = converter.convert(json)
            sender() ! AvroResult(Some(avro), msg)
          } catch {
            case _ =>
              sender() ! AvroResult(None, msg)
          }
      }
    }
    

    Then you could interrogate this actor in your stream:

    val converterActor = system.actorOf(Props[ConverterActor])
    
    val source: Source[Avro, _] = amqpSource
      .ask[AvroResult](converterActor)
      .mapAsync(1) {
        case AvroResult(Some(avro), msg) => // ack
          msg.ack().map(_ => Some(avro))
        case AvroResult(None, msg) => // nack
          msg.nack().map(_ => None)
      }
      .collect {
        case Some(avro) => avro
      }
    

    The above Source emits downstream the Avro-converted messages that are acknowledged with ack. The messages that are not convertible are rejected with nack and are not passed downstream.