We have a Flink SQL job (Table API) that reads Offers from a Kafka topic (8 partitions), joins them with other data sources to calculate the cheapest offer per item, aggregates some extra data over that result, and sinks it back to another Kafka topic.
Sink looks like this:
CREATE TABLE cheapest_item_offer (
  `id_offer` VARCHAR(36),
  `id_item` VARCHAR(36),
  `price` DECIMAL(13,2),
  -- ... more offer fields
  PRIMARY KEY (`id_item`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<TOPIC_NAME>',
  'properties.bootstrap.servers' = '<KAFKA_BOOTSTRAP_SERVERS>',
  'properties.group.id' = '<JOBNAME>',
  'sink.buffer-flush.interval' = '1000',
  'sink.buffer-flush.max-rows' = '100',
  'key.format' = 'json',
  'value.format' = 'json'
);
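For context, the source table is declared along these lines (the exact fields and primary key are illustrative; the real table has more offer fields):

CREATE TABLE offer (
  `id_offer` VARCHAR(36),
  `id_item` VARCHAR(36),
  `price` DECIMAL(13,2),
  `quantity` INT,
  -- ... more offer fields
  PRIMARY KEY (`id_offer`) NOT ENFORCED  -- assumed key: one row per offer
) WITH (
  'connector' = 'upsert-kafka',          -- upsert-kafka is used on the source side as well
  'topic' = '<OFFERS_TOPIC_NAME>',       -- the 8-partition source topic
  'properties.bootstrap.servers' = '<KAFKA_BOOTSTRAP_SERVERS>',
  'key.format' = 'json',
  'value.format' = 'json'
);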
And the upsert looks like this:
INSERT INTO cheapest_item_offer
WITH offers_with_stock_ordered_by_price AS (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY id_item
           ORDER BY price ASC
         ) AS n_row
  FROM offer
  WHERE quantity > 0
), cheapest_offer AS (
  SELECT offer.*
  FROM offers_with_stock_ordered_by_price offer
  WHERE offer.n_row = 1
)
SELECT id_offer,
       id_item,
       price,
       -- ... item extra fields
FROM cheapest_offer
-- ... extra JOINS here to aggregate more item data
Given this configuration, the job initially ingests the data, calculates the cheapest offer correctly, and writes it to the sink. However, after some time has passed, events arriving from our data source unexpectedly result in a Tombstone (a record with a null value, which the upsert-kafka sink emits for DELETE rows). It doesn't always happen; sometimes the value is set properly. After checking those records, we see that they shouldn't be tombstones: there is an actual cheapest offer for the item, and the related JOIN rows do exist.
The following images illustrate the issue with some Kafka messages:
Data source
This is the data source we ingest the data from. The latest update for a given Item shows that one of its Offers has changed.
Data sink
This is the data sink for the same Item. As we can see, the latest update was generated at the same time (triggered by the data source update), but the resulting value is a Tombstone rather than the actual value derived from the data source.
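As a sanity check, running the same Top-N logic ad hoc against the source for one of the affected items (the id_item value below is illustrative) does return the expected cheapest offer:

SELECT id_offer,
       id_item,
       price
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY id_item
           ORDER BY price ASC
         ) AS n_row
  FROM offer
  WHERE quantity > 0
) AS t
WHERE n_row = 1
  AND id_item = '<AFFECTED_ITEM_ID>';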
If we relaunch the Job from scratch (ignoring savepoints), the affected Items are fixed on the first run, but the same issue appears again after some time.
Some considerations:
- upsert-kafka connector in both Source & Sink
- rocksdb as state.backend (set roughly as in the sketch after this list)
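For completeness, a rough sketch of that setting as Flink SQL SET statements (these keys can equally be set in flink-conf.yaml; the checkpoint interval is illustrative):

SET 'state.backend' = 'rocksdb';                 -- RocksDB as the keyed state backend
SET 'execution.checkpointing.interval' = '60s';  -- illustrative value, not our exact setting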
We're struggling to find the reason behind this behavior (we're pretty new to Flink) and we don't know where to focus to fix it. Can anybody help here?
Any suggestion will be highly appreciated!
Apparently it was a bug in Flink SQL on v1.13.2, as noted in Flink's Jira ticket FLINK-25559.
We managed to solve this issue by upgrading to v1.13.6.