
clickhouse-kafka-connect is not working in a self-hosted setup


I am using the Strimzi and Altinity operators to self-host Kafka and ClickHouse clusters in minikube. Below are the steps and YAML I am following to deploy them.
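For context, the minikube cluster itself is created first; a minimal sketch (the resource flags are an assumption, not part of the original steps, since Keeper, ClickHouse, and Kafka together need a reasonably sized VM):

# resource sizing is an assumption; tune for your machine
minikube start --cpus=4 --memory=8192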

  • Deploy ClickHouse Operator
curl -s https://raw.githubusercontent.com/Altinity/clickhouse-operator/release-0.24.2/deploy/operator-web-installer/clickhouse-operator-install.sh | OPERATOR_NAMESPACE=clickhouse bash
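Before continuing, it helps to confirm the operator came up (plain kubectl, nothing specific to this setup):

kubectl -n clickhouse get pods
# expect the clickhouse-operator pod in Running state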
  • Deploy ClickHouse Keeper and Cluster
cat <<EOF | kubectl apply -f -
apiVersion: "clickhouse-keeper.altinity.com/v1"
kind: "ClickHouseKeeperInstallation"
metadata:
  name: demo-keeper-cluster
  namespace: "clickhouse"
spec:
  defaults:
    templates:
      podTemplate: default
      volumeClaimTemplate: default
  configuration:
    clusters:
      - name: "demo-keeper"
        layout:
          replicasCount: 3
  templates:
    podTemplates:
      - name: default
        spec:
          containers:
            - name: clickhouse-keeper
              imagePullPolicy: IfNotPresent
              image: "clickhouse/clickhouse-keeper:24.8.11.5"
              resources:
                requests:
                  memory: "256M"
                  cpu: "1"
                limits:
                  memory: "4Gi"
                  cpu: "2"
          securityContext:
            fsGroup: 101

    volumeClaimTemplates:
      - name: default
        spec:
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: 1Gi
---

apiVersion: "clickhouse.altinity.com/v1"
kind: "ClickHouseInstallation"
metadata:
  name: "demo-cluster"
  namespace: "clickhouse"
spec:
  configuration:
    users:
      # sha256 hex of the plaintext 'password'
      admin/password_sha256_hex: 5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8
      admin/networks/ip:
        - "127.0.0.1"
      admin/access_management: 1
      sink_writer/profile: default
      # sha256 hex of the plaintext 'password'
      sink_writer/password_sha256_hex: 5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8
      sink_writer/networks/ip:
        - "::/0"
    zookeeper:
      nodes:
        - host: chk-demo-keeper-cluster-demo-keeper-0-0
          port: 2181
        - host: chk-demo-keeper-cluster-demo-keeper-0-1
          port: 2181
        - host: chk-demo-keeper-cluster-demo-keeper-0-2
          port: 2181
    clusters:
      - name: "demo-cluster"
        layout:
          shardsCount: 2
          replicasCount: 1
        templates:
          podTemplate: default
          volumeClaimTemplate: default
          clusterServiceTemplate: chi-cluster-service-template
  templates:
    serviceTemplates:
      - name: chi-cluster-service-template
        generateName: "cluster-{cluster}"
        metadata:
          labels:
            custom.label: "custom.value"
          annotations:
            custom.annotation: "custom.value"
        spec:
          ports:
            - name: http
              port: 8123
            - name: client
              port: 9000
          type: ClusterIP
    podTemplates:
      - name: default
        spec:
          containers:
            - name: clickhouse
              imagePullPolicy: IfNotPresent
              image: "clickhouse/clickhouse-server:24.8.11.5"
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                  - key: "clickhouse.altinity.com/cluster"
                    operator: In
                    values:
                    - demo-cluster
                topologyKey: "kubernetes.io/hostname"
    volumeClaimTemplates:
      - name: default
        spec:
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: 1Gi
EOF
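Once applied, it is worth waiting for the Keeper and server pods before creating tables; a quick check (chk and chi are the short names the operator registers for its CRDs):

kubectl -n clickhouse get chk,chi
kubectl -n clickhouse get pods
kubectl -n clickhouse get svc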
  • Create database and table inside ClickHouse
CREATE DATABASE demodb ON CLUSTER "demo-cluster";

CREATE TABLE demodb.activity ON CLUSTER "demo-cluster"
(
    activity_id Int64,
    customer_id Int64,
    activity_type VARCHAR,
    propensity_to_churn Decimal64(11),
    ip_address VARCHAR
) ENGINE = MergeTree()
ORDER BY customer_id;
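These statements can be run through clickhouse-client inside one of the server pods; a sketch (the pod name assumes the operator's chi-<installation>-<cluster>-<shard>-<replica>-0 naming convention, so adjust if yours differs):

kubectl -n clickhouse exec -it chi-demo-cluster-demo-cluster-0-0-0 -- \
  clickhouse-client --user admin --password password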
  • Deploy Kafka operator
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
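The Strimzi cluster operator should be running before the Kafka resources are applied:

kubectl -n kafka get pods
# wait for the strimzi-cluster-operator pod to reach Running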
  • Create Kafka Cluster
cat <<EOF | kubectl apply -f - -n kafka
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: demo-controller
  labels:
    strimzi.io/cluster: demo-cluster
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 1Gi
        kraftMetadata: shared
        deleteClaim: false
---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: demo-broker
  labels:
    strimzi.io/cluster: demo-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 1Gi
        kraftMetadata: shared
        deleteClaim: false
---

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: demo-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.9.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: authscram
        port: 9094
        type: internal
        tls: true
        authentication:
          type: scram-sha-512
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.9"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 1Gi
        deleteClaim: false
    template:
      pod:
        securityContext:
          # TODO: See if this can be removed in production k8s cluster
          runAsUser: 0
          fsGroup: 0
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF
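Strimzi's documented way to block until the brokers are up:

kubectl wait kafka/demo-cluster --for=condition=Ready --timeout=300s -n kafka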
  • Create Kafka topic
cat <<EOF | kubectl apply -f - -n kafka
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: activity
  labels:
    strimzi.io/cluster: demo-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 604800000
    segment.bytes: 1073741824
EOF
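The topic operator reconciles this resource into an actual topic; it can be checked with:

kubectl -n kafka get kafkatopic activity
# the READY column should report True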
  • Build the ClickHouse sink connector image using a Dockerfile
FROM redhat/ubi8-minimal:8.10 AS custom-plugin-collector
# -y keeps microdnf non-interactive during the image build
RUN microdnf update -y && microdnf install -y wget unzip
RUN mkdir /connect-plugins
RUN wget https://github.com/ClickHouse/clickhouse-kafka-connect/releases/download/v1.2.6/clickhouse-kafka-connect-v1.2.6.zip -O clickhouse-kafka-connect.zip
RUN unzip -o clickhouse-kafka-connect.zip -d /connect-plugins

FROM quay.io/strimzi/kafka:0.45.0-kafka-3.9.0
USER root:root
# Strimzi's Kafka Connect image loads plugins from /opt/kafka/plugins
COPY --from=custom-plugin-collector /connect-plugins/ /opt/kafka/plugins/
USER 1001
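The image then needs to be built and made pullable by the cluster; a sketch using the tag referenced in the KafkaConnect spec below (substitute your own registry):

docker build -t nitishkumar71/kafka-connect-export:0.0.3 .
docker push nitishkumar71/kafka-connect-export:0.0.3
# alternatively, for a purely local minikube test:
# minikube image load nitishkumar71/kafka-connect-export:0.0.3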
  • Deploy the ClickHouse sink using KafkaConnect and KafkaConnector
cat <<EOF | kubectl apply -f - -n kafka
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: clickhouse-sink-connect-worker
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.9.0
  image: nitishkumar71/kafka-connect-export:0.0.3
  replicas: 1
  bootstrapServers: demo-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: demo-cluster-cluster-ca-cert
        pattern: "*.crt"
  config:
    group.id: clickhouse-sink-connect-worker
    offset.storage.topic: clickhouse-sink-connect-worker-offsets
    config.storage.topic: clickhouse-sink-connect-worker-configs
    status.storage.topic: clickhouse-sink-connect-worker-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  logging:
    type: inline
    loggers:
      log4j.logger.com.clickhouse.kafka.connect: TRACE
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: clickhouse-sink-connector
  labels:
    strimzi.io/cluster: clickhouse-sink-connect-worker
spec:
  class: com.clickhouse.kafka.connect.ClickHouseSinkConnector
  tasksMax: 1
  autoRestart:
    enabled: true
  config:
    ssl: false
    topics: activity
    hostname: cluster-demo-cluster.clickhouse.svc.cluster.local
    database: demodb
    password: password
    username: sink_writer
    port: 8123
    value.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter: org.apache.kafka.connect.storage.StringConverter
    errors.retry.timeout: 60
    schemas.enable: false
EOF
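The connector's state can be checked through the CR status; the log excerpt below comes from the Connect worker pod (the pod name matches the one visible in the logs):

kubectl -n kafka get kafkaconnector clickhouse-sink-connector
kubectl -n kafka logs clickhouse-sink-connect-worker-connect-0 | grep -i clickhouse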

The sink is based on the clickhouse-kafka-connect sink connector. It is unable to connect to ClickHouse and throws the following error:

2025-01-14 10:54:57,489 TRACE [clickhouse-sink-connector|task-0] Starting ClickHouseWriter (com.clickhouse.kafka.connect.sink.db.ClickHouseWriter) [task-thread-clickhouse-sink-connector-0]                   
2025-01-14 10:54:57,489 INFO [clickhouse-sink-connector|task-0] ClickHouse URL: http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/demodb? (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]                                                                                                                                                          
2025-01-14 10:54:57,490 INFO [clickhouse-sink-connector|task-0] Using server timezone: UTC (com.clickhouse.client.api.Client) [task-thread-clickhouse-sink-connector-0]                                        
2025-01-14 10:54:57,486 INFO [Worker clientId=connect-clickhouse-sink-connect-worker-connect-0.clickhouse-sink-connect-worker-connect.kafka.svc:8083, groupId=clickhouse-sink-connect-worker] Completed plan to restart 1 of 1 tasks for restart request for {connectorName='clickhouse-sink-connector', onlyFailed=true, includeTasks=true} (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-clickhouse-sink-connect-worker-connect-0.clickhouse-sink-connect-worker-connect.kafka.svc:8083-1]                                                                                                   
2025-01-14 10:54:57,493 INFO [clickhouse-sink-connector|task-0] Connection reuse strategy: FIFO (com.clickhouse.client.api.Client) [task-thread-clickhouse-sink-connector-0]                                   
2025-01-14 10:54:57,496 INFO [clickhouse-sink-connector|task-0] client compression: false, server compression: true, http compression: false (com.clickhouse.client.api.Client) [task-thread-clickhouse-sink-connector-0]                                                                                                                                                                                                     
2025-01-14 10:54:57,496 INFO [clickhouse-sink-connector|task-0] Using new http client implementation (com.clickhouse.client.api.Client) [task-thread-clickhouse-sink-connector-0]                              
2025-01-14 10:54:57,497 INFO [clickhouse-sink-connector|task-0] ClickHouse URL: http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/demodb? (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]                                                                                                                                                          
2025-01-14 10:54:57,497 DEBUG [clickhouse-sink-connector|task-0] Adding username [sink_writer] (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]  
2025-01-14 10:54:57,500 DEBUG [clickhouse-sink-connector|task-0] Server [ClickHouseNode [uri=http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/demodb]@1972266818] , Timeout [30000] (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]                                                                                                               
2025-01-14 10:54:57,515 WARN [clickhouse-sink-connector|task-0] Ping retry 1 out of 3 (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]           
2025-01-14 10:54:57,523 WARN [clickhouse-sink-connector|task-0] Ping retry 2 out of 3 (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]           
2025-01-14 10:54:57,535 WARN [clickhouse-sink-connector|task-0] Ping retry 3 out of 3 (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]           
2025-01-14 10:54:57,535 ERROR [clickhouse-sink-connector|task-0] Unable to ping ClickHouse instance. (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]                                                                                                                                                                                                           
2025-01-14 10:54:57,536 ERROR [clickhouse-sink-connector|task-0] Unable to ping Clickhouse server. (com.clickhouse.kafka.connect.sink.db.ClickHouseWriter) [task-thread-clickhouse-sink-connector-0]           
2025-01-14 10:54:57,536 ERROR [clickhouse-sink-connector|task-0] WorkerSinkTask{id=clickhouse-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-clickhouse-sink-connector-0]
java.lang.RuntimeException: Connection to ClickHouse is not active.                                                                                                                                             
    at com.clickhouse.kafka.connect.sink.ProxySinkTask.<init>(ProxySinkTask.java:63)                                                                                                                            
    at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.start(ClickHouseSinkTask.java:40)                                                                                                                   
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:324)                                                                                                              
    at org.apache.kafka.connect.runtime.WorkerTask.doStart(WorkerTask.java:176)                                                                                                                                 
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)                                                                                                                                   
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)                                                                                                                                     
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)                                                                                                            
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)                                                                                                                        
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)                                                                                                                                       
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)                                                                                                                
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)                                                                                                                
    at java.base/java.lang.Thread.run(Thread.java:840)

When I hit the URL shown in the logs over the HTTP interface from the sink pod, I received a 404 status with the error "There is no handle /demodb":

echo 'SELECT 1' | curl -H 'X-ClickHouse-User: sink_writer' -H 'X-ClickHouse-Key: password' 'http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/demodb' -d @-

But when I added the database query parameter to the command and ran it from the sink pod, it worked and returned Ok:

echo 'SELECT 1' | curl -H 'X-ClickHouse-User: sink_writer' -H 'X-ClickHouse-Key: password' 'http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/?database=demodb' -d @-
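ClickHouse also exposes a /ping health endpoint on the same HTTP interface, which is handy for separating raw connectivity from URL-format problems:

curl 'http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/ping'
# a reachable server answers: Ok.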

Solution

  • Nothing was wrong with the configuration here. Recreating the minikube cluster resolved the issue.
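In other words, a full teardown and restart of the local environment cleared it up:

minikube delete
minikube start
# then re-apply the manifests above in order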