Tags: kubernetes, apache-kafka, apache-kafka-connect, s3-kafka-connector

Configure s3 sink on kafka-connect for kubernetes with TimeBasedPartitioner to increase throughput


I have Kafka Connect running on Kubernetes, configured with KAFKA_HEAP_OPTS: "-Xmx1G -Xms1G" and the following resources:

requests:
  cpu: 200m
  memory: 2Gi
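
For context, those two settings sit together in the Connect container spec roughly like this (container name and image are illustrative, not my exact manifest):

containers:
  - name: kafka-connect                     # placeholder container name
    image: confluentinc/cp-kafka-connect    # illustrative image
    env:
      - name: KAFKA_HEAP_OPTS
        value: "-Xmx1G -Xms1G"
    resources:
      requests:
        cpu: 200m
        memory: 2Gi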

I need to ingest a lot of legacy data from the last 3 years, containing around 300M records (~50 GB).

In parallel, I continue to ingest "live" data at around 20 msg/s.

My topics are configured with 12 partitions each.

My Kafka Connect configuration for the S3 sink is:

{
  "name": "s3-sink",
  "tasks.max": "2",
  "aws.access.key.id": "<key>",
  "aws.secret.access.key": "<secret>",
  "s3.bucket.name": "bucket",
  "s3.compression.type": "gzip",
  "s3.elastic.buffer.enable": "true",
  "s3.part.size": "5242880",
  "s3.region": "<region>",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
  "partition.duration.ms": "3600000",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "time"
  "locale": "en_US",
  "timezone": "UTC",
  "flush.size": "1000",
  "rotate.interval.ms": "-1",
  "rotate.schedule.interval.ms": "1000",
  "schema.compatibility": "NONE",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "store.url": "<s3-url>",
  "topics.dir": "raw",
  "topics.regex": "raw",
  "value.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
}
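
For reference, I apply this configuration through the Kafka Connect REST API, roughly like this (the hostname is a placeholder, and s3-sink-config.json contains the JSON above without the "name" field):

# Create or update the connector via the Connect REST API
curl -X PUT -H "Content-Type: application/json" \
  --data @s3-sink-config.json \
  http://kafka-connect:8083/connectors/s3-sink/config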

With this configuration I can process around 500 msg/s, but I hit OOM errors from time to time. At this rate it will take more than 5 days to process all the messages.

Furthermore, I would like to sink other topics to S3 at the same time.

How can I improve this connector configuration to avoid out-of-memory errors (java.lang.OutOfMemoryError: Java heap space) and increase throughput?

I tried to increase the number of tasks, but I hit OOM quickly...

Scaling the number of pods didn't lead to conclusive results, as the memory needed would be too big.


Solution

  • If you have 12 partitions, then use "tasks.max": "12"

    As of Java 8u292, you should not be using fixed -Xmx/-Xms values. Use -XX:MaxRAMPercentage set between 80-90% so the JVM sizes its heap from the memory available to the pod's cgroup (see the sketch at the end of this answer).

    "sink other topics to S3 at the same time"

    Make a separate connector config for each of them. But keep in mind that all connectors share the same JVM, so you may simply need to run more replicas of the Connect pods.

    "the memory needed would be too big"

    Then scale up your k8s cluster. That's not a problem that Connect can address.
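
As a sketch of the heap change above (the exact env wiring depends on your manifest or Helm chart; values are illustrative), replacing the fixed flags in KAFKA_HEAP_OPTS could look like this:

env:
  - name: KAFKA_HEAP_OPTS
    value: "-XX:MaxRAMPercentage=80.0"   # heap sized relative to the container's cgroup memory
resources:
  requests:
    cpu: 200m
    memory: 2Gi
  limits:
    memory: 2Gi   # a memory limit gives the JVM a fixed value to take the percentage of

The percentage is resolved against the memory the container is actually allowed to use, so the heap scales with the pod's memory settings instead of being pinned at 1G.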