Tags: apache-kafka, google-bigquery, apache-kafka-connect, confluent-platform, confluent-cloud

Confluent Cloud -> BigQuery - How to diagnose "Bad records" cause


I was able to push data from MS SQL Server to topics on Confluent Cloud, but not from those topics to BigQuery; the connector reports "Bad records in the last hour - 65".

I was able to connect the topics to BigQuery, but the data is not being ingested.

The MSSQL and BigQuery table formats are the same: first (string) and last (string), with a sample row of raj, ram.

Do I need to add any other columns, such as timestamp or offset, to ingest the data?


Solution

  • If there are messages that can't be sent to the target, they'll get written to a Dead Letter Queue with details of the problem.

    From the Connectors screen you can see the ID of your connector.


    Use that ID to locate a topic with the same name and a dlq- prefix.
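    To find it quickly, you can list the cluster's topics and filter on that prefix. Here's a minimal sketch using kafkacat's metadata mode (-L), assuming the same Confluent Cloud connection settings as the consume command further down:

    # list all topics in the cluster and keep only the DLQ ones
    $ docker run --rm edenhill/kafkacat:1.5.0 \
             -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
             -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
             -b ${CCLOUD_BROKER_HOST} \
             -X sasl.username="${CCLOUD_API_KEY}" \
             -X sasl.password="${CCLOUD_API_SECRET}" \
             -L | grep 'dlq-'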


    You can then browse the topic and use the header information to determine the cause of the problem.


    If you prefer, you can use kafkacat to view the headers:

    # consume one message (-C -c1) from the start of the topic (-o beginning)
    # and print its topic, partition, offset, and headers (-f)
    $ docker run --rm edenhill/kafkacat:1.5.0 \
             -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
             -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
             -b ${CCLOUD_BROKER_HOST} \
             -X sasl.username="${CCLOUD_API_KEY}" \
             -X sasl.password="${CCLOUD_API_SECRET}" \
             -t dlq-lcc-emj3x \
             -C -c1 -o beginning \
             -f 'Topic %t[%p], offset: %o, Headers: %h'
    
    Topic dlq-lcc-emj3x[0], offset: 12006, Headers: __connect.errors.topic=mysql-01-asgard.demo.transactions,__connect.errors.partition=5,__connect.errors.offset=90,__connect.errors.connector.name=lcc-emj3x,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
            at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
     at [Source: (byte[])"
    

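    In the trace above, the JsonParseException about an illegal character (CTRL-CHAR, code 0) is a strong hint that the message isn't JSON at all: records serialised with the Confluent Avro serialiser begin with a zero magic byte followed by a four-byte schema ID. As a quick check (a sketch, assuming the same connection settings as above and xxd available on the host), you can dump the leading bytes of one DLQ message value:

    # print the raw value of one message and inspect its first bytes;
    # a leading 00 00 00 00 xx suggests Confluent-framed Avro rather than JSON
    $ docker run --rm edenhill/kafkacat:1.5.0 \
             -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
             -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
             -b ${CCLOUD_BROKER_HOST} \
             -X sasl.username="${CCLOUD_API_KEY}" \
             -X sasl.password="${CCLOUD_API_SECRET}" \
             -t dlq-lcc-emj3x \
             -C -c1 -o beginning | xxd | head -n 2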
    From there on in, it's just a case of understanding the error. A lot of the time it's down to serialisation issues, which you can learn more about here.
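    If the payload does turn out to be Avro, the usual fix is to make the connector's value converter match the data. For a self-managed sink connector, the relevant properties would look something like this sketch (the Schema Registry endpoint and credentials are placeholders); for a fully managed Confluent Cloud connector, the equivalent is selecting AVRO as the input message format in the connector's configuration:

    # illustrative connector properties - all values below are placeholders
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=https://<your-sr-endpoint>.confluent.cloud
    value.converter.basic.auth.credentials.source=USER_INFO
    value.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>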