python, apache-kafka, apache-kafka-connect, confluent-kafka-python

Unable to get Kafka Connect Redshift connector working


Following the question and suggestions here: Kafka JDBCSinkConnector Schema exception: JsonConverter with schemas.enable requires "schema" and "payload", I am trying to sink records into Redshift using the Redshift sink connector and a producer written in Python. Here is the connector config:

connector.class=io.confluent.connect.aws.redshift.RedshiftSinkConnector
aws.redshift.port=5439
confluent.topic.bootstrap.servers=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
tasks.max=2
topics=test_topic_3
aws.redshift.password=xxxxxxxxxxxxxxxxxxxxxxxxxxxx
aws.redshift.domain=xxxxxxxxxxxxxxxxxxxxxxxx.redshift.amazonaws.com
aws.redshift.database=xxxxxxxxxxxxxxxxxxx
confluent.topic.replication.factor=1
aws.redshift.user=xxxxxxxxxxxxxxxxxxxxxxxx
auto.create=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
pk.mode=kafka

The content of the schema file is as follows:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"url"}],"optional":false,"name":"test_data"},"payload":{"id":12,"url":"some_url"}}

The Python producer code is:

from kafka import KafkaProducer
import json

# Every value is run through json.dumps, then encoded to bytes
producer = KafkaProducer(
    bootstrap_servers="xxxxxxxxxxxxx",
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

with open("connector_test_schema.json", 'r') as file:
    read = file.read()
    for i in range(1):
        producer.send("test_topic_3", key='abc'.encode('utf-8'), value=read)

producer.close()

I still get the following error:

[Worker-0fbc0b18922b147e0] org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Is there something wrong with what I'm doing?


Solution

  • As suggested by @OneCricketeer and confirmed, the value was encoded twice: the file contents were already a JSON string, and the json.dumps in the value_serializer serialized that string again, so the converter received a quoted JSON string instead of an object with "schema" and "payload" fields, causing it to fail (see the sketch after this list).

    Solution: Only encode the string read from the JSON file; do not call json.dumps on it again:

    value_serializer=lambda v: v.encode('utf-8')
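
Putting it together, a minimal sketch of the corrected producer (same placeholder bootstrap server, topic, and file name as in the question):

from kafka import KafkaProducer

# The file already contains a serialized JSON string, so the serializer
# only needs to encode it to bytes; running it through json.dumps again
# would wrap it in an extra layer of quoting and break JsonConverter.
producer = KafkaProducer(
    bootstrap_servers="xxxxxxxxxxxxx",
    value_serializer=lambda v: v.encode('utf-8'),
)

with open("connector_test_schema.json", 'r') as file:
    read = file.read()
    producer.send("test_topic_3", key='abc'.encode('utf-8'), value=read)

producer.flush()
producer.close()

Alternatively, json.dumps can stay in the serializer if the file is parsed with json.load first, so the serializer receives a dict rather than an already-serialized string.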