I've been going in circles with this for a few days now. I'm sending data to Kafka using kafkajs. Each time I produce a message, I assign a UUID to message.key, and message.value is set to an event like the one below, stringified:
// the producer is written in typescript
const event = {
eventtype: "event1",
eventversion: "1.0.1",
sourceurl: "https://some-url.com/source"
};
// stringified because the kafkajs producer only accepts `string` or `Buffer`
const stringifiedEvent = JSON.stringify(event);
I start my connect-standalone JDBC Sink Connector with the following configurations:
# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://postgres:5432/eventservice
connection.password=postgres
connection.user=postgres
auto.create=true
auto.evolve=true
topics=topic1
tasks.max=1
insert.mode=upsert
pk.mode=record_key
pk.fields=id
# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
value.converter.schema.registry.url=http://schema-registry:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
bootstrap.servers=localhost:9092
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
When I start the connector with connect-standalone worker.properties connect-standalone.properties, it spins up and connects to PostgreSQL with no issue. However, when I produce an event, it fails with this error message:
WorkerSinkTask{id=local-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception.
Task is being killed and will not recover until manually restarted. Error: Sink connector 'local-jdbc-sink-
connector' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records
with a non-null Struct value and non-null Struct schema, but found record at (topic='topic1',partition=0,offset=0,timestamp=1676309784254) with a HashMap value and null value schema.
(org.apache.kafka.connect.runtime.WorkerSinkTask:609)
With this stack trace:
org.apache.kafka.connect.errors.ConnectException: Sink connector 'local-jdbc-sink-connector' is configured with
'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and
non-null Struct schema, but found record at (topic='txningestion2',partition=0,offset=0,timestamp=1676309784254)
with a HashMap value and null value schema.
at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
at io.confluent.connect.jdbc.sink.RecordValidator.lambda$and$1(RecordValidator.java:41)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
I've been going back and forth trying to get it to read my messages, but I'm not sure what is going wrong. One solution just leads to another error, and the solution for the new error leads back to the previous error. What is the correct configuration? How do I resolve this?
In order to read and parse JSON data then subsequently write it to PostgreSQL, you are required to pass a schema. First of all, you need to change your Kafka message value structure:
{
  "schema": { "..." },
  "payload": { "..." }
}
Here, schema describes the fields that payload contains, written in Kafka Connect's own JSON schema format (this is not the JSON Schema standard). For example, the top-level schema field would contain the following:
{
  "type": "struct",
  "name": "schema-name",
  "fields": [
    { "type": "string", "optional": false, "field": "source" },
    { "type": "string", "optional": false, "field": "message" }
  ]
}
The top-level payload field would then contain something similar to the following:
{
"source": "https://example.com/",
"message": "establishing connection..."
}
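If you have many event shapes, writing the schema by hand for each one gets tedious. Below is a minimal sketch of a helper that derives the envelope from a flat event object. The names `toConnectEnvelope`, `ConnectField`, `ConnectEnvelope` and `FlatEvent` are made up for illustration and are not part of kafkajs or Kafka Connect. It assumes every field is required and only handles strings, booleans and numbers (numbers are mapped to Connect's "double" type).

```typescript
// Hypothetical helper types: a small subset of the Kafka Connect JSON envelope.
interface ConnectField {
  type: "string" | "boolean" | "double";
  optional: boolean;
  field: string;
}

interface ConnectEnvelope {
  schema: { type: "struct"; name: string; fields: ConnectField[] };
  payload: FlatEvent;
}

// Assumption: events are flat objects with primitive values only.
type FlatEvent = Record<string, string | number | boolean>;

// Builds { schema, payload } from a flat event, marking every field as required.
function toConnectEnvelope(schemaName: string, event: FlatEvent): ConnectEnvelope {
  const fields = Object.entries(event).map(([field, value]): ConnectField => {
    const type: ConnectField["type"] =
      typeof value === "string" ? "string"
      : typeof value === "boolean" ? "boolean"
      : "double"; // assumption: every number is sent as a double
    return { type, optional: false, field };
  });
  return {
    schema: { type: "struct", name: schemaName, fields },
    payload: event,
  };
}

// Usage: build the envelope, then stringify it before handing it to the producer.
const envelope = toConnectEnvelope("schema-name", {
  source: "https://example.com/",
  message: "establishing connection...",
});
const messageValue: string = JSON.stringify(envelope);
```

The string in `messageValue` is what you would set as the message value. If your events have nested objects, optional fields, or integer columns, you would need to extend the type mapping yourself.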
You can then pass this to your kafkajs producer:
producer.send({
  topic: "topic1",
  messages: [
    {
      key: "key",
      // kafkajs only accepts a string or Buffer, so stringify the envelope
      value: JSON.stringify({
        schema: {
          type: "struct",
          name: "schema-name",
          fields: [
            { type: "string", optional: false, field: "source" },
            { type: "string", optional: false, field: "message" }
          ]
        },
        payload: {
          source: "https://example.com/",
          message: "establishing connection..."
        }
      })
    }
  ]
});
Now that the message is configured, you need to make these changes to your worker.properties file:
# key converters are just for strings, because the Kafka message key is a string
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
# values are in JSON, and a schema is passed, so "schemas.enable" must be "true"
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
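With schemas.enable=true, JsonConverter expects every message value to be a JSON object containing exactly a schema field and a payload field. Messages already on the topic in the old format (the plain stringified event) will still fail. Here is a rough pre-send check you could run on the producer side. `checkConnectEnvelope` is a hypothetical name, and the rules reflect my understanding of the converter's checks rather than its actual code.

```typescript
// Hypothetical pre-send check: returns a list of problems, empty if the value looks OK.
function checkConnectEnvelope(raw: string): string[] {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return ["value is not valid JSON"];
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    return ["value is not a JSON object"];
  }

  const obj = parsed as Record<string, unknown>;
  const problems: string[] = [];

  // The envelope needs both top-level fields and nothing else.
  if (!("schema" in obj)) problems.push('missing "schema"');
  if (!("payload" in obj)) problems.push('missing "payload"');
  for (const key of Object.keys(obj)) {
    if (key !== "schema" && key !== "payload") {
      problems.push(`unexpected top-level field "${key}"`);
    }
  }

  // Every field not marked optional should have a non-null value in the payload.
  const schema = obj.schema as { fields?: Array<{ field?: string; optional?: boolean }> } | null | undefined;
  const payload = obj.payload as Record<string, unknown> | null | undefined;
  if (schema && Array.isArray(schema.fields) && payload && typeof payload === "object") {
    for (const f of schema.fields) {
      if (f.field === undefined || f.optional === true) continue;
      const value = payload[f.field];
      if (value === undefined || value === null) {
        problems.push(`required field "${f.field}" is missing from payload`);
      }
    }
  }
  return problems;
}

// Usage: the original, schema-less event from the question is reported as invalid.
const oldStyleValue = JSON.stringify({
  eventtype: "event1",
  eventversion: "1.0.1",
  sourceurl: "https://some-url.com/source",
});
const oldStyleProblems = checkConnectEnvelope(oldStyleValue);
```

Running a check like this before `producer.send` gives you the failure in your own code, where it is easier to debug than a killed sink task.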
And you need to make these changes to your connect-standalone.properties file:
# this must be insert, because "upsert" requires that a
# primary key be provided by the message, either by the Kafka
# message value or key
insert.mode=insert
# pk.mode=none because you are not writing a primary key
# using the sink connector - each record generates a serial PK value
pk.mode=none
# delete.enabled=false because we are not treating null record values (tombstones) as deletes
delete.enabled=false
Make those changes, and your configuration files will look like the following:
# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://<host>:<port>/<database>
connection.password=<password>
connection.user=<user>
auto.create=true
auto.evolve=true
topics=<topics>
tasks.max=1
insert.mode=insert
delete.enabled=false
pk.mode=none
consumer.auto.offset.reset=latest
# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
bootstrap.servers=<brokers>
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1
consumer.auto.offset.reset=latest
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
This will enable your producer to send to your cluster, and the Sink Connector will be able to read and parse your JSON values and write them to Postgres. For added detail, your database table would look like this:
CREATE TABLE IF NOT EXISTS table1(
id SERIAL PRIMARY KEY,
source varchar(132),
message text
);