
Error With RowKey Definition on Confluent BigTable Sink Connector


I'm trying to use the Bigtable Sink Connector from Confluent to read data from Kafka and write it into my Bigtable instance, but I'm receiving the following error message:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with RowKey definition: Row key definition was defined, but received, deserialized kafka key is not a struct. Unable to construct a row key.
    at io.confluent.connect.bigtable.client.RowKeyExtractor.getRowKey(RowKeyExtractor.java:69)
    at io.confluent.connect.bigtable.client.BufferedWriter.addWriteToBatch(BufferedWriter.java:84)
    at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:47)
    at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:99)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    ... 10 more

Due to some technical limitations, the message producer can't set a message key, so I'm using transforms to take information from the payload and set it as the message key.

Here's my connector payload:

{
  "name" : "DATALAKE.BIGTABLE.SINK.QUEUEING.ZTXXD",
  "config" : {
    "connector.class" : "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
    "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "topics" : "APP-DATALAKE-QUEUEING-ZTXXD_DATALAKE-V1",
    "transforms" : "HoistField,AddKeys,ExtractKey",
    "gcp.bigtable.project.id" : "bigtable-project-id",
    "gcp.bigtable.instance.id" : "bigtable-instance-id",
    "gcp.bigtable.credentials.json" : "XXXXX",
    "transforms.ExtractKey.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.HoistField.field" : "raw_data_cf",
    "transforms.ExtractKey.field" : "KEY1,ATT1",
    "transforms.HoistField.type" : "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.AddKeys.type" : "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.AddKeys.fields" : "KEY1,ATT1",
    "row.key.definition" : "KEY1,ATT1",
    "table.name.format" : "raw_ZTXXD_DATALAKE",
    "consumer.override.group.id" : "svc-datalake-KAFKA_2_BIGTABLE",
    "confluent.topic.bootstrap.servers" : "xxxxxx:9092",
    "input.data.format" : "JSON",
    "confluent.topic" : "_dsp-confluent-license",
    "input.key.format" : "STRING",
    "key.converter.schemas.enable" : "false",
    "confluent.topic.security.protocol" : "SASL_SSL",
    "row.key.delimiter" : "/",
    "confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"XXXXX\" password=\"XXXXXX\";",
    "value.converter.schemas.enable" : "false",
    "auto.create.tables" : "true",
    "auto.create.column.families" : "true",
    "confluent.topic.sasl.mechanism" : "PLAIN"
  }
}

And here's my message produced to Kafka:

{
    "MANDT": "110",
    "KEY1": "1",
    "KEY2": null,
    "ATT1": "1M",
    "ATT2": "0000000000",
    "TABLE_NAME": "ZTXXD_DATALAKE",
    "IUUC_OPERATION": "I",
    "CREATETIMESTAMP": "2022-01-24T20:26:45.247Z"
}
  

In my transforms I'm doing three operations:

  1. HoistField puts my payload inside a two-level structure (the Connect docs for the Bigtable sink say that the connector expects a two-level structure in order to infer the column families).

  2. AddKeys adds the columns that I consider keys to the message key.

  3. ExtractKey removes the field names from the key built by AddKeys, leaving only the values themselves.
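For reference, with "transforms.HoistField.field" set to "raw_data_cf", HoistField$Value wraps the entire record value under that single field. Since "value.converter.schemas.enable" is "false", the value is schemaless JSON, so the sample message should reach the next transform roughly like this (a sketch derived from the sample message above):

```json
{
  "raw_data_cf": {
    "MANDT": "110",
    "KEY1": "1",
    "KEY2": null,
    "ATT1": "1M",
    "ATT2": "0000000000",
    "TABLE_NAME": "ZTXXD_DATALAKE",
    "IUUC_OPERATION": "I",
    "CREATETIMESTAMP": "2022-01-24T20:26:45.247Z"
  }
}
```

After this step, KEY1 and ATT1 are nested inside raw_data_cf rather than being top-level fields of the value.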

I've been reading the documentation for this Bigtable connector, and it's not clear to me whether it works with the JSON format. Could you let me know?


Solution

  • JSON should work, but...

    deserialized kafka key is not a struct

    This is because you set schemas.enable=false on the value converter, so the records are schemaless: HoistField produces a Java Map rather than a Connect Struct, and when you then apply ValueToKey, the resulting key is also a Map, not a Struct.

    If you can't use the Schema Registry and switch the serialization format, then you'll need to find a way to get the REST Proxy to infer the schema of the JSON message before it produces the data (I don't think it can). Otherwise, your records need to include schema and payload fields, and you need to enable schemas on the converters. Explained here.

    Another option: there may be a community transform project that sets the schema of the record, but that isn't built in (it's not part of SetSchemaMetadata).
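    As a rough sketch of the schema-and-payload approach mentioned above, the sample message from the question would be produced in the JsonConverter envelope format, with "value.converter.schemas.enable" set to "true" in the connector config. The field types here are assumptions: every field is declared as an optional string, because KEY2 is null in the sample and all values arrive as strings. Adjust them to the real data.

    ```json
    {
      "schema": {
        "type": "struct",
        "optional": false,
        "fields": [
          { "field": "MANDT", "type": "string", "optional": true },
          { "field": "KEY1", "type": "string", "optional": true },
          { "field": "KEY2", "type": "string", "optional": true },
          { "field": "ATT1", "type": "string", "optional": true },
          { "field": "ATT2", "type": "string", "optional": true },
          { "field": "TABLE_NAME", "type": "string", "optional": true },
          { "field": "IUUC_OPERATION", "type": "string", "optional": true },
          { "field": "CREATETIMESTAMP", "type": "string", "optional": true }
        ]
      },
      "payload": {
        "MANDT": "110",
        "KEY1": "1",
        "KEY2": null,
        "ATT1": "1M",
        "ATT2": "0000000000",
        "TABLE_NAME": "ZTXXD_DATALAKE",
        "IUUC_OPERATION": "I",
        "CREATETIMESTAMP": "2022-01-24T20:26:45.247Z"
      }
    }
    ```

    With the schema embedded in each record, the JsonConverter produces Connect Struct values, so the transforms operate on Structs instead of Java Maps.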