I'm trying to learn Flink, Docker, and Kafka, so I'm building a simple dummy setup to have Flink and Kafka communicate from different containers. I'm doing all of this locally, but I also need to understand how to do this with the containers running from different locations eventually, so I don't want to take any shortcuts just to get it to work.
I have a Flink job that is currently just a stripped-down version of the fraud detection example in Scala. I had this working on a virtual machine with Kafka and Flink running side by side, so the only relevant part here is probably the bootstrap server. Here is the whole main function of the job:
```scala
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val transactions: DataStream[Transaction] = env
  .addSource(new TransactionSource)
  .name("transactions")

val properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")

val myProducer = new FlinkKafkaProducer[Transaction](
  "flagged-transactions",                    // target topic
  TransactionSchema("flagged-transactions"), // serialization schema
  properties,                                // producer config
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE)  // fault-tolerance

transactions.addSink(myProducer)
env.execute("transactions")
```
I create a docker network with

```shell
docker network create flink-network
```

Then I start the Flink job manager with

```shell
docker run -d --rm --name=jobmanager --network flink-network --publish 8081:8081 \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest jobmanager
```

and a task manager with

```shell
docker run -d --rm --name=taskmanager --network flink-network --publish 9092:9092 \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest taskmanager
```
At the moment, my docker-compose.yml to start Kafka looks like this:
```yaml
version: '2'
networks:
  default:
    external: true
    name: flink-network
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    command: sh -c "((sleep 15 && kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic flagged-transactions)&) && /etc/confluent/docker/run "
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
I start the Kafka container with `docker-compose up -d`, then go to Flink's web interface and submit the job. It runs, but then I get the exception `Failed to send data to Kafka: Topic flagged-transactions not present in metadata after 60000 ms.` If I access the Kafka container with `docker exec`, I can see that my `flagged-transactions` topic is there, so I think it must be a networking issue.
The original `docker-compose.yml` was from here, and I've added a few things during my troubleshooting. I'm not really clear on a lot of the networking aspects of it, so much of it is guesswork and copy/pasting solutions from SO. I've tried a few variations of the `environment` section of the YAML (for example, changing `localhost` to `kafka` and vice versa in many permutations), but none of them work.
Any suggestions?
A few things at first glance:
Exposed ports and advertised listeners
You are publishing port 29092 in your docker-compose file, but in your listener config 29092 is the `kafka` listener. You will likely not be able to reach Kafka from your host machine this way even if you wanted to. With your current listeners you would need to publish 9092:9092 instead (the `localhost` listener), so that the name the client connects with (`localhost`) matches the listener advertised on that port. The compose file you linked publishes 29092, but it correctly puts `localhost` on 29092 and `kafka` on 9092 to make that work; you've set it the other way around.
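For example, keeping your current listener layout (`localhost` on 9092, `kafka` on 29092), a consistent compose fragment would look roughly like this (a sketch, not your whole file; note that your taskmanager container also publishes 9092:9092, and only one container can bind a given host port, so that mapping would have to go):

```yaml
services:
  kafka:
    ports:
      - 9092:9092   # publish the port that the "localhost" listener advertises
    environment:
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
```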
Container-container communication via container name
Your job is running on a self-contained Flink container and is trying to connect to bootstrap server `localhost:9092`, which won't work because `localhost` inside that container refers to the container itself. You need to change it to `kafka:29092` so it talks to the Kafka container over the Docker network. (If you were running the Flink job on your host machine, `localhost:9092` would work.)
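One way to avoid hard-coding either address in the job is to read it from an environment variable; a sketch, where `KAFKA_BOOTSTRAP` is a made-up variable name:

```scala
// Hypothetical: pick the bootstrap address from the environment, defaulting
// to the in-network address used between containers.
val bootstrap = sys.env.getOrElse("KAFKA_BOOTSTRAP", "kafka:29092")

val properties = new Properties
properties.setProperty("bootstrap.servers", bootstrap)
```

Then the same jar runs unchanged on your host (`KAFKA_BOOTSTRAP=localhost:9092`) and inside a container.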
Other stuff
Make those changes and see if you have better luck. You should be able to reach the broker from your host machine at `localhost:9092` (use a Kafka client, portqry, netcat, etc.) and from your Flink containers at `kafka:29092` (likely netcat, etc.).
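A quick way to sanity-check both paths, assuming `nc` is available on your host and inside the image, and using the container names from your `docker run` commands:

```shell
# From the host: is the listener advertised as localhost:9092 reachable?
nc -zv localhost 9092

# From inside a Flink container: is the in-network listener reachable?
docker exec taskmanager nc -zv kafka 29092
```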
If there are networking issues with the bridge, try adding your Flink containers to the same docker-compose file and removing the `networks` entry from the YAML so the group starts on its own default network. Provided they are on the same network and configured properly, containers can reach each other by container name.
Kafka's advertised listeners basically need to match whatever address the client connects to. If you connect from your host machine to `localhost:9092`, the listener on 9092 needs to advertise `localhost` so the expected names match; otherwise the connection will fail. If you connect from another container to port 29092 via hostname `kafka`, that listener needs to be advertised as `kafka:29092`, and so on.
If none of that works, post some logs of the Kafka server container and we'll go from there.