Tags: jdbc, apache-kafka, apache-kafka-connect, kafkajs

Stream from MSK to RDS PostgreSQL with MSK Connector


I've been going in circles with this for a few days now. I'm sending data to Kafka using kafkajs. Each time I produce a message, I assign a UUID to the message.key value, and the message.value is set to an event like the following, which is then stringified:

// the producer is written in typescript
const event = {
    eventtype: "event1",
    eventversion: "1.0.1",
    sourceurl: "https://some-url.com/source"
};
// stringified because the kafkajs producer only accepts `string` or `Buffer`
const stringifiedEvent = JSON.stringify(event);
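
For completeness, the produce call itself looks roughly like this (producer is an already-connected kafkajs Producer; the UUID here comes from node:crypto):

import { randomUUID } from "node:crypto";

// each message gets a UUID key; the value is the stringified event above
await producer.send({
    topic: "topic1",
    messages: [
        { key: randomUUID(), value: stringifiedEvent }
    ]
});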

I start my connect-standalone JDBC Sink Connector with the following configurations:

# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://postgres:5432/eventservice
connection.password=postgres
connection.user=postgres

auto.create=true
auto.evolve=true
topics=topic1
tasks.max=1
insert.mode=upsert
pk.mode=record_key
pk.fields=id
# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
value.converter.schema.registry.url=http://schema-registry:8081 

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

bootstrap.servers=localhost:9092
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1

When I start the connector with connect-standalone worker.properties connect-standalone.properties, it spins up and connects to PostgreSQL with no issue. However, when I produce an event, it fails with this error message:

WorkerSinkTask{id=local-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted. Error: Sink connector 'local-jdbc-sink-
connector' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records 
with a non-null Struct value and non-null Struct schema, but found record at (topic='topic1',partition=0,offset=0,timestamp=1676309784254) with a HashMap value and null value schema. 
(org.apache.kafka.connect.runtime.WorkerSinkTask:609)

With this stack trace:

org.apache.kafka.connect.errors.ConnectException: Sink connector 'local-jdbc-sink-connector' is configured with 
'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and 
non-null Struct schema, but found record at (topic='txningestion2',partition=0,offset=0,timestamp=1676309784254) 
with a HashMap value and null value schema.
    at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
    at io.confluent.connect.jdbc.sink.RecordValidator.lambda$and$1(RecordValidator.java:41)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

I've been going back and forth trying to get it to read my messages, but I'm not sure what is going wrong. One solution just leads to another error, and the solution for the new error leads back to the previous error. What is the correct configuration? How do I resolve this?


Solution

  • To read and parse JSON data and then write it to PostgreSQL, the JDBC sink connector requires a schema, so when using the JsonConverter you need to embed one in each message value. First, change your Kafka message value to the schema/payload envelope structure:

    {
      "schema": { "..." },
      "payload": { "..." }
    }
    

    Here, schema is a Connect schema describing the fields that payload contains. For example, the top-level schema field could contain the following:

    {
      "type": "struct",
      "name": "schema-name",
      "fields": [
        { "type": "string", "optional": false, "field": "source" },
        { "type": "string", "optional": false, "field": "message" }
      ]
    }
    

    The payload top-level field would then contain something similar to the following:

    {
      "source": "https://example.com/",
      "message": "establishing connection..."
    }
    

    You can then stringify this envelope and pass it to your kafkajs producer:

    producer.send({
      topic: "topic1",
      messages: [
        {
          key: "key",
          // kafkajs only accepts string or Buffer values, so the
          // envelope must be stringified before sending
          value: JSON.stringify({
            schema: {
              type: "struct",
              name: "schema-name",
              fields: [
                { type: "string", optional: false, field: "source" },
                { type: "string", optional: false, field: "message" }
              ]
            },
            payload: {
              source: "https://example.com/",
              message: "establishing connection..."
            }
          })
        }
      ]
    });
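
    If you produce more than one event shape, a small helper keeps the envelope consistent. This is only a sketch: buildConnectValue and ConnectField are illustrative names, not part of kafkajs or the connector.

    // Illustrative helper: wraps a payload in the schema/payload envelope
    // and stringifies it so kafkajs accepts it as a message value.
    interface ConnectField {
      type: string;
      optional: boolean;
      field: string;
    }

    const buildConnectValue = (
      name: string,
      fields: ConnectField[],
      payload: Record<string, unknown>
    ): string =>
      JSON.stringify({
        schema: { type: "struct", name, fields },
        payload
      });

    // usage with the example above
    const value = buildConnectValue(
      "schema-name",
      [
        { type: "string", optional: false, field: "source" },
        { type: "string", optional: false, field: "message" }
      ],
      { source: "https://example.com/", message: "establishing connection..." }
    );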
    

    Now that the message is configured, you need to make these changes to your worker.properties file:

    # the message key is a plain string (a UUID), so the StringConverter is used
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    
    # values are in JSON, and a schema is passed, so "schemas.enable" must be "true"
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true
    

    And you need to make these changes to your connect-standalone.properties file:

    # this must be insert, because "upsert" requires that a 
    # primary key be provided by the message, either by the Kafka 
    # message value or key
    insert.mode=insert
    
    # pk.mode=none because the sink connector does not supply a primary key -
    # the table's SERIAL column generates an id for each inserted row
    pk.mode=none
    
    # delete.enabled=false because null record values are not treated as deletes
    delete.enabled=false
    

    Make those changes, and your configuration files will look like the following:

    # connect-standalone.properties
    name=local-jdbc-sink-connector
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    dialect.name=PostgreSqlDatabaseDialect
    connection.url=jdbc:postgresql://<host>:<port>/<database>
    connection.password=<password>
    connection.user=<user>
    auto.create=true
    auto.evolve=true
    topics=<topics>
    tasks.max=1
    insert.mode=insert
    delete.enabled=false
    pk.mode=none
    consumer.auto.offset.reset=latest
    
    # worker.properties
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
    
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true
    
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    
    bootstrap.servers=<brokers>
    group.id=jdbc-sink-connector-worker
    worker.id=jdbc-sink-worker-1
    consumer.auto.offset.reset=latest
    
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1
    config.storage.topic=connect-configs
    config.storage.replication.factor=1
    status.storage.topic=connect-status
    status.storage.replication.factor=1
    

    This will enable your producer to send to your cluster, and the sink connector will be able to read and parse your JSON values and write them to Postgres. For reference, your database table would look like this:

    CREATE TABLE IF NOT EXISTS table1(
      id SERIAL PRIMARY KEY,
      source varchar(132),
      message text
    );
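
    To sanity-check the pipeline end to end, you can query the sink table after producing a few events. This is a rough sketch using the node-postgres (pg) client, with connection details taken from the connection.url in the question:

    import { Client } from "pg";

    const client = new Client({
      host: "postgres",
      port: 5432,
      database: "eventservice",
      user: "postgres",
      password: "postgres"
    });

    await client.connect();

    // one row per Kafka record; columns mirror the fields in the value schema
    const { rows } = await client.query(
      "SELECT id, source, message FROM table1 ORDER BY id DESC LIMIT 5"
    );
    console.log(rows);

    await client.end();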