
Kafka message includes control characters (MongoDB Source Connector)


I have a Kafka Connect MongoDB Source Connector working (both via Confluent Platform), but the messages it creates contain a control character at the start, which makes downstream parsing (to JSON) of each message harder than I imagine it should be.

The Source connector that's running:

{
    "name": "mongo-source-connector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://myUsername:myPassword@my-mongodb-host-address:27017",
        "database": "myDatabase",
        "collection": "myCollection",
        "change.stream.full.document": "updateLookup",
        "errors.log.enable": true
    }
}

The message created in the Kafka topic by this Source connector is as follows (notice the leading control character):

�{"_id": {"_data": "82609E8726000000012B022C0100296E5A1004BE208B099BCF4106822DE274B0B9D39A46645F69640064609E87267125D17D12D180620004"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1621002022, "i": 1}}, "fullDocument": {"_id": {"$oid": "609e87267125d17d12d18062"}, "uuid": "23534a5c-ad82-431c-a821-6b4aed4f59a1", "endingNumber": 10}, "ns": {"db": "myDatabase", "coll": "myCollection"}, "documentKey": {"_id": {"$oid": "609e87267125d17d12d18062"}}}

The control character makes downstream parsing to JSON difficult because it makes the otherwise valid JSON invalid. I don't know why it's there or how to get rid of it.

I could, I guess, parse out junk like this control character prior to treating it like JSON but that seems like a band-aid I'd like to avoid.
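(For anyone who does need that band-aid while sorting out the real fix, a minimal sketch is below; the function name is mine, and it simply drops everything before the first `{` byte:)

```kotlin
// Band-aid only: drop any leading non-JSON bytes (e.g. an Avro wire-format
// prefix) so the rest of the payload parses as JSON. Fixing the converter
// configuration is the proper solution.
fun stripToJson(raw: ByteArray): String {
    val start = raw.indexOfFirst { it == '{'.code.toByte() }
    require(start >= 0) { "no JSON object found in message" }
    return String(raw, start, raw.size - start, Charsets.UTF_8)
}
```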

The way I'm handling the message now is as follows, in case it matters, though I think it's irrelevant since I've confirmed it works on valid JSON without the control character:


import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper

data class MyChangesetMessageId (
    @JsonProperty("_data")
    val data: String
)

data class MyChangesetMessageTimestamp (
    val t: Long,
    val i: Int
)

data class MyChangesetMessageClusterTime (
    @JsonProperty("\$timestamp")
    val timestamp: MyChangesetMessageTimestamp
)

data class MyChangesetOid (
    @JsonProperty("\$oid")
    val oid: String
)

data class MyChangesetMessageFullDocument (
    @JsonProperty("_id")
    val id: MyChangesetOid,
    val uuid: String,
    val endingNumber: Int
)

data class MyChangesetMessageNS (
    val db: String,
    val coll: String
)

data class MyChangesetDocumentKey (
    @JsonProperty("_id")
    val id: MyChangesetOid
)

data class MyChangesetMessage (
    @JsonProperty("_id")
    val id: MyChangesetMessageId,
    val operationType: String,
    val clusterTime: MyChangesetMessageClusterTime,
    val fullDocument: MyChangesetMessageFullDocument,
    val ns: MyChangesetMessageNS,
    val documentKey: MyChangesetDocumentKey
)

...

val objectMapper = jacksonObjectMapper()
val changesetMessage = objectMapper.readValue(message, MyChangesetMessage::class.java)

Any ideas are appreciated.


Solution

  • The character you're referring to is the kind of prefix you typically see when Avro-serialized data (in the Confluent wire format, a magic byte followed by a 4-byte schema ID) is decoded as if it were a plain string.

    Check the key/value converter settings on the Connect worker: since you haven't defined converters in the connector config, the worker's defaults (often the AvroConverter) apply.

    If you want to parse the messages as JSON, use the JsonConverter. Alternatively, Avro works as well if you'd rather skip hand-writing the data classes and generate them from the Avro schema instead.
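If you'd rather override the converters per connector instead of changing the worker properties, the config could look like this (a sketch, not verified against your setup; since the MongoDB source connector emits change-stream documents as JSON strings, the JsonConverter with `schemas.enable=false` — or even the plain StringConverter — for the value is a common pairing):

```json
{
    "name": "mongo-source-connector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://myUsername:myPassword@my-mongodb-host-address:27017",
        "database": "myDatabase",
        "collection": "myCollection",
        "change.stream.full.document": "updateLookup",
        "errors.log.enable": true,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}
```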