I'm trying to re-create similar functionality of Scala's [RestartSink][1]
feature.
I've come up with this code. However, since we only return a SinkShape
instead of a Sink
, I'm having trouble specifying that it should return a Future[Done]
instead of NotUsed
as it's materialized type. However, I'm confused about how to do that. I'm only able to have it return [MessageActionPair, NotUsed]
instead of the desired [MessageActionPair, Future[Done]]
. I'm still learning my way around this framework, so I'm sure I'm missing something small. I tried calling Source.toMat(RestartWithBackoffSink...)
, however that doesn't give the desired result either.
private final class RestartWithBackoffSink(
sourcePool: Seq[SqsEndpoint],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double) extends GraphStage[SinkShape[MessageActionPair]] { self ⇒
val in = Inlet[MessageActionPair]("RoundRobinRestartWithBackoffSink.in")
override def shape = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
override protected def logSource = self.getClass
override protected def startGraph() = {
val sourceOut = createSubOutlet(in)
Source.fromGraph(sourceOut.source).runWith(createSink(getEndpoint))(subFusingMaterializer)
}
override protected def backoff() = {
setHandler(in, new InHandler {
override def onPush() = ()
})
}
private def createSink(endpoint: SqsEndpoint): Sink[MessageActionPair, Future[Done]] = {
SqsAckSink(endpoint.queue.url)(endpoint.client)
}
def getEndpoint: SqsEndpoint = {
if(isTimedOut) {
index = (index + 1) % sourcePool.length
restartCount = 0
}
sourcePool(index)
}
backoff()
}
}
Syntax error here, since types don't match:
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, Future[Done]] = {
Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
}
By extending extends GraphStage[SinkShape[MessageActionPair]]
you are defining a stage with no materialized value. Or better you define a stage that materializes to NotUsed
.
You have to decide if your stage can materialize into anything meaningful. More on materialized values for stages here.
If so: you have to extend GraphStageWithMaterializedValue[SinkShape[MessageActionPair], Future[Done]]
and properly override the createLogicAndMaterializedValue
function. More guidance can be found in the docs.
If not: you can change your types as per below
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, NotUsed] = {
Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
}