I'm trying to migrate the data from our old ZooKeeper-based Kafka cluster, which has a single broker, to our new KRaft-based Kafka cluster. To do this I set up Kafka Connect and tried to use MirrorMaker. I managed to start Connect and create a connector between the clusters, and I can see that the topics have been created in the new cluster, but their old data is not there.
I found the kafka-connect compose file here. I don't get why Connect requires a bootstrap server, but I pointed it at my new cluster anyway. This is my Connect config:
version: '3'
x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.8.0
x-connect: &connect-vars
  CONNECT_BOOTSTRAP_SERVERS: "192.168.18.2:29093, 192.168.18.2:29094, 192.168.18.2:29095"
  CONNECT_GROUP_ID: cg_connect-jib
  CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
  CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
  CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
  # Cannot be higher than the number of brokers in the Kafka cluster
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
  # Defaults for all connectors
  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  # Where Jib places classes
  CONNECT_PLUGIN_PATH: /app/libs
  # Security Mechanism for SASL_PLAINTEXT
  CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
  CONNECT_SASL_MECHANISM: PLAIN
  CONNECT_SASL_JAAS_CONFIG: >
    org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret";
  # Additional debug options (optional for troubleshooting)
  CONNECT_OPTS: "-Djava.security.debug=gssloginconfig,configfile,configparser,logincontext"
  # Connect client overrides
  CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
  # Connect consumer overrides
  CONNECT_CONSUMER_MAX_POLL_RECORDS: 500
services:
  # Jib app
  connect-jib-1:
    image: *connect-image
    hostname: connect-jib-1
    ports:
      - '7083:8083'
    environment:
      <<: *connect-vars
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1
volumes:
  kafka_data:
    driver: local
I won't share my KRaft cluster config because I'm sure it is not the problem here. This is the connector config I'm using:
{
  "name": "zk",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "zk",
    "target.cluster.alias": "kraft",
    "source.cluster.bootstrap.servers": "192.168.2.18:29092",
    "target.cluster.bootstrap.servers": "192.168.18.2:29093,192.168.18.2:29094,192.168.18.2:29095",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "PLAIN",
    "cluster1->cluster2.enabled": true,
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';",
    "key.converter.class": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter.class": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
    "topics": ".*",
    "refresh.topics.enabled": true,
    "refresh.groups.enabled": true,
    "emit.checkpoints.enabled": true,
    "sync.group.offsets.enabled": true
  }
}
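For reference, a connector config like this is registered by POSTing it to the Connect REST API at `/connectors`. The following is a minimal Python sketch of that step, not the exact command used here. It assumes the worker's REST port 8083 is reachable on the Docker host as `localhost:7083` (taken from the port mapping in the compose file above). `build_create_request` is a hypothetical helper, and the config is trimmed to a few keys for brevity.

```python
import json
import urllib.request

# Assumption: REST port 8083 is published on the Docker host as 7083,
# as in the compose file above; adjust the host name to your environment.
CONNECT_URL = "http://localhost:7083/connectors"


def build_create_request(name, config, url=CONNECT_URL):
    """Hypothetical helper: build (but do not send) the POST that registers a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Trimmed config for illustration; the full connector config would go here.
request = build_create_request(
    "zk",
    {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "source.cluster.alias": "zk",
        "target.cluster.alias": "kraft",
        "topics": ".*",
    },
)

# To actually submit it, uncomment these lines while the worker is running:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

The request is only built, not sent, so the snippet can be run without a live worker.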
Before creating the connector I can see that the container named mm2_connect is able to connect to my cluster successfully. After I create the connector, the topic names are replicated, and then these messages are spammed in the mm2_connect logs:
[2024-12-18 12:27:45,917] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 919 due to node -3 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:45,917] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29095 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
[2024-12-18 12:27:47,113] INFO [Producer clientId=connector-producer-zk-0] Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:1017)
[2024-12-18 12:27:47,113] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 921 due to node -2 being disconnected (elapsed time since creation: 193ms, elapsed time since send: 193ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:47,113] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29094 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
[2024-12-18 12:27:48,415] INFO [Producer clientId=connector-producer-zk-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1017)
[2024-12-18 12:27:48,415] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 923 due to node -1 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:48,415] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
Also, in the broker logs I'm seeing these messages over and over again:
[2024-12-18 12:29:00,977] INFO [SocketServer listenerType=BROKER, nodeId=11] Failed authentication with /192.168.32.1 (channelId=192.168.32.2:29093-192.168.32.1:35880-133) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2024-12-18 12:29:04,883] INFO [SocketServer listenerType=BROKER, nodeId=11] Failed authentication with /192.168.32.1 (channelId=192.168.32.2:29093-192.168.32.1:40748-134) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
Did I misunderstand the purpose of MirrorMaker 2? All I want is to migrate the existing data to a new cluster. I'm also aware that this documentation exists, but I'm not sure how to apply it to Docker; maybe it's simple, but at the moment I can't work it out. Plus, that solution requires me to restart the cluster a few times, and I'm trying to avoid that. This is why I'm trying to make MM2 work. Any help is much appreciated.
As per the chosen answer here, I needed to configure security on the producer that Kafka Connect runs for the connector. So I added these lines to the kafka-connect settings:
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret";
Right now the data is being transferred from the old Kafka setup to the new one.
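Why the extra variables matter can be sketched in a few lines of Python. This assumes the image follows the common Docker convention of turning `CONNECT_*` environment variables into worker properties by dropping the prefix, lowercasing, and replacing underscores with dots; the exact rules for this image are not verified here. Both `env_to_worker_props` and `producer_settings` are hypothetical helpers, not part of Kafka. Worker properties prefixed with `producer.` are handed to the producers that Connect creates for source connector tasks (the `connector-producer-zk-0` client in the log above), while the unprefixed security settings only cover the worker's own clients.

```python
def env_to_worker_props(env, prefix="CONNECT_"):
    """Hypothetical helper: map CONNECT_* env vars to worker property names.

    Assumed convention: drop the prefix, lowercase, replace "_" with ".".
    """
    props = {}
    for name, value in env.items():
        if not name.startswith(prefix):
            continue  # ignore unrelated environment variables
        key = name[len(prefix):].lower().replace("_", ".")
        props[key] = value
    return props


def producer_settings(props):
    """Return only the `producer.`-prefixed settings, with the prefix removed."""
    marker = "producer."
    return {k[len(marker):]: v for k, v in props.items() if k.startswith(marker)}


env = {
    "CONNECT_SECURITY_PROTOCOL": "SASL_PLAINTEXT",
    "CONNECT_SASL_MECHANISM": "PLAIN",
    "CONNECT_PRODUCER_SECURITY_PROTOCOL": "SASL_PLAINTEXT",
    "CONNECT_PRODUCER_SASL_MECHANISM": "PLAIN",
}
props = env_to_worker_props(env)
print(producer_settings(props))
# → {'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'PLAIN'}
```

Without the two `CONNECT_PRODUCER_*` entries, `producer_settings` returns an empty dict, so the connector's producer would fall back to the client default of `PLAINTEXT`. That is consistent with the broker reporting a METADATA request arriving during the SASL handshake.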