apache-kafka, apache-kafka-streams, apache-kafka-connect, confluent-platform, ksqldb

KSQLDB - Getting data from a Debezium CDC source connector and joining a Stream with a Table


Hello, folks.

Let me introduce the scenario first:

I'm getting data from two tables in a MS SQL Server database by using the Debezium CDC Source Connector. Here are the connector configs:

Connector for PROVIDER table:

CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_PROVIDER WITH (

    'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector', 
    'database.hostname'= '<URL>',
    'database.port'= '1433',
    'database.user'= '<USER>',
    'database.password'= '<PASS>',
    'database.dbname'= 'a',
    'database.server.name'= 'a',
    'table.whitelist'='dbo.PROVIDER',
    'decimal.handling.mode'='double',
    'transforms'= 'unwrap,addTopicPrefix',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
    'transforms.addTopicPrefix.regex'='(.*)',
    'transforms.addTopicPrefix.replacement'='mssql-01-$1',
    'database.history.kafka.bootstrap.servers'= 'kafka:29092', 
    'database.history.kafka.topic'= 'dbhistory.PROVIDER' 
    );

Connector for ORDERS table:

CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_ORDER WITH (
    
    'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector', 
    'database.hostname'= '<URL>',
    'database.port'= '1433',
    'database.user'= '<USER>',
    'database.password'= '<PASS>',
    'database.dbname'= 'a',
    'database.server.name'= 'a',
    'table.whitelist'='dbo.ORDER',
    'decimal.handling.mode'='double',
    'transforms'= 'unwrap,addTopicPrefix',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
    'transforms.addTopicPrefix.regex'='(.*)',
    'transforms.addTopicPrefix.replacement'='mssql-01-$1',
    'database.history.kafka.bootstrap.servers'= 'kafka:29092', 
    'database.history.kafka.topic'= 'dbhistory.ORDER'
    );

I think this setup could be improved (one possible simplification is sketched below), but it's fine for now.
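For instance, since both tables live in the same database, I suspect the two connectors could be collapsed into a single one, because Debezium's table.whitelist accepts a comma-separated list of tables. A rough, untested sketch (the connector name and history-topic name are made up):

CREATE SOURCE CONNECTOR SOURCE_MSSQL_01 WITH (
    'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector', 
    'database.hostname'= '<URL>',
    'database.port'= '1433',
    'database.user'= '<USER>',
    'database.password'= '<PASS>',
    'database.dbname'= 'a',
    'database.server.name'= 'a',
    -- one connector capturing both tables
    'table.whitelist'='dbo.PROVIDER,dbo.ORDER',
    'decimal.handling.mode'='double',
    'transforms'= 'unwrap,addTopicPrefix',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
    'transforms.addTopicPrefix.regex'='(.*)',
    'transforms.addTopicPrefix.replacement'='mssql-01-$1',
    'database.history.kafka.bootstrap.servers'= 'kafka:29092',
    -- illustrative history topic name
    'database.history.kafka.topic'= 'dbhistory.a'
    );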

Once the connectors are set up, we can create our stream and table:

CREATE TABLE PROVIDER (ID_P VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='mssql-01-a.dbo.PROVIDER', VALUE_FORMAT='AVRO');

CREATE STREAM ORDERS WITH (KAFKA_TOPIC='mssql-01-a.dbo.ORDERS',VALUE_FORMAT='AVRO');
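
Since both objects use VALUE_FORMAT='AVRO', the remaining value columns should be picked up from the Avro schemas in Schema Registry. A quick way to double-check what ksqlDB actually inferred (standard ksqlDB statements):

DESCRIBE PROVIDER;
DESCRIBE ORDERS;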

As you can see, all that's left now is to enrich the ORDERS stream with data from the PROVIDER table, right? Well, yes and no.

SELECT P.PROVIDER_COD, O.ID FROM ORDERS AS O JOIN PROVIDER AS P ON O.PROV = P.PROVIDER_COD EMIT CHANGES;

If I try to do that I get an error:

Cannot repartition a TABLE source. If this is a join, make sure that the criteria uses the TABLE's key column ID_P instead of [PROVIDER_COD]

Well, that is usually easy to fix: ksqlDB can repartition a STREAM for a join but not a TABLE, so the join just has to use the table's key column. Not in this case, though, and that finally brings us to my problem:

The provider's ID (the ID_P key column) is not in the ORDERS stream, because that is how the database I'm getting data from was designed.
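
For illustration, this is what that easy fix would look like if ORDERS did carry the table's key (a hypothetical ID_P column on the stream, which does not exist here):

SELECT P.PROVIDER_COD, O.ID FROM ORDERS AS O JOIN PROVIDER AS P ON O.ID_P = P.ID_P EMIT CHANGES;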

How could we relate the two datasets?

If this were a relational database, it would be easy:

SELECT * FROM ORDERS O INNER JOIN PROVIDER P ON O.PROV = P.PROVIDER_COD AND O.SUB_COD = P.SUB_COD;

Yes... I did not mention it before, but we have a composite key here: the Provider Code and the Provider's Subsidiary Code, which I believe is another issue.

Please, can anyone help me understand how to solve this in KSQLDB?

Thanks a lot.


Solution

  • I've found a solution for this on the Confluent Forum.

    https://forum.confluent.io/t/join-stream-table-order-provider-when-the-stream-does-not-have-the-providers-id-but-others-informations/2279/7

    Thanks to Matthias J. Sax
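
    In case the link goes stale, here is a rough outline of the general idea: re-key both sides on the composite Provider Code + Subsidiary Code before joining. It is only a sketch based on the column names used in this question (ID, PROV, SUB_COD, PROVIDER_COD); the object names and the '|' separator are made up, and the key columns are assumed to be VARCHAR (CAST them otherwise):

    -- 1. Read the provider changelog topic as a STREAM so it can be re-keyed
    CREATE STREAM PROVIDER_RAW WITH (
        KAFKA_TOPIC='mssql-01-a.dbo.PROVIDER',
        VALUE_FORMAT='AVRO'
    );

    -- 2. Build a TABLE keyed on the composite key, keeping the latest value per key
    CREATE TABLE PROVIDER_BY_COMPOSITE AS
      SELECT
        PROVIDER_COD + '|' + SUB_COD   AS PROVIDER_KEY,
        LATEST_BY_OFFSET(PROVIDER_COD) AS PROVIDER_COD,
        LATEST_BY_OFFSET(SUB_COD)      AS SUB_COD
        -- add LATEST_BY_OFFSET(...) for any other provider columns you need
      FROM PROVIDER_RAW
      GROUP BY PROVIDER_COD + '|' + SUB_COD;

    -- 3. Derive the same composite key on the ORDERS side
    CREATE STREAM ORDERS_KEYED AS
      SELECT
        PROV + '|' + SUB_COD AS PROVIDER_KEY,
        ID,
        PROV,
        SUB_COD
      FROM ORDERS;

    -- 4. Join on the table's key; ksqlDB repartitions the stream side as needed
    SELECT O.ID, P.PROVIDER_COD, P.SUB_COD
    FROM ORDERS_KEYED AS O
    JOIN PROVIDER_BY_COMPOSITE AS P
      ON O.PROVIDER_KEY = P.PROVIDER_KEY
    EMIT CHANGES;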