Search code examples
apache-kafkaapache-kafka-connects3-kafka-connector

Is there a way in the S3 Kafka sink connector to ensure all records are consumed


I have a problem in the S3 Kafka connector but also seen this in the JDBC connector. I'm trying to see how can I ensure that my connectors are actually consuming all the data in a certain topic. I expect because of the flush sizes that there could be a certain delay (10/15 minutes) in the consumption of the messages but I notice that I end up having big delays (days...) and my consumers always have something in the lag on the offset

I was reading/viewing the post/video about this for example (mainly that comment): https://rmoff.net/2020/12/08/twelve-days-of-smt-day-1-insertfield-timestamp/ https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day1.adoc "flush.size of 16 is stupidly low, but if it’s too high you have to wait for your files to show up in S3 and I get bored waiting."

And it does mention there if the flush.size is bigger than the available records it can be that the records take time to be consumed but I never expected this to be more than a couple of minutes. How can I ensure that all records are consumed, and I would really like to avoid having flush.size = 1

Maybe this is just a miss-understanding on my part about the sink connectors but I do expect them to work as a normal consumer so I expect them to consume all data and that this kind of flush/batch sizes would work more based on the timeouts and for performance issues.

If anyone is interested this are my connector configuration

For S3 sink:

topics.regex: com.custom.obj_(.*)
storage.class: io.confluent.connect.s3.storage.S3Storage
s3.region: ${@S3_REGION@}
s3.bucket.name: ${@S3_BUCKET@}
topics.dir: ${@S3_OBJ_TOPICS_DIR@}
flush.size: 200
rotate.interval.ms: 20000
auto.register.schemas: false
s3.part.size: 5242880
parquet.codec: snappy
offset.flush.interval.ms: 20000
offset.flush.timeout.ms: 5000
aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:accesskey}
aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:secretkey}
format.class: com.custom.connect.s3.format.parquet.ParquetFormat
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
partitioner.class: io.confluent.connect.storage.partitioner.DailyPartitioner
timestamp.extractor: Record
locale: ${@S3_LOCALE@}
timezone: ${@S3_TIMEZONE@}
store.url: ${@S3_STORAGE_URL@}
connect.meta.data: false
transforms: kafkaMetaData,formatTs
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.formatTs.format: yyyy-MM-dd HH:mm:ss:SSS
transforms.formatTs.field: message_ts
transforms.formatTs.target.type: string
transforms.formatTs.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true

For JDBC sink:

topics.regex: com.custom.obj_(.*)
table.name.format: ${@PREFIX@}${topic}
batch.size: 200
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
connection.url: ${@DB_URL@}
connection.user: ${@DB_USER@}
connection.password: ${@DB_PASSWORD@}
auto.create: true
auto.evolve: true
db.timezone: ${@DB_TIMEZONE@}
quote.sql.identifiers: never
transforms: kafkaMetaData
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true

I've read this two already and still not sure: Kafka JDBC Sink Connector, insert values in batches https://github.com/confluentinc/kafka-connect-jdbc/issues/290

Also for example I've seen examples of people using (which I don't think it would help my use case) but I was wondering is this value defined per connector? I'm even a bit confused about the fact that in the documentation I always find the configuration without the consumer. but the examples I always find with consumer. so I guess this means that this is a generic property that applies both to consumers and producers?

consumer.max.interval.ms: 300000
consumer.max.poll.records: 200

Anyone has some good feedback?


Solution

  • Regarding the provided Kafka S3 sink connector configuration:

    topics.regex: com.custom.obj_(.*)
    storage.class: io.confluent.connect.s3.storage.S3Storage
    s3.region: ${@S3_REGION@}
    s3.bucket.name: ${@S3_BUCKET@}
    topics.dir: ${@S3_OBJ_TOPICS_DIR@}
    flush.size: 200
    rotate.interval.ms: 20000
    auto.register.schemas: false
    s3.part.size: 5242880
    parquet.codec: snappy
    offset.flush.interval.ms: 20000
    offset.flush.timeout.ms: 5000
    aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:accesskey}
    aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:secretkey}
    format.class: com.custom.connect.s3.format.parquet.ParquetFormat
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
    partitioner.class: io.confluent.connect.storage.partitioner.DailyPartitioner
    timestamp.extractor: Record
    locale: ${@S3_LOCALE@}
    timezone: ${@S3_TIMEZONE@}
    store.url: ${@S3_STORAGE_URL@}
    connect.meta.data: false
    transforms: kafkaMetaData,formatTs
    transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.kafkaMetaData.offset.field: kafka_offset
    transforms.kafkaMetaData.partition.field: kafka_partition
    transforms.kafkaMetaData.timestamp.field: kafka_timestamp
    transforms.formatTs.format: yyyy-MM-dd HH:mm:ss:SSS
    transforms.formatTs.field: message_ts
    transforms.formatTs.target.type: string
    transforms.formatTs.type:org.apache.kafka.connect.transforms.TimestampConverter$Value
    errors.tolerance: all
    errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
    errors.deadletterqueue.context.headers.enable: true
    

    There are configuration fields you can tweak to control the consumption\upload to S3 rate. Thus reducing the lag in Kafka offset you are seeing. Its best practice to use variables for the below fields in your configuration.

    From personal experience, the tweaks you can do are:

    1. Tweak flush.size

      flush.size: 800

    which is (as you stated):

    Maximum number of records: The connector’s flush.size configuration property specifies the maximum number of records that should be written to a single S3 object. There is no default for this setting.

    I would prefer bigger files and use the timing tweaks below to control consumption. Make sure your records are not too big or small to make rational files as a result of flush.size * RECORD_SIZE.

    1. Tweak rotate.interval.ms

      rotate.interval.ms: (i would delete this field, see rotate.schedule explanation below)

    which is:

    Maximum span of record time: The connector’s rotate.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records.

    1. Add field rotate.schedule.interval.ms:

      rotate.schedule.interval.ms 60000

    which is:

    Scheduled rotation: The connector’s rotate.schedule.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. Unlike with rotate.interval.ms, with scheduled rotation the timestamp for each file starts with the system time that the first record is written to the file. As long as a record is processed within the timespan specified by rotate.schedule.interval.ms, the record will be written to the file. As soon as a record is processed after the timespan for the current file, the file is flushed, uploaded to S3, and the offset of the records in the file are committed. A new file is created with a timespan that starts with the current system time, and the record is written to the file. The commit will be performed at the scheduled time, regardless of the previous commit time or number of messages. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour. The default value -1 means that this feature is disabled.

    You use the default -1 which means disabling this rotation. This tweak will make the most difference as each task will consume more frequently.

    Regarding the second part of the question:

    You can gain observability by adding metrics to your kafka and connect using for example prometheus and grafana. Configuration guide below in sources.

    metrics

    Sources:

    Connect S3 sink

    kafka-monitoring-via-prometheus

    Connect S3 Sink config Docs