Kafka source connector is not pulling the record as expected when records are inserted in source topic from multiple sources


In one of my use cases, I am trying to create a pipeline.

Whenever I send a message from the custom partition, I send the timestamp in milliseconds as a LONG, because the timestamp column is defined as long in the schema.

Code that I had earlier in the custom partition:

Date date = new Date();
long timeMilli = date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

Display result before I sent the record:

date = Tue Mar 26 22:02:04 EDT 2019 , time in millis = 1553652124063

value inserted in timestamp column in table2:

3/27/2019 2:02:04.063000 AM

Since it is using the UK time zone (I believe), I put in a temporary fix for the time being: subtracting 4 hours from the current timestamp so that it matches the US Eastern timestamp.

Date date = new Date();
Date adj_date = DateUtils.addHours(date,-4);
long timeMilli = adj_date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

Display result:

date = Tue Mar 26 22:04:43 EDT 2019 , time in millis = 1553637883826

value inserted in timestamp column in table2:

3/26/2019 10:04:43.826000 PM
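To make the two results above easier to compare, here is a minimal sketch that renders both epoch-millisecond values in two time zones. The class name `EpochMillisDemo` and the helper `render` are hypothetical, and the choice of `America/New_York` and `UTC` is an assumption based on the EDT output and the values shown in table2. It suggests that the millisecond value is an absolute instant, so subtracting 4 hours moves the stored instant itself rather than only changing how it is displayed.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class EpochMillisDemo {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Render one epoch-millisecond value as wall-clock time in the given zone.
    // The instant itself does not change; only its textual representation does.
    static String render(long epochMillis, String zoneId) {
        ZonedDateTime zdt = Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(zoneId));
        return zdt.format(FMT);
    }

    public static void main(String[] args) {
        long original = 1553652124063L; // value printed by the first snippet
        long shifted = 1553637883826L;  // value printed after subtracting 4 hours

        System.out.println(render(original, "America/New_York")); // 2019-03-26 22:02:04.063
        System.out.println(render(original, "UTC"));              // 2019-03-27 02:02:04.063
        System.out.println(render(shifted, "America/New_York"));  // 2019-03-26 18:04:43.826
        System.out.println(render(shifted, "UTC"));               // 2019-03-26 22:04:43.826
    }
}
```

Under these assumptions, the unshifted value already corresponds to 22:02:04 Eastern time, and the table appears to display it in UTC. The shifted value looks right when displayed in UTC but denotes an instant 4 hours in the past.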

Please let me know if I am missing anything, as I am not sure why this happens when I send messages from the custom partition.


Solution

  • Under the hood, the JDBC Source Connector uses the following query:

    SELECT * FROM someTable
    WHERE
    someTimestampColumn < $endTimetampValue
    AND (
        (someTimestampColumn = $beginTimetampValue AND someIncrementalColumn > $lastIncrementedValue)
        OR someTimestampColumn > $beginTimetampValue)
    ORDER BY someTimestampColumn, someIncrementalColumn ASC
    

    In summary: the query retrieves rows whose timestamp column value is earlier than the current timestamp and later than the last one checked.

    The parameters above are:

    1. beginTimetampValue - value of timestamp column of last imported record
    2. endTimetampValue - current timestamp according to the Database
    3. lastIncrementedValue - value of incremental column of last imported record

    I think that in your case the Producer writes records to the table with a higher timestamp than the ones you later insert manually (using the query).

    When the JDBC Connector checks for new records to import into Kafka, it skips them (because they don't fulfill the someTimestampColumn < $endTimetampValue timestamp condition).

    You can also change the log level to DEBUG and see what is going on in the logs.
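
The selection condition from the query above can be sketched in plain Java to see which rows a single poll would pick up. This is only an illustration of the WHERE clause as quoted in this answer, not the connector's actual code. The class name `TimestampIncrementingFilter`, the method `isSelected`, and all numeric values are hypothetical.

```java
public class TimestampIncrementingFilter {

    // Mirrors the WHERE clause quoted above:
    //   ts < end AND ((ts = begin AND id > lastId) OR ts > begin)
    // ts/id describe a candidate row; beginTs, endTs and lastId play the roles of
    // beginTimetampValue, endTimetampValue and lastIncrementedValue.
    static boolean isSelected(long ts, long id, long beginTs, long endTs, long lastId) {
        return ts < endTs
                && ((ts == beginTs && id > lastId) || ts > beginTs);
    }

    public static void main(String[] args) {
        long beginTs = 1_000L; // timestamp of the last imported record (made-up value)
        long endTs = 2_000L;   // current timestamp according to the database (made-up value)
        long lastId = 10L;     // incremental column of the last imported record (made-up value)

        // Row between the last imported timestamp and the database's current time.
        System.out.println(isSelected(1_500L, 11L, beginTs, endTs, lastId)); // true
        // Row whose timestamp is ahead of the database's current time.
        System.out.println(isSelected(2_500L, 12L, beginTs, endTs, lastId)); // false
        // Row whose timestamp is older than the last imported record.
        System.out.println(isSelected(900L, 13L, beginTs, endTs, lastId));   // false
        // Same timestamp as the last imported record, higher incremental value.
        System.out.println(isSelected(1_000L, 11L, beginTs, endTs, lastId)); // true
    }
}
```

In this sketch, a row with a timestamp ahead of the database clock fails the upper bound for the current poll, and a row written with a timestamp older than the last imported one fails the lower bound on every later poll. Mixing writers whose timestamps are offset from each other by several hours can therefore plausibly lead to skipped rows.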