I recently implemented an akka-stream flow which parse some json messages, validate the presence of a given key (destination_region
) and pass to the next stage a case class containing the original message and the destination_region
string.
I implemented a custom decider so that in case it face any parsing or key error, it will trigger Supervision.Resume
after logging the exception.
A minimalistic implementation would look like:
package com.example.stages
import com.example.helpers.EitherHelpers._
/*
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
object EitherHelpers {
implicit class ErrorEither[L <: Throwable, R](val either: Either[L, R]) extends AnyVal {
def asFuture: Future[R] = either.fold(Future.failed, Future.successful)
def asTry: Try[R] = either.fold(Failure.apply, Success.apply)
}
}
*/
import scala.concurrent.ExecutionContext
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision
import software.amazon.awssdk.services.sqs.model.Message
import io.circe.parser.parse
import io.circe.{DecodingFailure, ParsingFailure}
object MessageContentValidationFlow {
def apply()(
implicit executionContext: ExecutionContext): Flow[Message, MessageWithRegion, NotUsed] = {
val customDecider: Supervision.Decider = {
case e @ (_: DecodingFailure | _: ParsingFailure) => {
println(e)
Supervision.Resume
}
case _ => Supervision.Stop
}
Flow[Message]
.mapAsync[MessageWithRegion](2) { message =>
println(s"Received message: $message")
val messageWithRegion = for {
parsed <- parse(message.body()).asFuture
region <- parsed.hcursor.downField("destination_region").as[String].asFuture
} yield { MessageWithRegion(message, region) }
messageWithRegion
}
.withAttributes(supervisionStrategy(customDecider))
}
}
case class MessageWithRegion(message: Message, region: String)
I managed to test the case where the message is valid, however I have not clue about how to test the flow in case of ParsingFailure
or DecodingFailure
. I have tried almost all methods available for sub
in the implementation below:
package com.example.stages
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import io.circe.generic.JsonCodec, io.circe.syntax._
import io.circe.generic.auto._
import software.amazon.awssdk.services.sqs.model.Message
import org.scalatest.FlatSpec
@JsonCodec case class MessageBody(destination_region: String)
class MessageContentValidationFlowSpecs extends FlatSpec {
implicit val system = ActorSystem("MessageContentValidationFlow")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val (pub, sub) = TestSource.probe[Message]
.via(MessageContentValidationFlow())
.toMat(TestSink.probe[MessageWithRegion])(Keep.both)
.run()
"MessageContentValidationFlow" should "process valid messages" in {
val validRegion = "eu-west-1"
val msgBody = MessageBody(validRegion).asJson.toString()
val validMessage = Message.builder().body(msgBody).build()
sub.request(n = 1)
pub.sendNext(validMessage)
val expectedMessageWithRegion = MessageWithRegion(
message = validMessage,
region = validRegion
)
assert(sub.requestNext() == expectedMessageWithRegion)
}
ignore should "trigger Supervision.Resume with empty messages" in {
val emptyMessage = Message.builder().body("").build()
assert(emptyMessage.body() == "")
sub.request(n = 1)
pub.sendNext(emptyMessage)
sub.expectComplete()
}
}
Does anyone know how to test that Supervision.Resume
was triggered and which exception was caught by the custom decider?
Since Supervision.Resume
drops erroneous elements and continues processing the stream, one way to test that supervision strategy is to run a stream that contains a mix of "good" and "bad" elements and confirm whether the materialized value consists of only the "good" elements. For example:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import org.scalatest._
import scala.concurrent._
import scala.concurrent.duration._
class MyTest extends FlatSpec with Matchers {
implicit val system = ActorSystem("MyTest")
implicit val materializer = ActorMaterializer()
val resumingFlow = Flow[Int].map {
case 2 => throw new RuntimeException("bad number")
case i => i
}.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
"resumingFlow" should "drop the number 2" in {
val result: collection.immutable.Seq[Int] =
Await.result(Source((1 to 5).toSeq).via(resumingFlow).runWith(Sink.seq), 5.seconds)
result should be (List(1, 3, 4, 5))
}
}
In your case, create a stream that has valid Message
objects and at least one invalid Message
object.