apache-kafka left-join confluent-platform ksqldb

KSQL left join giving 'null' result even when data is present


I'm learning KSQL/ksqlDB and am currently exploring joins. Below is the issue I'm stuck on.

I have one stream, 'DRIVERSTREAMREPARTITIONEDKEYED', and one table, 'COUNTRIES'. Their descriptions are below.

ksql> describe DRIVERSTREAMREPARTITIONEDKEYED;
Name: DRIVERSTREAMREPARTITIONEDKEYED
 Field       | Type
--------------------------------------
 COUNTRYCODE | VARCHAR(STRING)  (key)
 NAME        | VARCHAR(STRING)
 RATING      | DOUBLE
--------------------------------------

ksql> describe countries;

Name                 : COUNTRIES
 Field       | Type
----------------------------------------------
 COUNTRYCODE | VARCHAR(STRING)  (primary key)
 COUNTRYNAME | VARCHAR(STRING)
----------------------------------------------

This is the sample data they contain:

ksql> select * from DRIVERSTREAMREPARTITIONEDKEYED emit changes;
+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|COUNTRYCODE                                  |NAME                                         |RATING                                       |
+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|SGP                                          |Suresh                                       |3.5                                          |
|IND                                          |Mahesh                                       |2.4                                          |

ksql> select * from countries emit changes;
+---------------------------------------------------------------------+---------------------------------------------------------------------+
|COUNTRYCODE                                                          |COUNTRYNAME                                                          |
+---------------------------------------------------------------------+---------------------------------------------------------------------+
|IND                                                                  |INDIA                                                                |
|SGP                                                                  |SINGAPORE                                                            |

I'm trying to do a left outer join on them with the stream on the left side, but this is the output I get:

select d.name,d.rating,c.COUNTRYNAME from DRIVERSTREAMREPARTITIONEDKEYED d left join countries c on d.COUNTRYCODE=c.COUNTRYCODE emit changes;
+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|NAME                                         |RATING                                       |COUNTRYNAME                                  |
+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|Suresh                                       |3.5                                          |null                                         |
|Mahesh                                       |2.4                                          |null                                         |

Ideally, the 'COUNTRYNAME' column should be populated, since the 'COUNTRYCODE' values in the stream and the table match.

I searched a lot, but to no avail. I'm using Confluent Platform 6.1.1.


Solution

  • For a join to work, it is our responsibility to make sure that the keys of both entities being joined lie in the same partition. ksqlDB can't verify whether the partitioning strategies are the same for both join inputs.

    In my case, the 'Drivers' topic had 2 partitions, so the stream 'DriversStream' that I created on it also had 2 partitions. The 'Countries' table I wanted to join it with had only 1 partition, so I re-keyed 'DriversStream' and created another stream, 'DRIVERSTREAMREPARTITIONEDKEYED', shown in the question.

    However, matching keys in the table and the stream were not in the same partition, so the join returned null for 'COUNTRYNAME'.
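
    One way to check this before joining is to compare the partition counts of the topics behind both join inputs. A minimal sketch, assuming a ksqlDB CLI session connected to the same cluster; the topics to look for are whichever ones back the stream and the table:

    ```sql
    -- List the Kafka topics visible to ksqlDB together with their partition counts.
    -- Compare the rows for the topics backing the stream and the COUNTRIES table.
    SHOW TOPICS;
    ```

    Matching partition counts are necessary but not sufficient: both topics also need to be keyed on the join column and written with the same partitioning strategy.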

    I created another topic, 'DRIVERINFO', with 1 partition.

     kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic DRIVERINFO
    

    Then I created a stream, 'DRIVERINFOSTREAM', over it.

     CREATE STREAM DRIVERINFOSTREAM (NAME STRING, RATING DOUBLE, COUNTRYCODE STRING) WITH (KAFKA_TOPIC='DRIVERINFO', VALUE_FORMAT='JSON');
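
    The stream needs some rows before the join returns anything. This answer doesn't say how the topic was populated; as one option, assuming the rows are inserted from the ksqlDB CLI and reusing the values that appear in the join output in this answer:

    ```sql
    -- Assumption: rows are added by hand from the ksqlDB CLI.
    -- Column order follows the DRIVERINFOSTREAM definition.
    INSERT INTO DRIVERINFOSTREAM (NAME, RATING, COUNTRYCODE) VALUES ('Suresh', 2.4, 'SGP');
    INSERT INTO DRIVERINFOSTREAM (NAME, RATING, COUNTRYCODE) VALUES ('Mahesh', 3.6, 'IND');
    ```

    Insert them after 'COUNTRIES' has been populated, since a stream-table join looks up the table's state at the time each stream row is processed.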
    
    

    Finally, I joined it with the 'COUNTRIES' table, and this time it worked.

    ksql> select d.name,d.rating,c.COUNTRYNAME from DRIVERINFOSTREAM d left join countries c on d.COUNTRYCODE=c.COUNTRYCODE EMIT CHANGES;
    +-------------------------------------------+-------------------------------------------+-------------------------------------------+
    |NAME                                       |RATING                                     |COUNTRYNAME                                |
    +-------------------------------------------+-------------------------------------------+-------------------------------------------+
    |Suresh                                     |2.4                                        |SINGAPORE                                  |
    |Mahesh                                     |3.6                                        |INDIA                                      |
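
    As an alternative to creating the topic by hand, ksqlDB can write a re-keyed copy of a stream to a new topic with an explicit partition count. This is a sketch under assumptions, not what was done above: `DRIVERS_BY_COUNTRY` is a hypothetical name, and 'DriversStream' is assumed to have a `COUNTRYCODE` column.

    ```sql
    -- Hypothetical stream name; PARTITIONS is set to match the 1-partition COUNTRIES topic.
    CREATE STREAM DRIVERS_BY_COUNTRY
      WITH (PARTITIONS = 1) AS
      SELECT *
      FROM DriversStream
      PARTITION BY COUNTRYCODE
      EMIT CHANGES;
    ```

    Whether this alone fixes the null results also depends on the table's topic being keyed on 'COUNTRYCODE' and written with the same partitioning strategy.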
    
    
    

    Refer to the links below for details:

    KSQL join

    Partitioning data for Joins