
What is the best practice for throwing exceptions inside an Akka Stream - Scala?


I am new to Akka Streams and am trying to figure out the best practice for handling unexpected behavior inside a stream.

I am building a stream that uses the Account Kit API to exchange a short-lived code for a long-lived access token.

Here is the relevant code for building the stream:

override def ExchangeCode: Flow[String, AccessTokenInfo, NotUsed] =
Flow[String].mapAsync(1) { code =>
  ws.url(buildUrl("access_token"))
    .withQueryString(
      "grant_type" -> "authorization_code",
      "code" -> code,
      "access_token" -> appToken
    )
    .withRequestTimeout(timeout)
    .get
    .map { response =>
      if (response.status != 200) throw new RuntimeException("Unexpected response")
      else response.json
    }
    .map { json =>
      AccessTokenInfo("123456", 123, "123456")
    }
}

I wonder whether throwing an exception when the status code is not 200 is the right way to handle this. As far as I know, it is the only way to terminate a stream early. (The return value is currently a dummy.)


Solution

  • Why is it a strict necessity that the stream terminate if the status is not 200? Usually you want to send even the failures downstream so any user of the Flow can be informed and act accordingly.

    The typical way of dealing with this type of failure is with Try. Slightly modifying your Flow:

    //                                  Add Try Output
    //                                       |
    //                                       v
    override def ExchangeCode: Flow[String, Try[AccessTokenInfo], _] = 
      Flow[String] 
        ...
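        .map { response =>
          response.status match {
            // response.json may throw on a malformed body, so keep it inside Try
            case 200 => Try(response.json)
            // Build the Failure directly rather than throwing inside Try (scala.util.Failure)
            case _   => Failure(new RuntimeException("Unexpected Response"))
          }
        }
        // Dummy value, as in the question; the inner map transforms only the Success case
        .map(_.map(json => AccessTokenInfo("123456", 123, "123456")))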
    

    Now any user of the flow either gets the valid access token, if it could be retrieved, or gets the exception and can handle the failure case.
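
    To make the downstream side concrete, here is a minimal sketch in plain Scala with no Akka dependency. A List stands in for the elements a downstream stage would receive. FakeResponse, toTokenInfo, the AccessTokenInfo field names, and the values 3600 and "account-1" are all hypothetical and used only for illustration; they are not part of the question's code.

    ```scala
    import scala.util.{Failure, Success, Try}

    object TryHandlingSketch {
      // Hypothetical stand-ins for the question's types; field names are assumptions.
      final case class AccessTokenInfo(token: String, expiresIn: Int, accountId: String)
      final case class FakeResponse(status: Int, body: String)

      // Same decision as the map stage above: 200 becomes a Success, anything else a Failure.
      def toTokenInfo(response: FakeResponse): Try[AccessTokenInfo] =
        response.status match {
          case 200   => Success(AccessTokenInfo(response.body, 3600, "account-1"))
          case other => Failure(new RuntimeException(s"Unexpected response status: $other"))
        }

      // A plain List stands in for the elements flowing downstream.
      val results: List[Try[AccessTokenInfo]] =
        List(FakeResponse(200, "token-a"), FakeResponse(500, "")).map(toTokenInfo)

      // The consumer decides per element; a failed element does not stop the later ones.
      val messages: List[String] = results.map {
        case Success(info) => s"got token ${info.token}"
        case Failure(e)    => s"failed: ${e.getMessage}"
      }
    }
    ```

    The point of the design is that the failure travels as an ordinary element, so the decision to log, retry, or abort is left to whoever consumes the Flow.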

    Alternative Return Type: Option

    If only one kind of failure is possible, with a single cause, then Option is a viable return type as well:

    override def ExchangeCode: Flow[String, Option[AccessTokenInfo], _] = 
      ...
            case 200 => Some(response.json)
            case _   => None
      ...
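
    The trade-off between the two return types can be seen in a small sketch using only the standard library: converting a Try to an Option keeps the value but drops the exception, so the consumer can no longer tell why an element failed. The object name and sample values below are hypothetical.

    ```scala
    import scala.util.{Failure, Success, Try}

    object OptionVsTrySketch {
      // Hypothetical sample results, one success and one failure.
      val ok: Try[String]     = Success("token-a")
      val failed: Try[String] = Failure(new RuntimeException("Unexpected response status: 500"))

      // toOption keeps the successful value and discards the exception.
      val asOptions: List[Option[String]] = List(ok, failed).map(_.toOption)

      // flatten keeps only the defined values, which suits consumers that ignore failures.
      val tokens: List[String] = asOptions.flatten
    }
    ```

    If consumers need the failure reason, Try is probably the better fit; Option reads more cleanly when "no token" is all they need to know.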