I have a project where we consume data from Kafka and publish it to MongoDB. The codebase does only one task at a time: it may be a MongoDB-to-Kafka migration, a Kafka-to-MongoDB migration, or something else.
We have to consume from different Kafka topics and publish to different MongoDB collections. These are parallel streams of work.
The current design is one codebase that can consume from any topic and publish to any MongoDB collection, configured through environment variables. So we created one Kubernetes pod with multiple containers inside it; each container has different environment variables.
My questions:
Sample of step 1:
apiVersion: apps/v1
kind: Deployment
metadata:
annotations: {}
name: test-raw-mongodb-sink-apps
namespace: test-apps
spec:
selector:
matchLabels:
app: test-raw-mongodb-sink-apps
template:
metadata:
labels:
app: test-raw-mongodb-sink-apps
spec:
containers:
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-alchemy
- name: INPUT_TOPIC
value: test.raw.ptv.alchemy
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8081"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/dpl/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-alchemy
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-bloomberg
- name: INPUT_TOPIC
value: test.raw.pretrade.bloomberg
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8082"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-bloomberg
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-calypso
- name: INPUT_TOPIC
value: test.raw.ptv.calypso
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8083"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-calypso
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-dtres
- name: INPUT_TOPIC
value: test.raw.ptv.dtres
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8084"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-dtres
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-feds
- name: INPUT_TOPIC
value: test.raw.ptv.feds
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8085"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-feds
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-hoops
- name: INPUT_TOPIC
value: test.raw.ptv.hoops
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8086"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-hoops
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-mxcore
- name: INPUT_TOPIC
value: test.raw.ptv.murex_core
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8087"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-mxcore
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-mxeqd
- name: INPUT_TOPIC
value: test.raw.ptv.murex_eqd_sa
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8088"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-mxeqd
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-mxgts
- name: INPUT_TOPIC
value: test.raw.ptv.murex_gts_sa
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8089"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-mxgts
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-mxmr
- name: INPUT_TOPIC
value: test.raw.ptv.murex_mr
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8090"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-mxmr
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-mxgtscf
- name: INPUT_TOPIC
value: test.raw.cashflow.murex_gts_sa
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8091"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-mxgtscf
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-mxcoll
- name: INPUT_TOPIC
value: test.raw.collateral.mxcoll
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8092"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-mxcoll
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-mxcoll-link
- name: INPUT_TOPIC
value: test.raw.collateral.mxcoll_link
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8093"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-mxcoll-link
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-ost
- name: INPUT_TOPIC
value: test.raw.ptv.ost
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8094"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-ost
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
- env:
- name: EVENTS_TOPIC
value: test.ops.proc-events
- name: GROUP_ID
value: test-mongodb-sink-posmon
- name: INPUT_TOPIC
value: test.raw.ptp.posmon
- name: MONGODB_AUTH_DB
value: admin
- name: MONGODB_HOST0
value: test-mongodb-0.test-mongodb-headless.test-infra
- name: MONGODB_HOST1
value: test-mongodb-1.test-mongodb-headless.test-infra
- name: MONGODB_PASSWORD
value: test123
- name: MONGODB_PORT
value: "27017"
- name: MONGODB_USERNAME
value: root
- name: SERVER_PORT
value: "8095"
- name: KAFKA_BROKERS
value: kafka-cluster-kafka-bootstrap.kafka:9093
- name: TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: ca.password
name: kafka-ca-cert
- name: KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
key: user.password
name: kafka
image: tools.testCompany.co.za:8093/local/tt--mongodb-map:0.0.7.0-SNAPSHOT
name: test-mongodb-sink-posmon
securityContext:
allowPrivilegeEscalation: true
privileged: true
volumeMounts:
- mountPath: /app/resources
name: properties
- mountPath: /stores
name: stores
readOnly: true
Thanks
A templating tool like Helm will let you fill in the environment-variable values from deploy-time settings. In Helm this would look like:
env:
  - name: EVENTS_TOPIC
    value: {{ .Values.eventsTopic }}
  - name: GROUP_ID
    value: {{ .Values.groupId }}
  - name: INPUT_TOPIC
    value: {{ .Values.inputTopic }}
You could then deploy this multiple times with different sets of topics:
helm install alchemy . \
  --set eventsTopic=test.ops.proc-events \
  --set groupId=test-mongodb-sink-alchemy \
  --set inputTopic=test.raw.ptv.alchemy
helm install bloomberg . \
  --set eventsTopic=test.ops.proc-events \
  --set groupId=test-mongodb-sink-bloomberg \
  --set inputTopic=test.raw.pretrade.bloomberg
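As a variation on the --set flags, the same three settings could be kept in one small values file per release, which tends to be easier to review and keep in version control. A minimal sketch, assuming a hypothetical file named values-alchemy.yaml; the keys are the .Values names the template fragment reads:

```yaml
# values-alchemy.yaml (hypothetical file name, one such file per topic)
eventsTopic: test.ops.proc-events
groupId: test-mongodb-sink-alchemy
inputTopic: test.raw.ptv.alchemy
```

It would then be installed with helm install alchemy . -f values-alchemy.yaml, with a similar file for each of the other topics.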
You could write the Helm chart to be configured with a list of topic sets, too, and only deploy the set once:
{{- $top := . -}}{{- /* because "range" overwrites "." */ -}}
{{- range $topic := $top.Values.topics }}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ $topic.name }}
spec:
  ...
          env:
            - name: EVENTS_TOPIC
              value: {{ $top.Values.eventTopic }}{{/* common to all deployments */}}
            - name: GROUP_ID
              value: test-mongodb-sink-{{ $topic.name }}
            - name: INPUT_TOPIC
              value: {{ $topic.inputTopic }}
{{- end }}
Write configuration like:
eventTopic: test.ops.proc-events
topics:
  - name: alchemy
    inputTopic: test.raw.ptv.alchemy
  - name: bloomberg
    inputTopic: test.raw.pretrade.bloomberg
And deploy like:
helm install connector . -f topic-listing.yaml
In any case, you will want only one container per pod, for a couple of reasons. If the list of topics ever changes, this lets you create or delete deployments without interfering with the other topics; if everything were in a single pod, you'd have to stop and restart everything together, and it can take Kafka a minute or two to work out what has happened. In a Kafka context, you can also run as many consumers in a consumer group as there are partitions on a topic, but not usefully more, since any extra consumers sit idle. If you have a very busy topic you can easily set that deployment's replicas: to run multiple consumers for multiple partitions, but if everything is together in one pod, your only choice is to scale everything together.
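To make the one-container-per-pod layout concrete, here is a rough sketch of what a single topic's Deployment might look like once it is split out. The names and values are taken from the question's first container; the replica count of 3 is purely an assumption and only makes sense if the input topic has at least that many partitions:

```yaml
# Sketch only: one Deployment per topic, one container per pod.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-mongodb-sink-alchemy
  namespace: test-apps
spec:
  replicas: 3  # assumed value; keep it at or below the topic's partition count
  selector:
    matchLabels:
      app: test-mongodb-sink-alchemy
  template:
    metadata:
      labels:
        app: test-mongodb-sink-alchemy
    spec:
      containers:
        - name: test-mongodb-sink-alchemy
          image: tools.testCompany.co.za:8093/dpl/tt--mongodb-map:0.0.7.0-SNAPSHOT
          env:
            - name: EVENTS_TOPIC
              value: test.ops.proc-events
            - name: GROUP_ID
              value: test-mongodb-sink-alchemy
            - name: INPUT_TOPIC
              value: test.raw.ptv.alchemy
            # remaining MongoDB and Kafka settings as in the question
```

Because every replica uses the same GROUP_ID, Kafka divides the topic's partitions among them, and a quieter topic can stay at a single replica without affecting this one.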