Tags: kubernetes, apache-kafka, bitnami

Kafka KRaft replication factor of 3


I tried to run Kafka in KRaft mode (ZooKeeper-less) in Kubernetes, and everything worked fine with the configuration below.

How would I change the provided configuration to run with a replication factor of 3, for instance?

There was a fruitful discussion about this on GitHub, but no one provided a Kafka KRaft setup with replication configured.

Statefulset

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-statefulset
  namespace: kafka
  labels:
    app: kafka-cluster
spec:
  serviceName: kafka-svc
  replicas: 1
  selector:
    matchLabels:
      app: kafka-cluster
  template:
    metadata:
      labels:
        app: kafka-cluster
    spec:
      containers:
        - name: kafka-container
          image: 'bitnami/kafka:latest'
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CFG_NODE_ID
              value: "1"
            - name: KAFKA_ENABLE_KRAFT
              value: "yes"
            - name: KAFKA_CFG_PROCESS_ROLES
              value: "broker,controller"
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_CFG_LISTENERS
              value: "CLIENT://:9092,CONTROLLER://:9093,EXTERNAL://0.0.0.0:9094"
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: "CONTROLLER:PLAINTEXT,CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT"
            - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
              value: "CLIENT"
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: "CLIENT://kafka-statefulset-0.kafka-svc.kafka.svc.cluster.local:9092,EXTERNAL://127.0.0.1:9094"
            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: "1@127.0.0.1:9093"
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "false"
            - name: KAFKA_DEFAULT_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: ALLOW_PLAINTEXT_LISTENER
              value: "yes"

Headless service:

apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-cluster
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-cluster

Solution

  • Finally, I deployed Kafka in KRaft mode with a replication factor of 3 in Kubernetes, following the guidelines in this article, which describes very thoroughly how the setup works. I looked into the doughgle/kafka-kraft image on Docker Hub; it links to a GitHub repo where you can find this script:

    #!/bin/bash
    
    # Derive the numeric node ID from the pod hostname, e.g. "kafka-2" -> "2".
    NODE_ID=${HOSTNAME:6}
    LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093"
    ADVERTISED_LISTENERS="PLAINTEXT://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:9092"
    
    # Build controller.quorum.voters as "0@kafka-0...:9093,1@kafka-1...:9093,...",
    # trimming the trailing comma on the final iteration.
    CONTROLLER_QUORUM_VOTERS=""
    for i in $(seq 0 "$REPLICAS"); do
        if [[ $i != "$REPLICAS" ]]; then
            CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093,"
        else
            CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS::-1}
        fi
    done
    
    mkdir -p "$SHARE_DIR/$NODE_ID"
    
    # Node 0 generates the cluster ID once and publishes it on the shared volume;
    # every other node reads it from there.
    if [[ ! -f "$SHARE_DIR/cluster_id" && "$NODE_ID" = "0" ]]; then
        CLUSTER_ID=$(kafka-storage.sh random-uuid)
        echo "$CLUSTER_ID" > "$SHARE_DIR/cluster_id"
    else
        CLUSTER_ID=$(cat "$SHARE_DIR/cluster_id")
    fi
    
    # Patch the stock KRaft config with this node's identity, quorum and log dir.
    sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
    -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
    -e "s+^listeners=.*+listeners=$LISTENERS+" \
    -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
    -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
    /opt/kafka/config/kraft/server.properties > server.properties.updated \
    && mv server.properties.updated /opt/kafka/config/kraft/server.properties
    
    # Format the storage directory, then start the combined broker/controller.
    kafka-storage.sh format -t "$CLUSTER_ID" -c /opt/kafka/config/kraft/server.properties
    
    exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties
    

    This script gives each pod/broker its own configuration, derived from the pod's ordinal hostname assigned by the StatefulSet.
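    The quorum-voter construction is the non-obvious part, so here is a standalone sketch of that loop with example values (REPLICAS=3, service kafka-svc, namespace kafka-kraft, matching the manifests below) so you can see the string it produces:

```shell
# Example values; in the real entrypoint these come from the pod's env vars.
REPLICAS=3
SERVICE=kafka-svc
NAMESPACE=kafka-kraft

# Same loop as the entrypoint: one "<id>@<host>:9093" entry per replica,
# with the trailing comma stripped on the final iteration.
CONTROLLER_QUORUM_VOTERS=""
for i in $(seq 0 "$REPLICAS"); do
    if [[ $i != "$REPLICAS" ]]; then
        CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093,"
    else
        CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS::-1}
    fi
done

echo "$CONTROLLER_QUORUM_VOTERS"
```

    For three replicas this prints `0@kafka-0.kafka-svc.kafka-kraft.svc.cluster.local:9093,1@kafka-1.kafka-svc.kafka-kraft.svc.cluster.local:9093,2@kafka-2.kafka-svc.kafka-kraft.svc.cluster.local:9093`, i.e. every controller in the quorum, reachable via the headless service's per-pod DNS names.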

    Then I built my own image with the latest versions of Kafka and Scala on OpenJDK 17:

    FROM openjdk:17-bullseye
    
    ENV KAFKA_VERSION=3.3.1
    ENV SCALA_VERSION=2.13
    ENV KAFKA_HOME=/opt/kafka
    ENV PATH=${PATH}:${KAFKA_HOME}/bin
    
    LABEL name="kafka" version=${KAFKA_VERSION}
    
    RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://downloads.apache.org/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
     && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
     && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
     && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME}
    
    COPY ./entrypoint.sh /
    RUN ["chmod", "+x", "/entrypoint.sh"]
    ENTRYPOINT ["/entrypoint.sh"]
    

    and here is the Kubernetes configuration:

    apiVersion: v1
    kind: Namespace
    metadata:
      name: kafka-kraft
    ---
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: kafka-pv-volume
      labels:
        type: local
    spec:
      storageClassName: manual
      capacity:
        storage: 1Gi
      accessModes:
        - ReadWriteOnce
      hostPath:
        path: '/path/to/dir'
    ---
    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: kafka-pv-claim
      namespace: kafka-kraft
    spec:
      storageClassName: manual
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 500Mi
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-svc
      labels:
        app: kafka-app
      namespace: kafka-kraft
    spec:
      clusterIP: None
      ports:
        - name: '9092'
          port: 9092
          protocol: TCP
          targetPort: 9092
      selector:
        app: kafka-app
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: kafka
      labels:
        app: kafka-app
      namespace: kafka-kraft
    spec:
      serviceName: kafka-svc
      replicas: 3
      selector:
        matchLabels:
          app: kafka-app
      template:
        metadata:
          labels:
            app: kafka-app
        spec:
          volumes:
            - name: kafka-storage
              persistentVolumeClaim:
                claimName: kafka-pv-claim
          containers:
            - name: kafka-container
              image: me/kafka-kraft
              ports:
                - containerPort: 9092
                - containerPort: 9093
              env:
                - name: REPLICAS
                  value: '3'
                - name: SERVICE
                  value: kafka-svc
                - name: NAMESPACE
                  value: kafka-kraft
                - name: SHARE_DIR
                  value: /mnt/kafka
                - name: CLUSTER_ID
                  value: oh-sxaDRTcyAr6pFRbXyzA
                - name: DEFAULT_REPLICATION_FACTOR
                  value: '3'
                - name: DEFAULT_MIN_INSYNC_REPLICAS
                  value: '2'
              volumeMounts:
                - name: kafka-storage
                  mountPath: /mnt/kafka
    
    

    I am not 100% sure whether this setup is as robust as a stable ZooKeeper-based one, but it is currently sufficient for my testing phase.

    UPDATE: Kafka KRaft is production-ready as of release 3.3.1.
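
    To check that replication actually works, you can create and describe a topic from inside one of the pods. A possible sketch, assuming the StatefulSet above is running in the kafka-kraft namespace (these are standard kafka-topics.sh flags, run against a live cluster):

```
kubectl exec -it kafka-0 -n kafka-kraft -- \
  kafka-topics.sh --create --topic test-topic \
  --partitions 3 --replication-factor 3 \
  --bootstrap-server kafka-svc:9092

kubectl exec -it kafka-0 -n kafka-kraft -- \
  kafka-topics.sh --describe --topic test-topic \
  --bootstrap-server kafka-svc:9092
```

    If the quorum is healthy, the describe output should show each partition with three replicas spread across broker IDs 0, 1 and 2.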