mongodb, apache-kafka, apache-kafka-connect, mongodb-kafka-connector

Overriding Mongo Source connector


I'm going to use Kafka Connect to consume messages from MongoDB and publish them to a Kafka topic.

By default, the Mongo source connector creates one topic per collection. I will have many collections, though, and would like a single topic for all of them. Each message will carry its collection name.

  1. Is overriding the mongo-source connector the better approach? If so, what should I keep in mind?
  2. Is there already a setting for this? I know that leaving the collection empty when creating the connector makes it listen to all collections, but it still creates a topic per collection.

Solution

  • As @onecricketeer suggested, I used RegexRouter, so there is no need to override the Mongo source connector to publish documents from all collections to the same topic.

    This is my configuration. It listens to every collection matching the pipeline and publishes to mongodbTopic:

    {
        "name": "mongo-source",
        "config": {
            "tasks.max": "1",
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "connection.uri": "<connection_uri>",
            "topic.prefix": "",
            "pipeline": "[{\"$match\": { \"$or\": [{\"fullDocument._kind\":\"collection\"},{\"fullDocument._isInverse\":false}],\"ns.coll\": {\"$regex\": /^(.*_related)$|^(my_collection_test)$/}}}]",
            "poll.await.time.ms": 5,
            "poll.max.batch.size": 2000,
            "transforms": "dropPrefix",
            "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.dropPrefix.regex": ".*",
            "transforms.dropPrefix.replacement": "mongodbTopic",
            "errors.tolerance": "none",
            "copy.existing": true
        }
    }
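
To see why this sends every collection to one topic: the source connector names each topic `<topic.prefix>.<database>.<collection>` (just `<database>.<collection>` when the prefix is empty), and RegexRouter rewrites the name whenever the regex matches the whole topic. The Python sketch below only imitates that matching rule for illustration; it is not the connector's code. `route_topic` and the sample topic names are hypothetical, and Java-style `$1` group references in the replacement are not translated.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Imitate RegexRouter: rename only when the regex matches the whole topic."""
    pattern = re.compile(regex)
    if pattern.fullmatch(topic) is None:
        return topic  # no full match: the topic name is left unchanged
    # Replace the first match only, mirroring Java's Matcher.replaceFirst.
    # A function replacement keeps the replacement text literal.
    return pattern.sub(lambda _match: replacement, topic, count=1)


# Hypothetical topic names the source connector might produce with an empty prefix.
for source_topic in ["mydb.orders_related", "mydb.my_collection_test"]:
    print(source_topic, "->", route_topic(source_topic, ".*", "mongodbTopic"))
```

With the regex `.*` every topic name matches, so both sample names collapse to `mongodbTopic`. A narrower regex would leave non-matching topics under their original names.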
    

    For more information, see https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html
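
Because every collection now shares one topic, a consumer has to read the collection name from the message itself. A minimal sketch, assuming the connector publishes the full change stream event as a JSON string (that is, `publish.full.document.only` is not enabled) and that the event carries the usual `ns.db` and `ns.coll` fields. `collection_of` and the sample event are hypothetical.

```python
import json
from typing import Optional


def collection_of(message_value: str) -> Optional[str]:
    """Return the source collection of a change stream event, or None if absent."""
    event = json.loads(message_value)
    namespace = event.get("ns") or {}
    return namespace.get("coll")


# Hypothetical message value, shaped like a MongoDB change stream insert event.
sample = json.dumps({
    "operationType": "insert",
    "ns": {"db": "mydb", "coll": "orders_related"},
    "fullDocument": {"_kind": "collection", "_isInverse": False},
})
print(collection_of(sample))
```

If only the full document is published instead of the whole event, the `ns` field is not available, and the collection name would have to be stored in the documents themselves.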