Is it possible to create a stream in ksqlDB to emit an ID that detects changes from 3 different tables that are joined together?


Is it possible to create a stream that detects changes on 3 different tables? For example, I have Table A, which contains ids for Table B and Table C. If I construct my join query correctly, could I emit an event that contains Table A's id whenever there is a change in Table B or C?

Table A

  • id
  • b_id
  • c_id
  • field_abc
  • field_xyz

Table B

  • id
  • foo

Table C

  • id
  • bar

I want a stream that will emit Table A's id whenever there is a change in any of those 3 tables. Is this possible?

For example, if fields field_abc, foo, or bar were to change, I want Table A's id to be emitted to a stream.


Solution

  • I recently ran into an issue similar to the one you're describing. Currently this isn't possible using only ksqlDB streams or tables, due to limitations in ksqlDB. We did find a way to achieve the same result, though.

    Our solution was to push the join down to the database: we gave the source connector a custom query that joins the 3 tables and takes the most recent last_modified value across them, so a change in any of the 3 tables shows up as a new timestamp on Table A's id.

    CREATE SOURCE CONNECTOR xyz_change WITH (
        'connector.class'          = '${v_connector_class}',
        'connection.url'           = '${v_connection_url}',
        'connection.user'          = '${v_connection_user}',
        'connection.password'      = '${v_connection_pass}',
        'topic.prefix'             = 'jdbc_abc_change',
        'mode'                     = 'timestamp+incrementing',
        'numeric.mapping'          = 'best_fit',
        'incrementing.column.name' = 'id',
        'timestamp.column.name'    = 'last_modified',
        'key'                      = 'id',
        'key.converter'            = '${v_converter_long}',
        'query'                    = 'select id, last_modified from(select a.id as id, GREATEST(a.last_modified, COALESCE(b.last_modified,from_unixtime(0)), COALESCE(c.last_modified,from_unixtime(0))) as last_modified  from aaa a  LEFT JOIN bbb b on a.fk_id = b.id  LEFT JOIN ccc c on a.fk_id = c.id ) sub'
    );
    

    From the topic this connector produces, you can create whatever streams or tables you need.
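
    As a rough sketch of that last step, a stream over the connector's topic might look like the following. It assumes a reasonably recent ksqlDB version, that the connector writes Avro values registered in Schema Registry, and that the record key is Table A's id as a BIGINT. The names abc_changes and a_id are made up for illustration; adjust the format and types to match your converters.

    ```sql
    -- Assumption: values are Avro, so the value columns (id, last_modified)
    -- are inferred from Schema Registry. With a custom query, the connector's
    -- topic.prefix ('jdbc_abc_change') is used as the full topic name.
    -- a_id is a hypothetical name for the key column, chosen to avoid any
    -- clash with the id field in the value.
    CREATE STREAM abc_changes (a_id BIGINT KEY)
      WITH (KAFKA_TOPIC = 'jdbc_abc_change', VALUE_FORMAT = 'AVRO');

    -- Push query: emits a row each time the connector picks up a newer
    -- last_modified for a Table A id.
    SELECT a_id, last_modified FROM abc_changes EMIT CHANGES;
    ```

    Because the connector polls the database, changes appear with a delay of up to one poll interval, and several changes to the same row between polls collapse into a single event.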