I have an akka stream where I have an ADT of the form.
sealed trait Message
sealed trait ThisMessage extends Message
sealed trait ThatMessage extends Message
Now I have a This Message Handler Flow and a That Message Handler Flow. I have an inlet flow that accepts type Message.
In order to create a split, I have the following partitioner. I have the following definition for a partitioner function.
/**
* Creates a Partition stage that, given a type A, makes a decision to whether to partition to subtype B or subtype C
*
* @tparam A type of input
* @tparam B type of output on the first outlet.
* @tparam C type of output on the second outlet.
*
* @return A partition stage
*/
def binaryPartitionByType[A, B <: A, C <: A](): Graph[FanOutShape2[A, B, C], NotUsed] =
GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
import GraphDSL.Implicits._
// This is wrong, but I have no idea how to write this.
val partitioner: UniformFanOutShape[A, A] = builder.add(Partition[A](2, {
case _: B => 0
case _: C => 1
}))
new FanOutShape2(partitioner.in, partitioner.out(0).outlet, partitioner.out(1).outlet)
}
I wish to use the above method, and use the ADT in the type params to initialize a partitioner.
The compiler throws this error.
Error:(63, 7) type mismatch;
found : akka.stream.FanOutShape2[A,A,A]
required: akka.stream.FanOutShape2[A,B,C]
new FanOutShape2(partitioner.in, partitioner.out(0).outlet,
partitioner.out(1).outlet)
From what I understand the partition object only has the Inlet (in this case A, a parametrized type.
Anyone have any ideas how I can fix this?
Here's one way to instantiate a FanOutShape2[A, B<:A, C<:A]
from UniformFanOutShape[A, A]
generated by builder.add(Partition[A]())
:
import akka.stream.scaladsl._
import akka.stream.{Graph, FanOutShape2}
import akka.NotUsed
import scala.reflect.ClassTag
def binaryPartitionByType[A, B <: A : ClassTag, C <: A : ClassTag](): Graph[FanOutShape2[A, B, C], NotUsed] =
GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
import GraphDSL.Implicits._
val partitioner = builder.add(Partition[A](2, {
case _: B => 0
case _: C => 1
}))
val partitionB = builder.add(Flow[A].collect{ case b: B => b })
val partitionC = builder.add(Flow[A].collect{ case c: C => c })
partitioner.out(0) ~> partitionB
partitioner.out(1) ~> partitionC
new FanOutShape2(partitioner.in, partitionB.out, partitionC.out)
}
// binaryPartitionByType: [A, B <: A, C <: A]()(
// implicit evidence$1: scala.reflect.ClassTag[B], implicit evidence$2: scala.reflect.ClassTag[C]
// ) akka.stream.Graph[akka.stream.FanOutShape2[A,B,C],akka.NotUsed]
Note that ClassTag is needed to avoid type erasure.