I have this class that manage reception of a stream of asynchronous messages. It has type argument, say A
, and I like to implement a function that filter those messages by type, say B<:A
But my first implementation is not working due to type erasure (see example below). Is there a good way to so?
Here is a simplified example of my problem:
package test.filterByType
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.Future
trait MessageStream[A]{ self =>
def next(): Future[A]
def filter[B<:A]: MessageStream[B] = new MessageStream[B]{
def next(): Future[B] = self.next.flatMap{
case b: B => Future.successful(b)
case _ => next()
}
}
}
case class MessageStreamImpl[A](msg: IndexedSeq[A]) extends MessageStream[A]{
private var index = 0
def next() = {
index += 1
Future.successful(msg(index-1))
}
}
object Main{
trait A
case class B(i: Int) extends A
case class C(i: Int) extends A
def main(args: Array[String]){
val msg: IndexedSeq[A] = (1 to 10).map{ i => if(i%2==0) B(i) else C(i) }
val streamOfA = MessageStreamImpl(msg)
val streamOfB = streamOfA.filter[B]
val b: B = Await.result(streamOfB.next(), 1 second)
}
}
At compilation, I get warning: abstract type pattern B is unchecked since it is eliminated by erasure
, and indeed the code does not work. If I execute Main, I get the error:
lang.ClassCastException: test.filterByType.Main$C cannot be cast to test.filterByType.Main$B
Which happens because it does not filter out the first list item C(1)
This small adjustment does the trick
import scala.reflect.ClassTag
def filter[B<:A](implicit C: ClassTag[B]): MessageStream[B] = new MessageStream[B]{
def next(): Future[B] = self.next.flatMap{
case b: B => Future.successful(b)
case _ => next()
}
}