Tags: apache-kafka, rabbitmq, apache-kafka-connect, confluent-platform

Kafka Connect issue when reading from a RabbitMQ queue


I'm trying to read data into my topic from a RabbitMQ queue using the Kafka Connect RabbitMQ source connector with the configuration below:

{
  "name": "RabbitMQSourceConnector1",
  "config": {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "rabbitmqtest3",
    "rabbitmq.queue": "taskqueue",
    "rabbitmq.host": "localhost",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true"
  }
}
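
(For context, a connector definition like this is registered by POSTing it to the Kafka Connect REST API. A minimal sketch in Python, assuming Connect is listening on its default port 8083 and the config above is saved in a hypothetical file rabbitmq-source.json:)

import json
import requests  # assumes the 'requests' package is available

# Load the connector definition shown above (hypothetical filename).
with open("rabbitmq-source.json") as f:
    connector = json.load(f)

# POST /connectors registers a new connector with Kafka Connect.
resp = requests.post("http://localhost:8083/connectors", json=connector)
print(resp.status_code, resp.text)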

But I'm having trouble converting the source stream to JSON format, as I'm losing the original message.

Original:

{'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}

Received:

{"schema":{"type":"bytes","optional":false},"payload":"eyJpZCI6IDEsICJib2R5IjogIjAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMCJ9"}

Does anyone have an idea why this is happening?

EDIT: I tried converting the message to a string using "value.converter": "org.apache.kafka.connect.storage.StringConverter", but the result is the same:

11/27/19 4:07:37 PM CET , 0 , [B@1583a488
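
(For reference, [B@1583a488 is Java's default toString() for a byte array, which suggests the value is being printed without being decoded. In Python terms, something like this is still needed:)

# Hypothetical: raw_value holds a record's value as raw bytes
text = raw_value.decode("utf-8")
print(text)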

EDIT2:

I'm now receiving the JSON file, but the content is still Base64-encoded.

Any idea how to convert it back to UTF-8 directly?

{
  "name": "adls-gen2-sink",
  "config": {
    "connector.class": "io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
    "tasks.max": "1",
    "topics": "rabbitmqtest3",
    "flush.size": "3",
    "format.class": "io.confluent.connect.azure.storage.format.json.JsonFormat",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "internal.value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "topics.dir": "sw66jsoningest",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
  }
}

UPDATE:

I got the solution. Considering this flow:

Message (JSON) --> RabbitMQ (ByteArray) --> Kafka (ByteArray) --> ADLS (JSON)

I used this converter on the RabbitMQ source connector so that the message bytes are passed through to Kafka as-is, instead of being Base64-encoded inside a JSON envelope:

"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"

Afterwards, in the sink connector, I treated the message as a string and saved it as JSON:

"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",

Solution

  • If you set "schemas.enable": "false", you shouldn't be getting the schema and payload fields.

  • If you want no translation to happen at all, use ByteArrayConverter.

  • If your data is just a plain string (which includes JSON), use StringConverter.

  • It's not clear how you're printing the resulting message, but it looks like you're printing the byte array rather than decoding it to a String (see the sketch below).
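
For example, with the kafka-python client, decoding each value explicitly (a sketch, assuming the topic and local broker from the question):

from kafka import KafkaConsumer

# Consume from the topic and decode each raw value as UTF-8
# (broker address and topic name taken from the question above).
consumer = KafkaConsumer(
    "rabbitmqtest3",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda raw: raw.decode("utf-8"),
)

for record in consumer:
    print(record.value)  # the original JSON string, not "[B@..."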