apache-kafka-connect, mongodb-kafka-connector

Single or multiple source Kafka connector(s) for reading multiple collections in a MongoDB cluster


I want to know whether it is recommended to create multiple Kafka connectors for streaming data from multiple collections that live in the same database, or in different databases, within the same MongoDB cluster.

I think there is only one oplog per cluster, so a single connector could easily read the data for multiple collections, and this approach would put less load on the cluster. But I am not sure how easy it will be to publish the data to a different Kafka topic per collection. The second approach, creating multiple connectors, feels like it would put too much load on the server.

Please suggest which approach is recommended.


Solution

  • You can listen to change streams from multiple MongoDB collections with a single connector; you just need to provide a suitable regex for the collection names in the pipeline property. Each matching change event is routed to a topic named after its namespace (by default <topic.prefix>.<databaseName>.<collectionName>), so you still get one Kafka topic per collection. You can also exclude one or more collections by providing a regex for the names you don't want to listen to.

    "pipeline": "[{\"$match\":{\"$and\":[{\"ns.db\":{\"$regex\":/^database-name$/}},{\"ns.coll\":{\"$regex\":/^collection_.*/}}]}}]"  
    

    You can also exclude a given database using $nin if you don't want to listen to any of its change streams; a small sketch for verifying the match locally follows the example below.

    "pipeline": "[{\"$match\":{\"$and\":[{\"ns.db\":{\"$regex\":/^database-name$/,\"$nin\":[/^any_database_name$/]}},{\"ns.coll\":{\"$regex\":/^collection_.*/}}]}}]"
    

    Coming to your questions:

    • From my point of view, creating N Kafka connectors, one per collection, would be an overhead; instead I would recommend creating a single Kafka connector. Make sure you provide fault tolerance using the recommended configurations, and don't rely on the connector's default configuration.

    Here is a basic Kafka connector configuration.

    Mongo to Kafka source connector

    {
      "name": "mongo-to-kafka-connect",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "publish.full.document.only": "true",
        "tasks.max": "3",
        "key.converter.schemas.enable": "false",
        "topic.creation.enable": "true",
        "poll.await.time.ms": 1000,
        "poll.max.batch.size": 100,
        "topic.prefix": "any prefix for topic name",
        "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
        "connection.uri": "mongodb://<username>:<password>@ip:27017,ip:27017,ip:27017,ip:27017/?authSource=admin&replicaSet=xyz&tls=true",
        "value.converter.schemas.enable": "false",
        "copy.existing": "true",
        "topic.creation.default.replication.factor": 3,
        "topic.creation.default.partitions": 3,
        "topic.creation.compacted.cleanup.policy": "compact",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "mongo.errors.log.enable": "true",
        "heartbeat.interval.ms": 10000,
        "pipeline": "[{\"$match\":{\"$and\":[{\"ns.db\":{\"$regex\":/^database-name$/}},{\"ns.coll\":{\"$regex\":/^collection_.*/}}]}}]"
      }
    }
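
    Once you have filled in the placeholders, you can register this configuration through the Kafka Connect REST API. A minimal sketch in Python, assuming the Connect worker's REST endpoint is reachable at http://localhost:8083 (adjust the host and port for your deployment):

    import json
    import requests

    # Assumed Kafka Connect REST endpoint; change host/port to match your worker.
    connect_url = "http://localhost:8083/connectors"

    connector = {
        "name": "mongo-to-kafka-connect",
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            # ... the remaining settings from the configuration shown above ...
            "connection.uri": "mongodb://<username>:<password>@ip:27017/?authSource=admin&replicaSet=xyz&tls=true",
        },
    }

    response = requests.post(
        connect_url,
        headers={"Content-Type": "application/json"},
        data=json.dumps(connector),
    )
    response.raise_for_status()
    print(response.status_code, response.json())

    A successful request creates the connector; you can then check its state with a GET request to /connectors/mongo-to-kafka-connect/status.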
    

    You can get more details from the official docs.