
Can't migrate old data from zookeeper cluster to KRaft cluster via MirrorMaker2


I'm trying to migrate the data from our ZooKeeper-based cluster, which has a single Kafka broker, to our new KRaft-based Kafka cluster. To do this I tried to set up Kafka Connect and use MirrorMaker. I managed to start Connect and create a connector between the clusters, and I can see that the topics have been created in the new cluster, but their old data is not there.

I found the kafka-connect compose file here. I don't understand why Connect itself requires a bootstrap server, but I pointed it at my new cluster anyway. This is my Connect config:

version: '3'

x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.8.0

x-connect: &connect-vars
  CONNECT_BOOTSTRAP_SERVERS: "192.168.18.2:29093, 192.168.18.2:29094, 192.168.18.2:29095"

  CONNECT_GROUP_ID: cg_connect-jib
  CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
  CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
  CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
  # Cannot be higher than the number of brokers in the Kafka cluster
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
  # Defaults for all connectors
  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  # Where Jib places classes
  CONNECT_PLUGIN_PATH: /app/libs
  
  # Security Mechanism for SASL_PLAINTEXT
  CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
  CONNECT_SASL_MECHANISM: PLAIN
  CONNECT_SASL_JAAS_CONFIG: >
    org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret";

  # Additional debug options (optional for troubleshooting)
  CONNECT_OPTS: "-Djava.security.debug=gssloginconfig,configfile,configparser,logincontext"

  # Connect client overrides
  CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
  # Connect consumer overrides
  CONNECT_CONSUMER_MAX_POLL_RECORDS: 500

services:

  # Jib app
  connect-jib-1:
    image: *connect-image
    hostname: connect-jib-1
    ports:
      - '7083:8083'
    environment:
      <<: *connect-vars
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1

volumes:
  kafka_data:
    driver: local

I won't share my KRaft cluster config because I'm sure it is not the problem here. This is the connector config I'm using:

{
    "name": "zk",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "source.cluster.alias": "zk",
        "target.cluster.alias": "kraft",
        "source.cluster.bootstrap.servers": "192.168.2.18:29092",
        "target.cluster.bootstrap.servers": "192.168.18.2:29093,192.168.18.2:29094,192.168.18.2:29095",
        "target.cluster.security.protocol": "SASL_PLAINTEXT",
        "target.cluster.sasl.mechanism": "PLAIN",
        "cluster1->cluster2.enabled": true,
        "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';",
        "key.converter.class": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter.class": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "topics": ".*",
        "refresh.topics.enabled": true,
        "refresh.groups.enabled": true,
        "emit.checkpoints.enabled": true,
        "sync.group.offsets.enabled": true
    }
}

Before creating the connector, I can see that the container named mm2_connect connects to my cluster successfully. After I create the connector, the topic names are replicated, and then these messages are spammed in the mm2_connect logs:

[2024-12-18 12:27:45,917] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 919 due to node -3 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:45,917] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29095 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
[2024-12-18 12:27:47,113] INFO [Producer clientId=connector-producer-zk-0] Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:1017)
[2024-12-18 12:27:47,113] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 921 due to node -2 being disconnected (elapsed time since creation: 193ms, elapsed time since send: 193ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:47,113] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29094 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
[2024-12-18 12:27:48,415] INFO [Producer clientId=connector-producer-zk-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1017)
[2024-12-18 12:27:48,415] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 923 due to node -1 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:48,415] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)

The broker logs also show these messages over and over again:

[2024-12-18 12:29:00,977] INFO [SocketServer listenerType=BROKER, nodeId=11] Failed authentication with /192.168.32.1 (channelId=192.168.32.2:29093-192.168.32.1:35880-133) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2024-12-18 12:29:04,883] INFO [SocketServer listenerType=BROKER, nodeId=11] Failed authentication with /192.168.32.1 (channelId=192.168.32.2:29093-192.168.32.1:40748-134) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

Did I misunderstand the purpose of MirrorMaker2? All I want is to migrate the existing data to a new cluster. I'm aware that this documentation exists, but I'm not sure how to apply it to Docker. Maybe it's simple, but at the moment I can't comprehend it. Plus, that solution requires me to restart the cluster a few times, and I'm trying to avoid that, which is why I'm trying to make MM2 work. Any help is much appreciated.


Solution

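  • As per the chosen answer here, I needed to configure security on the producer that Kafka Connect runs for the connector. The worker-level CONNECT_SECURITY_PROTOCOL and CONNECT_SASL_* settings cover only the worker's own internal clients, so the connector's producer was still connecting without SASL, which matches the "Unexpected Kafka request of type METADATA during SASL handshake" message in the broker log. So I added these lines to the kafka-connect settings: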

      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret";
    

    Right now the data is being transferred from the old Kafka setup to the new one.
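
    To see why the extra CONNECT_PRODUCER_* variables make a difference, here is a minimal sketch of how such variables are typically turned into worker properties. It assumes the image follows the common convention of stripping the CONNECT_ prefix, lowercasing, and replacing underscores with dots; the function name `env_to_worker_properties` is hypothetical and the real entrypoint script may handle more cases.

    ```python
    def env_to_worker_properties(env: dict[str, str], prefix: str = "CONNECT_") -> dict[str, str]:
        """Translate CONNECT_* environment variables into worker property names.

        Assumed convention (not taken from the image's source):
        strip the prefix, lowercase, and replace '_' with '.'.
        """
        props = {}
        for name, value in env.items():
            if not name.startswith(prefix):
                continue  # unrelated variables such as HOSTNAME are ignored
            key = name[len(prefix):].lower().replace("_", ".")
            props[key] = value
        return props


    env = {
        "CONNECT_SECURITY_PROTOCOL": "SASL_PLAINTEXT",           # worker's internal clients
        "CONNECT_PRODUCER_SECURITY_PROTOCOL": "SASL_PLAINTEXT",  # producer used by connector tasks
        "CONNECT_PRODUCER_SASL_MECHANISM": "PLAIN",
        "HOSTNAME": "connect-jib-1",
    }

    worker_props = env_to_worker_properties(env)

    # Only keys under the "producer." prefix are handed to the producer
    # that writes the mirrored records to the target cluster.
    producer_props = {k: v for k, v in worker_props.items() if k.startswith("producer.")}
    ```

    Under that assumption, `security.protocol` and `producer.security.protocol` end up as two separate worker properties, which is why setting only the first one left the connector's producer unauthenticated.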