Tags: amazon-s3, apache-kafka, apache-kafka-connect, confluent-platform

Force Confluent S3 sink to flush


I set up a Kafka Connect S3 sink with the rotation duration set to 1 hour and a rather large flush count, say 10,000. If there are not many messages in the Kafka topic, the S3 sink buffers them in memory and waits for them to accumulate to the flush count before uploading them together and committing the offsets to its own consumer group.

But consider this situation: if I only send 5,000 messages to the topic, the S3 sink never flushes, and after a long enough time those 5,000 messages are eventually evicted from Kafka by the retention policy. At that point the messages exist only in the S3 sink's memory, not in S3. This is very dangerous: if the S3 sink is restarted, or the machine running it crashes, those 5,000 messages are lost, and they cannot be recovered from Kafka because they have already been deleted.

Can this happen with the S3 sink? Or is there a setting that forces it to flush after some amount of time?


Solution

  • If your stream from Kafka to S3 does not have a constant flow of records, you can use the property

    rotate.schedule.interval.ms

    to flush records at scheduled intervals (see the first example configuration below).

    Note that if this option is used, your downstream system should be able to cope with duplicates in the case of reprocessing. That is because flushing based on wall-clock time might result in duplicates appearing in different files if the connector is scheduled to re-export records from Kafka.

    As a side note, if you use the property

    rotate.interval.ms

    with the wall-clock timestamp extractor (timestamp.extractor=Wallclock), your records will be flushed without setting rotate.schedule.interval.ms. But this makes your partitioner dependent on wall-clock time, so again you should be able to account for duplicate records (see the second example configuration below).

    The connector can offer exactly-once delivery for a constant stream of records when used with deterministic partitioners, and it provides various timestamp extractors, such as one that uses the record's timestamp (Record) or a timestamp field within the record (RecordField).

    The configuration properties for partitioning are documented here.
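
For concreteness, here is a minimal sketch of a standalone connector configuration that uses rotate.schedule.interval.ms to force a flush every 10 minutes, even on a quiet topic. The connector name, topic, bucket, region, and the 10-minute interval are placeholders, not values from the original question; adjust them for your environment.

    # Minimal S3 sink config (properties format) - names marked
    # "placeholder" are illustrative assumptions.
    name=s3-sink-example
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    # Placeholder topic, bucket, and region.
    topics=my-topic
    s3.bucket.name=my-bucket
    s3.region=us-east-1
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    # Large size-based flush threshold, as in the question.
    flush.size=10000
    # Force a flush every 10 minutes of wall-clock time, even if
    # flush.size has not been reached.
    rotate.schedule.interval.ms=600000
    # Scheduled rotation requires the timezone property to be set.
    timezone=UTC

With this config, the 5,000 buffered messages from the question's scenario would be written to S3 within at most 10 minutes, long before any realistic retention period expires.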
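And here is a sketch of the rotate.interval.ms variant with the wall-clock extractor mentioned in the side note. Only the rotation and partitioning properties are shown; the connector, topic, and storage settings are the same as in the first example, and the interval and path format are again illustrative values.

    # Rotate a file once the interval has elapsed, measured by the
    # configured timestamp extractor.
    rotate.interval.ms=600000
    # Wallclock measures the interval against system time rather than
    # record timestamps; Record or RecordField can be used instead for
    # deterministic, record-driven rotation.
    timestamp.extractor=Wallclock
    # Time-based partitioner settings typically used alongside
    # timestamp-based rotation.
    partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    partition.duration.ms=3600000
    path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
    locale=en-US
    timezone=UTC

As the answer notes, the trade-off is the same in both cases: any wall-clock-dependent rotation can produce duplicate records across files if the connector reprocesses data, so the downstream consumer of the S3 files should be idempotent.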