Tags: scala, akka, reactive-programming, akka-stream

Scala RestartSink Future


I'm trying to re-create functionality similar to Akka Streams' [RestartSink][1].

I've come up with the code below. Because the stage extends GraphStage[SinkShape[MessageActionPair]], it exposes only a SinkShape rather than a Sink, and I can't work out how to make it materialize a Future[Done] instead of NotUsed. The best I can get is Sink[MessageActionPair, NotUsed] rather than the desired Sink[MessageActionPair, Future[Done]]. I'm still learning my way around this framework, so I'm probably missing something small. I also tried calling Source.toMat(RestartWithBackoffSink...), but that doesn't give the desired result either.

private final class RestartWithBackoffSink(
    sourcePool:   Seq[SqsEndpoint],
    minBackoff:   FiniteDuration,
    maxBackoff:   FiniteDuration,
    randomFactor: Double) extends GraphStage[SinkShape[MessageActionPair]] { self ⇒

  val in = Inlet[MessageActionPair]("RoundRobinRestartWithBackoffSink.in")

  override def shape = SinkShape(in)
  override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
    "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
    override protected def logSource = self.getClass

    override protected def startGraph() = {
      val sourceOut = createSubOutlet(in)
      Source.fromGraph(sourceOut.source).runWith(createSink(getEndpoint))(subFusingMaterializer)
    }

    override protected def backoff() = {
      setHandler(in, new InHandler {
        override def onPush() = ()
      })
    }

    private def createSink(endpoint: SqsEndpoint): Sink[MessageActionPair, Future[Done]] = {
      SqsAckSink(endpoint.queue.url)(endpoint.client)
    }

    def getEndpoint: SqsEndpoint = {
      if(isTimedOut) {
        index = (index + 1) % sourcePool.length
        restartCount = 0
      }
      sourcePool(index)
    }

    backoff()
  }
}

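This is where compilation fails with a type mismatch: Sink.fromGraph yields a Sink[MessageActionPair, NotUsed], not the declared Sink[MessageActionPair, Future[Done]]: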

def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, Future[Done]] = {
    Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
  }

Solution

  • By extending GraphStage[SinkShape[MessageActionPair]] you are defining a stage with no meaningful materialized value. More precisely, you are defining a stage that materializes to NotUsed, which is why Sink.fromGraph gives you a Sink[MessageActionPair, NotUsed].

    You have to decide whether your stage can materialize into anything meaningful. The Akka Streams documentation on custom stages has a section on materialized values.

    If so: extend GraphStageWithMaterializedValue[SinkShape[MessageActionPair], Future[Done]] instead and override createLogicAndMaterializedValue, which returns the stage logic together with the materialized value. More guidance can be found in the docs.

    If not: change the declared return type to match, as below.

    def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, NotUsed] = {
      Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
    }
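
For the "if so" route, here is a minimal sketch of a sink stage that materializes a Future[Done]. It is not the restart logic from the question: DoneSink is a hypothetical stand-in that simply consumes elements, and it uses a plain GraphStageLogic rather than RestartWithBackoffLogic. What the completion signal should mean for a restarting sink (for example, whether a failure of the wrapped sink fails the future or only triggers a restart) is a design decision the sketch does not make for you.

```scala
import akka.Done
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.Sink
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler}

import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for RestartWithBackoffSink: it only consumes elements,
// but shows where the Future[Done] materialized value comes from.
final class DoneSink[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Done]] {

  val in: Inlet[T] = Inlet[T]("DoneSink.in")

  override val shape: SinkShape[T] = SinkShape(in)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    // One promise per materialization; its future is the materialized value.
    val done = Promise[Done]()

    val logic = new GraphStageLogic(shape) with InHandler {
      // A sink has to signal demand itself, so request the first element.
      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        grab(in) // a real stage would hand the element to the wrapped sink here
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        done.trySuccess(Done)
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        done.tryFailure(ex)
        failStage(ex)
      }

      // If the stage stops without either callback (e.g. the materializer is
      // shut down), fail the future. This is a no-op if it is already completed.
      override def postStop(): Unit =
        done.tryFailure(new IllegalStateException("DoneSink stopped before completion"))

      setHandler(in, this)
    }

    (logic, done.future)
  }
}

object DoneSink {
  // Sink.fromGraph keeps the stage's materialized type, so this type-checks.
  def apply[T](): Sink[T, Future[Done]] = Sink.fromGraph(new DoneSink[T])
}
```

With this shape, source.runWith(DoneSink[MessageActionPair]()) returns the Future[Done] directly, and withBackoff could keep its declared Sink[MessageActionPair, Future[Done]] return type once the real stage is written the same way.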