Tags: apache-kafka, confluent-platform, s3-kafka-connector

Why does my Kafka S3 Connector Sink delete my topic after creation (Kafka Connector Restarts)?


I'm using Confluent's Kafka Connectors to sink data from Kafka to a MinIO bucket. I'm using io.confluent.connect.s3.S3SinkConnector inside a Kubernetes environment. This is my current S3 Sink configuration:

kafkaConnectSpec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    tasks.max: 1
    topics: topic-name
    s3.bucket.name: bucket
    flush.size: 1
    store.url: http://minio:9000
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    storage.class: io.confluent.connect.s3.storage.S3Storage

After the cloud environment deploys, the customer wants to be able to control topics dynamically (i.e., adding and deleting topics at will). While I understand why this might not be ideal, I yield to the higher authorities.

So, in order to perform a topic addition, I'm using the Kafka Connect REST API:

import requests

def update_sink(topic, connector):
    # Fetch the connector's current configuration from the Connect REST API
    configuration = requests.get("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector)).json()

    if "config" not in configuration:
        return {
            "status": 500,
            "message": "Kafka Sink " + str(connector) + " does not have a configuration"
        }

    # Topics must be comma-delimited
    if "topics" in configuration["config"]:
        configuration["config"]["topics"] += ',' + topic

    # PUT the updated config back to the connector's /config endpoint
    requests.put("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector) + "/config", json=configuration["config"])
    print("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector) + "/config")
    print(configuration["config"])
    return {
        "status": 200,
        "message": "Kafka Sink " + str(connector) + " successfully updated"
    }
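
Usage looks something like this (the topic and connector names here are illustrative):

result = update_sink("new-topic", "minio-s3-sink")
print(result["status"], result["message"])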

I know the code is not pretty, but it gets the job done for now. It essentially makes a PUT request to the /connectors/my-sink/config endpoint with the new topic appended to the connector's existing topics list.

This works. My sink has the new topic and I can send messages.

However, within 3-5 minutes, my Kafka Sink Pod begins restarting (I think) the Kafka connector:

2021-03-19 23:02:55,086 INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector minio-s3-sink config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [KafkaBasedLog Work Thread - connect-cluster-configs]
2021-03-19 23:02:55,589 INFO [Worker clientId=connect-1, groupId=connect-cluster] Handling connector-only config update by restarting connector minio-s3-sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,589 INFO Stopping connector minio-s3-sink (org.apache.kafka.connect.runtime.Worker) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,589 INFO Shutting down S3 connector minio-s3-sink (io.confluent.connect.s3.S3SinkConnector) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,598 INFO Stopped connector minio-s3-sink (org.apache.kafka.connect.runtime.Worker) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,598 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connector minio-s3-sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
... # Performs the restart here

At which point the pod loses the topic.

I believe this is caused by the config.action.reload = restart configuration value. I think that after receiving a new configuration, the connector is slated for a restart after N minutes. However, I can't seem to find any documentation on how to change that behavior. Perhaps I should be setting it during my PUT request, but that feels hacky, and it's also just a shot in the dark.

Does anyone know why my connector is restarting after the PUT request with the updated configuration? Is there any way to prevent this?

Edit #1: I attempted to add config.action.reload = none; however, the connector still restarted.
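
For reference, I added it alongside the other connector properties, something like this (a sketch of the config I tried):

kafkaConnectSpec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    config.action.reload: none
    tasks.max: 1
    topics: topic-name
    s3.bucket.name: bucket
    flush.size: 1
    store.url: http://minio:9000
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    storage.class: io.confluent.connect.s3.storage.S3Storage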

I watched the logs in the Kafka Operator, and it did not trigger the reset. The behavior seems to be entirely isolated to the Kafka Connector Operator.


Solution

  • The problem is documented in the Strimzi documentation:

    If KafkaConnectors are enabled, manual changes made directly using the Kafka Connect REST API are reverted by the Cluster Operator

    https://strimzi.io/docs/operators/latest/deploying.html#availability_of_the_kafka_connect_rest_api

    I had no idea this was happening in my deployment, but apparently we have to turn it off, which is unfortunate since the K8s connector deployment was nice for a simple start.

    This is the relevant configuration to turn them "off":

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: kafka-connect-cluster
      annotations:
        # use-connector-resources configures this KafkaConnect
        # to use KafkaConnector resources to avoid
        # needing to call the Connect REST API directly
        strimzi.io/use-connector-resources: "false"

    Setting strimzi.io/use-connector-resources: "false" means you can no longer add connectors via KafkaConnector YAML resources, but you can add them via the REST API, and those changes will persist for as long as the pod runs.
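
    With the annotation set to "false", connectors are managed directly through the REST API. As a minimal sketch of registering the sink that way (reusing the configuration from the question; the connector name minio-s3-sink is taken from the logs above):

    import requests

    # Register the S3 sink connector directly with the Connect REST API.
    # The name and config mirror the deployment described in the question.
    payload = {
        "name": "minio-s3-sink",
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "1",
            "topics": "topic-name",
            "s3.bucket.name": "bucket",
            "flush.size": "1",
            "store.url": "http://minio:9000",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        },
    }

    response = requests.post(
        "http://kafka-connect-cluster-connect-api:8083/connectors",
        json=payload,
    )
    print(response.status_code, response.json())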