I have a Kafka channel hierarchy that I am using in my project.
My base trait is:
trait SendChannel[A, B] extends CommunicationChannel {
  def send(data: A): B
}
Now I have a common Kafka send channel:
trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] {
  val channelProps: KafkaSendChannelProperties
  val kafkaProducer: Producer[String, B]
  override def close(): Unit = kafkaProducer.close()
}
There are two variants of CommonKafkaSendChannel, one with a callback and one with a Future:
trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] {
  override def send(data: A): Future[RecordMetadata] = Future {
    kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get
  }
}
KafkaSendChannelWithCallback definition:
object KafkaSendChannelWithCallback {
  def apply[A, B](oChannelProps: KafkaSendChannelProperties,
                  oKafkaProducer: Producer[String, B],
                  oCallback: Callback): KafkaSendChannelWithCallback[A, B] =
    new KafkaSendChannelWithCallback[A, B] {
      override val channelProps: KafkaSendChannelProperties = oChannelProps
      override val kafkaProducer: Producer[String, B] = oKafkaProducer
      override val callback: Callback = oCallback
    }
}
trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] {
  val callback: Callback
  override def send(data: A): Unit =
    kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback)
}
Based on a configuration value, I select the proper type of channel at runtime, as shown below. I create an actor with the right type of channel, which sends the data to Kafka:
val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
  error => {
    logger.error("Exception while instantiating the KafkaSendChannel")
    throw error
  },
  success => success
)
actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME)
Actor definition:
object IngestionRouterActor {
  def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props =
    Props(classOf[IngestionActor[V]], config, sendChannel)
}
The problem is that when I use KafkaSendChannelWithCallback my code compiles properly; however, when I use KafkaSendChannelWithFuture it gives me the error below on the actor = declaration:
[error] IngestionActor.scala:32: pattern type is incompatible with expected type;
[error]  found   : KafkaSendChannelWithFuture[String,V]
[error]  required: SendChannel[V,Unit]
As both channel definitions extend SendChannel, this code should have compiled without any error. I am not sure why it is not compiling. Thanks
The Props for IngestionActor takes a SendChannel[V, Unit]. Passing a KafkaSendChannelWithCallback to this argument works because it is a SendChannel[V, Unit].
On the other hand, KafkaSendChannelWithFuture is a SendChannel[V, Future[RecordMetadata]]. A SendChannel[V, Future[RecordMetadata]] is not a SendChannel[V, Unit].
One option is to redefine the Props to take a SendChannel[V, Any], since Any is a supertype of both Unit and Future:
def props[V](config: Config, sendChannel: SendChannel[V, Any]): Props = ???
At this point, the compiler is still unhappy because SendChannel, being a generic type, is invariant by default. In other words, neither SendChannel[V, Unit] nor SendChannel[V, Future[RecordMetadata]] is of type SendChannel[V, Any].
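To see the invariance in isolation, here is a minimal sketch without Kafka or Akka. The names InvarianceSketch, Channel, and accept are hypothetical stand-ins (Channel plays the role of SendChannel, accept the role of props); they are not part of the original code.

```scala
object InvarianceSketch {
  // Hypothetical stand-in for SendChannel: invariant in both type parameters.
  trait Channel[A, B] { def send(data: A): B }

  val unitChannel: Channel[String, Unit] = new Channel[String, Unit] {
    def send(data: String): Unit = ()
  }

  // Hypothetical stand-in for props: it asks for a Channel[String, Any].
  def accept(channel: Channel[String, Any]): String = "accepted"

  // Rejected by the compiler with a type mismatch, because
  // Channel[String, Unit] is not a subtype of Channel[String, Any]:
  // val rejected = accept(unitChannel)

  // Accepted: the second type argument matches exactly.
  val anyChannel: Channel[String, Any] = new Channel[String, Any] {
    def send(data: String): Any = ()
  }
  val result: String = accept(anyChannel)
}
```

Uncommenting the rejected line should reproduce the same kind of mismatch as in the question, even though Unit itself conforms to Any.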
To change that, make SendChannel covariant in the second type parameter (by adding a + in front of the B):
trait SendChannel[A, +B] extends CommunicationChannel {
  def send(data: A): B
}
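As a rough end-to-end illustration, the sketch below uses a covariant trait with two simplified channels and a function that accepts either one. CovarianceSketch, CallbackChannel, FutureChannel, and describe are hypothetical names, CommunicationChannel is left out, and Future[Int] stands in for Future[RecordMetadata] so that the example needs no Kafka dependency.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CovarianceSketch {
  // Covariant in B: a SendChannel[A, Unit] is now also a SendChannel[A, Any].
  trait SendChannel[A, +B] { def send(data: A): B }

  // Hypothetical simplification of KafkaSendChannelWithCallback.
  class CallbackChannel extends SendChannel[String, Unit] {
    def send(data: String): Unit = ()
  }

  // Hypothetical simplification of KafkaSendChannelWithFuture.
  class FutureChannel extends SendChannel[String, Future[Int]] {
    def send(data: String): Future[Int] = Future.successful(data.length)
  }

  // Stand-in for props: takes SendChannel[String, Any], so both variants fit.
  def describe(channel: SendChannel[String, Any]): String =
    channel.send("abc") match {
      case f: Future[_] => "future:" + Await.result(f, 1.second)
      case _            => "unit"
    }
}
```

One consequence of this design is that code receiving a SendChannel[V, Any] only sees Any as the result of send, so it has to pattern match (as describe does) if it needs to tell the two variants apart.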