I'm trying to sync my PostgreSQL database to Elasticsearch, but I'm running into difficulties with the deletion of records. Here's some information about what I'm trying to achieve.
Here's my source connector config:
{
"name": "pg-source-1",
"config": {
"slot.name" : "debezium",
"database.server.name": "cdc",
"slot.drop_on_stop": true,
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"schema.whitelist": "my_schema",
"override.message.max.bytes": "524288000",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"database.history.kafka.topic": "schema-changes.my_schema
}
}
Right now I have three tables: product (PK id), category (PK id), and product_category, a bridging table with no PK, only two FKs (one referencing product.id and the other category.id). When I delete a relationship between a product and a category, i.e. a row in the product_category table, the deletion is not reflected on the ES side.
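For reference, here is a minimal sketch of the schema (types and the product_category column names are just illustrative; the important part is that the bridging table has no primary key, only foreign keys):

CREATE TABLE my_schema.product (
    id BIGINT PRIMARY KEY
    -- other product columns ...
);

CREATE TABLE my_schema.category (
    id BIGINT PRIMARY KEY
    -- other category columns ...
);

-- Bridging table: no primary key, only foreign keys.
CREATE TABLE my_schema.product_category (
    product_id  BIGINT NOT NULL REFERENCES my_schema.product (id),
    category_id BIGINT NOT NULL REFERENCES my_schema.category (id)
);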
Inserts and updates do show up in Elasticsearch; it's only the deletes that don't.
Here's the ES sink connector config:
{
"name": "es-sink-1",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "output-topic",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.ignore": false,
"transforms": "extract",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id",
"transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extract.field": "id",
"behavior.on.null.values" : "delete"
}
}
Here's the configuration of the Streams application:
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application-1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 500);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
I followed this link.
Let me know if any more info is needed.
Thanks :)
I was able to fix this after looking more into the Debezium documentation, which says:
For a consumer to be able to process a delete event generated for a table that does not have a primary key, set the table’s REPLICA IDENTITY to FULL. When a table does not have a primary key and the table’s REPLICA IDENTITY is set to DEFAULT or NOTHING, a delete event has no before field.
So I just had to change the REPLICA IDENTITY to FULL for the bridging table, since it has no PK, only FKs.
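Concretely, that is a single statement on the Postgres side (schema and table names as in the configs above):

ALTER TABLE my_schema.product_category REPLICA IDENTITY FULL;

-- Optional sanity check: relreplident should now be 'f' (FULL).
SELECT relname, relreplident FROM pg_class WHERE relname = 'product_category';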
There was also one more change I made to the source connector config above:
"transforms.unwrap.drop.tombstones":false,
After this I started receiving tombstone events for deleted records (a key with a null value); I just had to honor those in the Streams application.