Search code examples
apache-kafkaakkaakka-stream

Alpakka Akka Stream unable to read from kafka


I have built a very simple akka stream based on the alpakka project, but it doesn't read anything from kafka even though it connects and creates a consumer group. I have created an implicit Actor System and Materializer for the stream.

val done = Consumer.committableSource(consumerSettings,
Subscriptions.topics(kafkaTopic))
.map(msg => msg.committableOffset)
.mapAsync(1) { offset =>
offset.commitScaladsl()
}
.runWith(Sink.ignore)
  • [stream.actor.dispatcher] sends this message to KafkaConsumerActor "Requesting messages, requestId: 1, partitions: Set(kafka-topic-0)"
  • The KafkaConsumerActor doesn't seem to receive the message but when the supervisor asks the Actor to shutdown it does receive the message and shutdown.

Any lead on why it fails to read Kafka without an Error or Exception ?


Solution

  • I couldn't figure out why my akka stream wasn't consuming messages from the kafka broker, But When I implemented the same stream as a Runnable Graph, it worked.

    Examples that I used - https://www.programcreek.com/scala/akka.stream.scaladsl.RunnableGraph