I am using the Strimzi and Altinity operators to self-host Kafka and ClickHouse clusters in minikube. Below are the steps and YAML I am following to deploy them:
# Install the Altinity clickhouse-operator (release 0.24.2) into the "clickhouse" namespace.
curl -s https://raw.githubusercontent.com/Altinity/clickhouse-operator/release-0.24.2/deploy/operator-web-installer/clickhouse-operator-install.sh | OPERATOR_NAMESPACE=clickhouse bash
cat <<EOF | kubectl apply -f -
---
# 3-node ClickHouse Keeper ensemble: the coordination layer (ZooKeeper
# replacement) for the ClickHouseInstallation below.
apiVersion: "clickhouse-keeper.altinity.com/v1"
kind: "ClickHouseKeeperInstallation"
metadata:
  name: demo-keeper-cluster
  namespace: "clickhouse"
spec:
  defaults:
    templates:
      podTemplate: default
      volumeClaimTemplate: default
  configuration:
    clusters:
      - name: "demo-keeper"
        layout:
          # 3 replicas form a quorum that tolerates one node failure.
          replicasCount: 3
  templates:
    podTemplates:
      - name: default
        spec:
          containers:
            - name: clickhouse-keeper
              imagePullPolicy: IfNotPresent
              image: "clickhouse/clickhouse-keeper:24.8.11.5"
              resources:
                requests:
                  # NOTE(review): "256M" (decimal) mixed with "Gi" (binary)
                  # in the limit — consider "256Mi" for consistency.
                  memory: "256M"
                  cpu: "1"
                limits:
                  memory: "4Gi"
                  cpu: "2"
          securityContext:
            # Pod-level fsGroup so the mounted data volume is writable by
            # the keeper container's group (gid 101).
            fsGroup: 101
    volumeClaimTemplates:
      - name: default
        spec:
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: 1Gi
---
# ClickHouse cluster: 2 shards x 1 replica, coordinated by the Keeper
# ensemble above.
apiVersion: "clickhouse.altinity.com/v1"
kind: "ClickHouseInstallation"
metadata:
  name: "demo-cluster"
  namespace: "clickhouse"
spec:
  configuration:
    users:
      # sha256 hex digest of the literal string 'password' — demo only;
      # replace before any shared deployment.
      admin/password_sha256_hex: 5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8
      # admin may only connect from localhost (e.g. clickhouse-client
      # inside the pod via kubectl exec).
      admin/networks/ip:
        - "127.0.0.1"
      admin/access_management: 1
      sink_writer/profile: default
      # sha256 hex digest of the literal string 'password' — demo only.
      sink_writer/password_sha256_hex: 5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8
      # "::/0" = any address, so the Kafka Connect pods in the "kafka"
      # namespace can reach ClickHouse across namespaces.
      sink_writer/networks/ip:
        - "::/0"
    zookeeper:
      # Keeper speaks the ZooKeeper protocol; these are the per-replica
      # services created by the ClickHouseKeeperInstallation above.
      nodes:
        - host: chk-demo-keeper-cluster-demo-keeper-0-0
          port: 2181
        - host: chk-demo-keeper-cluster-demo-keeper-0-1
          port: 2181
        - host: chk-demo-keeper-cluster-demo-keeper-0-2
          port: 2181
    clusters:
      - name: "demo-cluster"
        layout:
          shardsCount: 2
          replicasCount: 1
        templates:
          podTemplate: default
          volumeClaimTemplate: default
          clusterServiceTemplate: chi-cluster-service-template
  templates:
    serviceTemplates:
      - name: chi-cluster-service-template
        # Produces a Service named "cluster-demo-cluster" — this is the
        # hostname the sink connector points at.
        generateName: "cluster-{cluster}"
        metadata:
          labels:
            custom.label: "custom.value"
          annotations:
            custom.annotation: "custom.value"
        spec:
          ports:
            - name: http
              port: 8123
            - name: client
              port: 9000
          type: ClusterIP
    podTemplates:
      - name: default
        spec:
          containers:
            - name: clickhouse
              imagePullPolicy: IfNotPresent
              image: "clickhouse/clickhouse-server:24.8.11.5"
          # Hard anti-affinity spreads ClickHouse pods across nodes.
          # NOTE(review): on a single-node minikube this leaves the second
          # shard unschedulable — consider the "preferred..." variant there.
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: "clickhouse.altinity.com/cluster"
                        operator: In
                        values:
                          - demo-cluster
                  topologyKey: "kubernetes.io/hostname"
    volumeClaimTemplates:
      - name: default
        spec:
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: 1Gi
EOF
-- Create the target database and table on every node of the cluster.
-- IF NOT EXISTS makes re-running the script idempotent.
CREATE DATABASE IF NOT EXISTS demodb ON CLUSTER "demo-cluster";

-- Plain (non-replicated) MergeTree: fine here since the layout uses
-- replicasCount: 1; use ReplicatedMergeTree if replicas are added.
CREATE TABLE IF NOT EXISTS demodb.activity ON CLUSTER "demo-cluster"
(
    activity_id Int64,
    customer_id Int64,
    activity_type VARCHAR,           -- VARCHAR is an alias for String
    propensity_to_churn Decimal64(11),
    ip_address VARCHAR               -- trailing comma removed (rejected by older ClickHouse versions)
) ENGINE = MergeTree()
ORDER BY customer_id;
# Create the namespace and install the latest Strimzi cluster operator into it.
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
cat <<EOF | kubectl apply -f - -n kafka
---
# Controller-role node pool (KRaft metadata quorum).
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: demo-controller
  labels:
    # Binds this pool to the Kafka resource named "demo-cluster".
    strimzi.io/cluster: demo-cluster
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 1Gi
        # This volume also holds the KRaft metadata log.
        kraftMetadata: shared
        deleteClaim: false
---
# Broker-role node pool.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: demo-broker
  labels:
    strimzi.io/cluster: demo-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 1Gi
        kraftMetadata: shared
        deleteClaim: false
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: demo-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.9.0
    # NOTE(review): with node pools enabled, .spec.kafka.replicas and
    # .spec.kafka.storage are ignored (the pools above define them) and
    # can be removed.
    replicas: 3
    listeners:
      # Plaintext listener for in-cluster clients.
      - name: plain
        port: 9092
        type: internal
        tls: false
      # TLS listener (used by the Connect worker below).
      - name: tls
        port: 9093
        type: internal
        tls: true
      # TLS + SCRAM-SHA-512 authenticated listener.
      - name: authscram
        port: 9094
        type: internal
        tls: true
        authentication:
          type: scram-sha-512
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      # NOTE(review): inter.broker.protocol.version is not used in KRaft
      # mode — safe to drop.
      inter.broker.protocol.version: "3.9"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 1Gi
          deleteClaim: false
    template:
      pod:
        securityContext:
          # TODO: See if this can be removed in production k8s cluster
          runAsUser: 0
          fsGroup: 0
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF
cat <<EOF | kubectl apply -f - -n kafka
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: activity
  labels:
    strimzi.io/cluster: demo-cluster
spec:
  partitions: 1
  # Was 1: with the broker default min.insync.replicas: 2, a
  # single-replica topic rejects acks=all producers (NotEnoughReplicas),
  # so match the cluster's default.replication.factor.
  replicas: 3
  config:
    retention.ms: 604800000      # 7 days
    segment.bytes: 1073741824    # 1 GiB
EOF
# Stage 1: fetch and unpack the ClickHouse Kafka Connect sink plugin.
FROM redhat/ubi8-minimal:8.10 AS custom-plugin-collector
# -y keeps microdnf non-interactive; clean the metadata cache to shrink the layer.
RUN microdnf update -y && microdnf install -y wget unzip && microdnf clean all
# Download and extract in one layer so the zip never persists in an image layer.
RUN mkdir /connect-plugins \
    && wget https://github.com/ClickHouse/clickhouse-kafka-connect/releases/download/v1.2.6/clickhouse-kafka-connect-v1.2.6.zip -O clickhouse-kafka-connect.zip \
    && unzip -o clickhouse-kafka-connect.zip -d /connect-plugins \
    && rm clickhouse-kafka-connect.zip

# Stage 2: layer the plugin onto the Strimzi Kafka image; Kafka Connect
# discovers plugins under /opt/kafka/plugins/.
FROM quay.io/strimzi/kafka:0.45.0-kafka-3.9.0
USER root:root
COPY --from=custom-plugin-collector /connect-plugins/ /opt/kafka/plugins/
# Drop back to the unprivileged user the base image runs as.
USER 1001
cat <<EOF | kubectl apply -f - -n kafka
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: clickhouse-sink-connect-worker
  annotations:
    # Manage connectors via KafkaConnector resources (below) instead of
    # the Connect REST API.
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.9.0
  # Custom image built from the Dockerfile above (bundles the
  # clickhouse-kafka-connect plugin).
  image: nitishkumar71/kafka-connect-export:0.0.3
  replicas: 1
  # TLS listener of the Kafka cluster.
  bootstrapServers: demo-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      # Cluster CA certificate secret generated by Strimzi.
      - secretName: demo-cluster-cluster-ca-cert
        pattern: "*.crt"
  config:
    group.id: clickhouse-sink-connect-worker
    offset.storage.topic: clickhouse-sink-connect-worker-offsets
    config.storage.topic: clickhouse-sink-connect-worker-configs
    status.storage.topic: clickhouse-sink-connect-worker-status
    # -1 = use the broker's default replication factor for internal topics.
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  logging:
    type: inline
    loggers:
      # TRACE logging for the sink while debugging connectivity.
      log4j.logger.com.clickhouse.kafka.connect: TRACE
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: clickhouse-sink-connector
  labels:
    # Must match the KafkaConnect resource name above.
    strimzi.io/cluster: clickhouse-sink-connect-worker
spec:
  class: com.clickhouse.kafka.connect.ClickHouseSinkConnector
  tasksMax: 1
  autoRestart:
    enabled: true
  config:
    # Plain HTTP to ClickHouse inside the cluster.
    ssl: false
    topics: activity
    # ClusterIP Service created by the CHI serviceTemplate
    # ("cluster-{cluster}" -> cluster-demo-cluster) in the clickhouse namespace.
    hostname: cluster-demo-cluster.clickhouse.svc.cluster.local
    database: demodb
    # NOTE(review): plaintext credentials in the CR — prefer pulling them
    # from a Secret (e.g. a config provider) outside of demos.
    password: password
    username: sink_writer
    # ClickHouse HTTP interface port (matches the "http" service port).
    port: 8123
    value.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter: org.apache.kafka.connect.storage.StringConverter
    errors.retry.timeout: 60
    schemas.enable: false
EOF
The sink is based on the clickhouse-kafka-connect sink connector. It is not able to connect to ClickHouse properly and throws the following error:
2025-01-14 10:54:57,489 TRACE [clickhouse-sink-connector|task-0] Starting ClickHouseWriter (com.clickhouse.kafka.connect.sink.db.ClickHouseWriter) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,489 INFO [clickhouse-sink-connector|task-0] ClickHouse URL: http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/demodb? (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,490 INFO [clickhouse-sink-connector|task-0] Using server timezone: UTC (com.clickhouse.client.api.Client) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,486 INFO [Worker clientId=connect-clickhouse-sink-connect-worker-connect-0.clickhouse-sink-connect-worker-connect.kafka.svc:8083, groupId=clickhouse-sink-connect-worker] Completed plan to restart 1 of 1 tasks for restart request for {connectorName='clickhouse-sink-connector', onlyFailed=true, includeTasks=true} (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-clickhouse-sink-connect-worker-connect-0.clickhouse-sink-connect-worker-connect.kafka.svc:8083-1]
2025-01-14 10:54:57,493 INFO [clickhouse-sink-connector|task-0] Connection reuse strategy: FIFO (com.clickhouse.client.api.Client) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,496 INFO [clickhouse-sink-connector|task-0] client compression: false, server compression: true, http compression: false (com.clickhouse.client.api.Client) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,496 INFO [clickhouse-sink-connector|task-0] Using new http client implementation (com.clickhouse.client.api.Client) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,497 INFO [clickhouse-sink-connector|task-0] ClickHouse URL: http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/demodb? (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,497 DEBUG [clickhouse-sink-connector|task-0] Adding username [sink_writer] (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,500 DEBUG [clickhouse-sink-connector|task-0] Server [ClickHouseNode [uri=http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/demodb]@1972266818] , Timeout [30000] (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,515 WARN [clickhouse-sink-connector|task-0] Ping retry 1 out of 3 (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,523 WARN [clickhouse-sink-connector|task-0] Ping retry 2 out of 3 (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,535 WARN [clickhouse-sink-connector|task-0] Ping retry 3 out of 3 (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,535 ERROR [clickhouse-sink-connector|task-0] Unable to ping ClickHouse instance. (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,536 ERROR [clickhouse-sink-connector|task-0] Unable to ping Clickhouse server. (com.clickhouse.kafka.connect.sink.db.ClickHouseWriter) [task-thread-clickhouse-sink-connector-0]
2025-01-14 10:54:57,536 ERROR [clickhouse-sink-connector|task-0] WorkerSinkTask{id=clickhouse-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-clickhouse-sink-connector-0]
java.lang.RuntimeException: Connection to ClickHouse is not active.
at com.clickhouse.kafka.connect.sink.ProxySinkTask.<init>(ProxySinkTask.java:63)
at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.start(ClickHouseSinkTask.java:40)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:324)
at org.apache.kafka.connect.runtime.WorkerTask.doStart(WorkerTask.java:176)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
When I tried the URL shown in the logs against the HTTP interface from the sink pod, I received a 404 status with the error "There is no handle /demodb":
echo 'SELECT 1' | curl -H 'X-ClickHouse-User: sink_writer' -H 'X-ClickHouse-Key: password' 'http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/demodb' -d @-
But when I added the `database` query parameter to the command and ran it from the sink pod, it worked and returned Ok:
echo 'SELECT 1' | curl -H 'X-ClickHouse-User: sink_writer' -H 'X-ClickHouse-Key: password' 'http://cluster-demo-cluster.clickhouse.svc.cluster.local:8123/?database=demodb' -d @-
Nothing was wrong with the config here — recreating the minikube cluster resolved the issue.