
Alpakka JMS Transaction


I'm playing with Alpakka and its JMS connector to dequeue data from Oracle AQ. By following this guide, I came up with the very basic implementation below.

My question is how I can make it transactional, so that I can guarantee that my message won't be lost if an exception is thrown.

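import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.jms.JmsSourceSettings
import akka.stream.alpakka.jms.scaladsl.JmsSource
import akka.stream.scaladsl.Sink
import oracle.jms.AQjmsFactory

object ConsumerApp extends App {
    implicit val system: ActorSystem = ActorSystem("actor-system")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // getOracleDataSource() is my own helper (not shown) that returns a javax.sql.DataSource
    val connectionFactory = AQjmsFactory.getConnectionFactory(getOracleDataSource())

    val out = JmsSource.textSource(
        JmsSourceSettings(connectionFactory).withQueue("My_Queue")
    )

    val sink = Sink.foreach { message: String =>
        println("in sink: " + message)
        throw new Exception("") // !!! MESSAGE IS LOST !!!
    }

    // The Scala DSL takes the materializer implicitly
    out.runWith(sink)
}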

If it was PL/SQL, the solution would be like this:

DECLARE
  dequeue_options            DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties         DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle             RAW (44);
  msg                        SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
  DBMS_AQ.dequeue (
      queue_name           => 'My_Queue',
      dequeue_options      => dequeue_options,
      message_properties   => message_properties,
      payload              => msg,
      msgid                => message_handle
  );

  -- do something with the message

  COMMIT;
END;

Solution

  • The default behavior when a stream stage fails is to shut down the entire stream. You'll have to decide how you want to handle errors in the stream. One approach, for example, is to restart the stream with a backoff strategy.

    Also, since you're using the Alpakka JMS connector, set the acknowledgement mode to ClientAcknowledge (this is available since Alpakka 0.15). With this configuration, messages that aren't acknowledged can be delivered again through the JMS source. For example:

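    import akka.NotUsed
    import akka.stream.alpakka.jms.{AcknowledgeMode, JmsSourceSettings}
    import akka.stream.alpakka.jms.scaladsl.JmsSource
    import akka.stream.scaladsl.Source
    import javax.jms.{Message, TextMessage}

    // Emit raw JMS messages so that each one can be acknowledged explicitly
    val jmsSource: Source[Message, NotUsed] = JmsSource(
      JmsSourceSettings(connectionFactory)
        .withQueue("My_Queue")
        .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
    )

    val result = jmsSource
      .map {
        case textMessage: TextMessage =>
          val text = textMessage.getText
          // Do any work that may fail before this call:
          // once acknowledged, the message is not redelivered
          textMessage.acknowledge()
          text
      }
      .runForeach(println)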
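
The two suggestions above (restart with backoff, and client acknowledgement) can be combined. The following is only a minimal sketch, not a definitive implementation: it assumes Akka 2.5.4 or later (where RestartSource.withBackoff is available) and the same Alpakka JMS API used in the snippet above. The object name RestartingConsumer, the process function, and the backoff values are hypothetical and chosen purely for illustration. The processing step sits inside the restarted source, so a failure there tears down the JMS source before the message is acknowledged, and the message should then be eligible for redelivery once the source is recreated.

```scala
import akka.NotUsed
import akka.stream.alpakka.jms.{AcknowledgeMode, JmsSourceSettings}
import akka.stream.alpakka.jms.scaladsl.JmsSource
import akka.stream.scaladsl.{RestartSource, Source}
import javax.jms.{ConnectionFactory, TextMessage}

import scala.concurrent.duration._

object RestartingConsumer {

  // Hypothetical processing step; replace with the real work, which may throw.
  def process(text: String): Unit = println("processed: " + text)

  def restartingSource(connectionFactory: ConnectionFactory): Source[String, NotUsed] =
    // Recreate the wrapped source after a failure, waiting between 1 and 30 seconds
    // (illustrative values) with 20% random jitter added to each delay.
    RestartSource.withBackoff(
      minBackoff = 1.second,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ) { () =>
      JmsSource(
        JmsSourceSettings(connectionFactory)
          .withQueue("My_Queue")
          .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
      ).collect {
        // In this sketch, non-text messages are skipped and never acknowledged.
        case textMessage: TextMessage =>
          val text = textMessage.getText
          process(text)             // if this throws, the message stays unacknowledged
          textMessage.acknowledge() // acknowledge only after processing succeeded
          text
      }
    }
}
```

With an implicit materializer in scope, this could be run as RestartingConsumer.restartingSource(connectionFactory).runWith(Sink.ignore). Two caveats: in JMS client-acknowledge mode, acknowledge() also acknowledges every message the session has delivered before it, and a message that always fails will be redelivered repeatedly, so processing should be idempotent and a retry limit or dead-letter strategy may be needed.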