Apache Pulsar JDBC sink: differentiation between insert/update/delete


I'm currently evaluating Pulsar JDBC sinks, as we plan to use a PostgreSQL sink soon. The documentation says that JDBC sinks support insert, update, and delete operations, but I wasn't able to find anything on how the sink connector actually decides what to execute: is a new event an insert, an update, or a delete?

After browsing the source code and looking into JdbcAbstractSink.java, I think I have an idea now, but I need confirmation that it is right.

Please tell me if this is correct:

1.) There need to be 3 different topics for 1 db entity type: one topic for inserting the entity type into a table, one for updating the same entity type, and one for deletions. There also need to be 3 different sink connectors, each with a different configuration.

2.) The command decision is made by configuration properties:

  • if both nonKey and key properties are missing --> insert is executed

  • if both nonKey and key props are provided --> update is executed, as in

    update nonKey columns where key column(s) = event.value

  • if only key columns are provided -->

    delete where key column = event.value

Is this the way it's done?

In the source class mentioned above, there is this bit of code:

    for (Record<T> record : swapList) {
        String action = record.getProperties().get(ACTION);
        if (action == null) {
            action = INSERT;
        }
        switch (action) {
            case DELETE: ...
            case UPDATE: ...

but nowhere is it mentioned where and how the ACTION property of the record is set.

If I just missed the relevant documentation, a link would be appreciated. I know about this configuration doc page: https://pulsar.apache.org/docs/en/io-jdbc-sink/#configuration but it's very vague and has no real examples.


Solution

  • The documentation for this connector is lacking, to say the least, so I will do my best to explain it. As you can see from the code, the "action" to take (insert, update, or delete) is passed in as a property of the Pulsar message itself.

    String action = record.getProperties().get(ACTION);
    

    Therefore, to control the action taken by the sink, you need to add that property to each message you publish to the input topic of the JDBC sink connector (unless you want the action to be INSERT, which is the default).

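    Here is an example of how to publish a message with a non-default action in its properties. The key and value have to match the ACTION and DELETE constants in JdbcAbstractSink exactly; as far as I can tell these are the upper-case strings "ACTION" and "DELETE", so check the source of your Pulsar version:

    producer.newMessage().value("1234").property("ACTION", "DELETE").send();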
    

    Now when the JDBC Sink connector reads this message, it will perform a DELETE operation on the record with the primary key value of "1234".
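
To make the per-message decision concrete, here is a minimal, self-contained sketch that models it without any Pulsar dependency. It is not the connector's code: the class name `JdbcSinkActionSketch` and both methods are hypothetical, the constant values ("ACTION", "INSERT", "UPDATE", "DELETE") are assumed to match those in JdbcAbstractSink, rejecting unknown actions is an assumption, and the SQL templates are a simplified guess at how the `key` and `nonKey` settings shape each statement. The point it illustrates is that `key`/`nonKey` only determine what the statements look like, while the message property chooses which one runs, so one topic and one sink can carry all three operations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JdbcSinkActionSketch {
    // Assumed to mirror the constants in JdbcAbstractSink; verify for your version.
    public static final String ACTION = "ACTION";
    public static final String INSERT = "INSERT";
    public static final String UPDATE = "UPDATE";
    public static final String DELETE = "DELETE";

    /** Picks the action from the message properties, defaulting to INSERT. */
    public static String resolveAction(Map<String, String> properties) {
        String action = properties.get(ACTION);
        if (action == null) {
            action = INSERT; // no property on the message: treat it as an insert
        }
        switch (action) {
            case INSERT:
            case UPDATE:
            case DELETE:
                return action;
            default:
                // Assumption: anything else is rejected rather than ignored.
                throw new IllegalArgumentException("Unsupported action: " + action);
        }
    }

    /** Simplified statement shape built from the sink's key/nonKey column lists. */
    public static String sqlTemplate(String action, String table,
                                     List<String> key, List<String> nonKey) {
        String where = key.stream()
                .map(c -> c + " = ?")
                .collect(Collectors.joining(" AND "));
        switch (action) {
            case DELETE: {
                return "DELETE FROM " + table + " WHERE " + where;
            }
            case UPDATE: {
                String set = nonKey.stream()
                        .map(c -> c + " = ?")
                        .collect(Collectors.joining(", "));
                return "UPDATE " + table + " SET " + set + " WHERE " + where;
            }
            default: {
                // INSERT: key columns first, then nonKey columns (a simplification).
                List<String> all = new ArrayList<>(key);
                all.addAll(nonKey);
                String placeholders = all.stream()
                        .map(c -> "?")
                        .collect(Collectors.joining(", "));
                return "INSERT INTO " + table + " (" + String.join(", ", all)
                        + ") VALUES (" + placeholders + ")";
            }
        }
    }

    public static void main(String[] args) {
        List<String> key = List.of("id");      // hypothetical key column
        List<String> nonKey = List.of("name"); // hypothetical non-key column
        for (Map<String, String> props : List.of(
                Map.<String, String>of(),
                Map.of(ACTION, UPDATE),
                Map.of(ACTION, DELETE))) {
            String action = resolveAction(props);
            System.out.println(action + " -> " + sqlTemplate(action, "users", key, nonKey));
        }
    }
}
```

In this model a message without the property produces an insert, and the same `key`/`nonKey` configuration serves updates and deletes as well, which is why separate topics or separately configured sinks per operation should not be necessary.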