Search code examples
apache-kafkaapache-kafka-connectdebezium

Debezium connector task is in unassigned state


Today one of the nodes out of 3 went out of sync and was brought back up. Now when I check the status of the connector task it shows as UNASSIGNED even though the connector is in a RUNNING state. The workers are running in distributed mode.

I tried to restart the connector, but it's still UNASSIGNED and points to the same worker node which was brought back into the cluster.

Following is my properties file of one of the worker which is same across all workers:

bootstrap.servers=something:9092
group.id=ffb-supply-kafka-connect
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=supply-kafka-connect-offsets
config.storage.topic=supply-kafka-connect-configs
status.storage.topic=supply-kafka-connect-status
plugin.path=/var/lib/3p-kafka/connect-plugins/,/usr/share/3p-kafka/libs

Connector config:

  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "minimal",
  "transforms.insertKey.fields": "external_shipment_id",
  "tasks.max": "1",
  "database.history.kafka.topic": "seller_oms_log_history_20220304200010",
  "transforms": "unwrap,insertKey,extractKey,alterKey",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "include.schema.changes": "false",
  "table.whitelist": "seller_oms.shipment_log",
  "database.history.kafka.recovery.poll.interval.ms": "5000",
  "transforms.unwrap.drop.tombstones": "false",
  "database.history.skip.unparseable.ddl": "true",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "database.whitelist": "seller_oms",
  "transforms.alterKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "database.user": "cdc_user",
  "transforms.extractKey.field": "external_shipment_id",
  "database.server.id": "20220304200010",
  "transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "database.history.kafka.bootstrap.servers": "XX:9092,YY:9092,ZZ:9092",
  "transforms.alterKey.renames": "__source_ts_ms:updated_by_debezium_at",
  "database.server.name": "seller_oms_log",
  "heartbeat.interval.ms": "5000",
  "database.port": "3306",
  "key.converter.schemas.enable": "false",
  "database.hostname": "master.in",
  "database.password": "XXYYSS",
  "value.converter.schemas.enable": "false",
  "name": "shipment-log-connector-20220304200010",
  "errors.tolerance": "all",
  "transforms.unwrap.add.fields": "source.ts_ms",
  "snapshot.mode": "schema_only"

I created a new connector with same configs and it works fine on the same worker node where it was failing for existing connector. I am not sure why the old pipeline didn't come up and got into RUNNING state for task. Is there something that needs to be done when node gets disconnected from the cluster, how to resume that connector once it is back? As per my understanding, if one worker goes down, it should Automatically assign the task to another worker.


Solution

  • resume that connector once it is back?

    After you run the connect-distributed script, it should rebalance the tasks.

    Otherwise, there's a /restart API endpoint

    if one worker goes down, it should Automatically assign the task to another worker

    In my experience, when tasks actually fail, they remain failed, and the restart endpoint needs hit, if it's a temp failure and the logs don't show anything useful. However, your errors.tolerance setting may help isolate the problem somewhat