Search code examples
scalagenericsapache-kafkaakkaakka-stream

Akka Pattern Match on Generic Type


I have an Actor instance as defined like this:

class KafkaPublisher[T <: KafkaMessage] extends Actor {

  override final def receive = {
    case ProducerStreamActivated(_, stream: SourceQueueWithComplete[T]) =>
      context.become(active(stream))

    case other => println("KafkaPublisher got some unknown message while producing: " + other)
  }

  def active(stream: SourceQueueWithComplete[T]): Receive = super.receive orElse {
    case msg: T =>
      stream.offer(msg)

    case other => println("KafkaPublisher got the unknown message while producing: " + other)
  }
}
object KafkaPublisher {

  def props[T <: KafkaMessage](implicit tag: ClassTag[T]) = Props(new KafkaPublisher[T])
}

Where the KafkaMessage is a marker trait and every message that I want to publish to Kafka will extend this marker trait. I have a supervisor Actor that actually will create instances of this KafkaPublisher Actor depending on the type of Kafka message. For example.,

case class MeterData(id: String, meterReadings: Map[DateTime, String]) extends KafkaMessage

So in my SupervisorActor, I create an instance of my PublisherActor like this:

def actorFor(props: Props, actorName: String) = context.actorOf(props, actorName)
val actorRef = actorFor(KafkaPublisher.props[MeterData], "meter-data-actor")

Similarly, I create the corresponding Akka stream flow for the given KafkaMessage type.

So my question is, would this KafkaPublisher actor pattern match on the MeterData, given that I have a generic pattern matching defined. Is the ClassTag in my props method just enough so that I can get past the type erasure? Suggestions?


Solution

  • Pattern matching on a generic type T should work, when there an implicit ClassTag[T] in scope. So it should be enough to provide and store a ClassTag in your KafkaPublisher:

    import scala.reflect.ClassTag
    
    class KafkaPublisher[T <: KafkaMessage : ClassTag] {
      /* ... */
    }
    
    object KafkaPublisher {
      def props[T <: KafkaMessage : ClassTag] = Props(new KafkaPublisher[T])
    }
    

    If the type T is concrete at the props call site, or it's generic but there is an implicit ClassTag[T] there, then you can just call props as usual: actorFor(KafkaPublisher.props[MeterData], "meter-data-actor")