I have Kafka and ZooKeeper running via Docker Compose. I can produce and consume messages on a topic from the Kafka command-line tools, and I can monitor everything in Conduktor. However, I can't consume messages from my Scala app using the Alpakka Kafka connector. The app connects and subscribes to the topic, but nothing happens when I send a message to it.
Only Kafka and ZooKeeper run via docker-compose. The Scala consumer app runs directly on the host machine.
Docker Compose
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
Scala App
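import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object Main extends App {
  implicit val actorSystem: ActorSystem = ActorSystem()
  // Execution context for the onComplete callback below
  import actorSystem.dispatcher

  val kafkaConsumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
    .withGroupId("new_id")
    .withCommitRefreshInterval(1.seconds)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withBootstrapServers("localhost:9092")

  // Print the value of every record read from topic "test1"
  Consumer
    .plainSource(kafkaConsumerSettings, Subscriptions.topics("test1"))
    .map(msg => msg.value())
    .runWith(Sink.foreach(println))
    .onComplete {
      case Failure(exception) => exception.printStackTrace()
      case Success(_) => println("done")
    }
}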
App - Console output
16:58:33.877 INFO [akka.event.slf4j.Slf4jLogger] Slf4jLogger started
16:58:34.470 INFO [akka.kafka.internal.SingleSourceLogic] [1955f] Starting. StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-591284224]
16:58:34.516 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = novo_id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
16:58:34.701 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka version: 2.4.0
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka commitId: 77a89fcf8d7fa018
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1585256314699
16:58:34.715 INFO [org.apache.kafka.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Subscribed to topic(s): test1
16:58:35.308 INFO [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Cluster ID: c2XBuDIJTI-gBs9guTvG
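Set KAFKA_ADVERTISED_LISTENERS.
It describes the host name and port that the broker advertises and that clients can reach. The value is published to ZooKeeper for clients to use. A Kafka client uses the bootstrap address only for its first connection; after that it connects to the address the broker advertises, so that address must be reachable from the host where the consumer runs.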
If using the SSL or SASL protocol, the endpoint value must specify the protocol in one of the following formats:
SSL: SSL:// or SASL_SSL://
SASL: SASL_PLAINTEXT:// or SASL_SSL://
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092
Your consumer can then use port 29092, provided the broker actually listens on that port and the compose file publishes it to the host:
.withBootstrapServers("localhost:29092")
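For the compose file in the question, the KAFKA_ADVERTISED_LISTENERS setting could be applied roughly as follows. This is a minimal sketch, not a tested configuration. It assumes the wurstmeister/kafka image, which passes KAFKA_* environment variables through as broker properties and expects KAFKA_LISTENERS to be set whenever KAFKA_ADVERTISED_LISTENERS is. The single PLAINTEXT listener on 0.0.0.0:29092 and the "29092:29092" port mapping are assumptions added here for illustration, and HOSTNAME_COMMAND is dropped because the advertised host is now given explicitly.

```yaml
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      # Assumption: publish the same port the broker advertises
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Address the broker binds to inside the container (assumed)
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092
      # Address handed to clients in metadata responses
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
    depends_on:
      - zookeeper
```

With a single listener advertised as localhost, only clients on the host machine can follow the advertised address. Clients running in other containers would need a second listener advertised under the service name.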