mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector

MongoDB Sink Connector : Message truncated in Apache Kafka


I'm facing a problem with the MongoDB Kafka Connector. I'm trying to produce a JSON message to a Kafka topic with the console producer (and read it back with the console consumer).

When the message is smaller than 4096 bytes, it is consumed properly. But when the message is larger than 4096 bytes, I get this exception:

ERROR WorkerSinkTask{id=scraper-mongo-sink-0} Error converting message value in topic 'rawdata' partition 0 at offset 154 and timestamp 1636471830852: Converting byte[] to Kafka Connect data failed due to serialization error:  (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
.......
.......

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in VALUE_STRING
 at [Source: (byte[])"{ ........."[truncated 3595 bytes]; line: 1, column: 4096]
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in VALUE_STRING
 at [Source: (byte[])"{ "....[truncated 3595 bytes]; line: 1, column: 4096]

Does anyone have a clue what is causing this error? And more importantly, how can I solve it?

NB. I've tried modifying some default properties of the broker, as well as of the producer/consumer, such as: offset.metadata.max.bytes, max.request.size, message.max.bytes, fetch.max.bytes, etc.

Any help would be appreciated.


Solution

  • You could try the kcat tool instead. I vaguely remember coming across this issue before, and I probably looked at the source code at the time. In any case, the alternative is to use input redirection from the shell rather than typing (or pasting) that much data. (If you are pasting, the problem is either the clipboard or the terminal, not Kafka; a cutoff at exactly 4096 bytes is consistent with a terminal line-length limit.)

    kafka-console-producer ... < data.json 
    

    Make sure there is one JSON object/array/value per line, since the console producer sends each input line as a separate message.
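
    If the JSON is pretty-printed across several lines, it has to be collapsed to a single line before it is redirected into the producer. The following is a minimal sketch of one way to do that with Python's standard json module; the helper names to_single_line and check_json_lines are hypothetical and only for illustration, and the sample payload is made up rather than taken from the question.

```python
import json


def to_single_line(text: str) -> str:
    """Re-serialize one JSON document as a single compact line."""
    # json.dumps escapes newlines inside strings, so the result never
    # contains a raw line break.
    return json.dumps(json.loads(text), separators=(",", ":"), ensure_ascii=False)


def check_json_lines(lines):
    """Return (line_number, error_message) for every non-empty line
    that does not parse as a complete JSON value."""
    problems = []
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append((number, str(exc)))
    return problems


if __name__ == "__main__":
    # Made-up payload that is larger than 4096 bytes once serialized.
    pretty = '{\n  "id": 1,\n  "body": "' + "x" * 5000 + '"\n}'
    line = to_single_line(pretty)
    print(len(line.encode("utf-8")), "bytes on one line")
    # The second entry simulates a message cut off at 4096 characters.
    print(check_json_lines([line, line[:4096]]))
```

    Writing the output of to_single_line to data.json (one document per line) gives a file that can be passed to the producer with the redirection shown above, and check_json_lines can flag a truncated line before it ever reaches the connector.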