apache-kafka, avro, apache-kafka-connect, confluent-schema-registry

Kafka connect | Failed to deserialize data for topic | Error retrieving Avro key / value schema version for id | Subject not found error code: 40401


First of all, thanks to @OneCricketeer for the support so far. I have tried so many configurations by now that I don't know what else to try.

I am using Confluent's connect-standalone worker.properties sink.properties to access an external stream.

The connection works, and I can see that an offset is loaded:

INFO [my_mysql_sink|task-0] [Consumer clientId=connector-consumer-my_mysql_sink-0, groupId=connect-my_mysql_sink] Setting offset for partition gamerboot.gamer.master.workouts.clubs.spieleranalyse-1 to the committed offset FetchPosition{ offset=2225 , offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka8.pro.someurl.net:9093 (id: 8 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)

But afterwards I receive an error when new messages come in:

ERROR [my_mysql_sink|task-0] WorkerSinkTask{id=my_mysql_sink-0} Error converting message key in topic 'gamerboot.gamer.master.workouts.clubs.spieleranalyse' partition 1 at offset 2225 and timestamp 1641459346507: Failed to deserialize data for topic gamerboot.gamer.master.workouts.clubs.spieleranalyse to Avro:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 422

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401

I do not understand this error.
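For context on where "id 422" comes from: Confluent's Avro serializers frame each key and value with one magic byte (0) followed by a 4-byte big-endian schema ID, and the converter reads that ID from the message before it talks to the registry. The following is a minimal Python sketch of that framing, not the converter's actual code; the function name read_schema_id and the example bytes are made up for illustration.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed Avro payload


def read_schema_id(payload: bytes) -> int:
    """Return the schema ID from a Confluent-framed message key or value."""
    if len(payload) < 5:
        raise ValueError("payload too short for the 5-byte Confluent header")
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent Avro framing")
    return schema_id


# Hypothetical key bytes: header for schema ID 422 plus a made-up Avro body.
example_key = b"\x00" + (422).to_bytes(4, "big") + b"\x02x"
print(read_schema_id(example_key))  # prints 422
```

So the ID itself is read successfully. As far as I understand the deserializer, the "Subject not found" part comes from a later lookup of that schema under a subject name, and which subject is used depends on the configured subject name strategy.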

worker.properties:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

sink.properties:

#key.converter.enhanced.avro.schema.support=true
#key.converter=org.apache.kafka.connect.storage.StringConverter

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://schema-reg.pro.someurl.net

#value.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://schema-reg.pro.someurl.net

#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

#pk.mode=record_key
#pk.fields=

As there is no primary key set in MySQL, I wanted to record everything from the stream.

As it says "Error retrieving Avro key schema version for id 422" I can see the following:

screenshot_subject_id

Don't be surprised that it says JSON; that is just my Chrome plugin, which interprets the response as JSON. The same is found for the value. I also tried every combination that is commented out in sink.properties. I am also able to curl the latest schema for key and value, for example:

curl -s https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey/versions/latest|jq

{
  "type": "record",
  "name": "ClubWorkoutKey",
  "namespace": "com.ad.gamerboot.kafka.models.workouts",
  "fields": [
    {
      "name": "playerId",
      "type": "string"
    },
    {
      "name": "tagId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}
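A possible way to see why none of the strategies I tried finds the subject, assuming the standard Confluent naming rules (TopicNameStrategy: topic plus "-key" or "-value"; RecordNameStrategy: the fully qualified record name; TopicRecordNameStrategy: topic plus "-" plus the fully qualified record name). The sketch below derives the key subject each strategy would look for, using the topic from the error message and the record name from the schema above, and compares it with the subject from the curl call. The helper subject_name is hypothetical, not a Confluent API.

```python
STRATEGIES = ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy")


def subject_name(strategy: str, topic: str, record_full_name: str, is_key: bool) -> str:
    """Subject a given strategy would use (assumed standard Confluent rules)."""
    if strategy == "TopicNameStrategy":
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "RecordNameStrategy":
        return record_full_name
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_full_name}"
    raise ValueError(f"unknown strategy: {strategy}")


# Values copied from the post: topic from the error, record name from the schema,
# registered subject from the curl URL.
topic = "gamerboot.gamer.master.workouts.clubs.spieleranalyse"
key_record = "com.ad.gamerboot.kafka.models.workouts.ClubWorkoutKey"
registered = "gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey"

for strategy in STRATEGIES:
    candidate = subject_name(strategy, topic, key_record, is_key=True)
    print(f"{strategy}: {candidate} (matches registered subject: {candidate == registered})")
```

Under these assumptions no strategy produces the registered subject: its prefix is not the topic name, and its record part (WorkoutKey) differs from the schema's record name (ClubWorkoutKey). That would be consistent with a 40401 regardless of which strategy is configured on the consumer side.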

I got a little further when I set StringConverter for key.converter and value.converter in sink.properties. But in my opinion that must be wrong, because the data is Avro. With StringConverter there were then other problems: I would have had to set a pk, switch on delete, etc.

Thanks for your support.

EDIT:

This is what I was given:

topic = gamerboot.gamer.master.workouts.clubs.spieleranalyse

schema.url = https://schema-reg.pro.someurl.net

as well as the schema ID URL:

 https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.workouts-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/schema

and:

https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest

For me it's like a puzzle; I started with Kafka 20 days ago. From there I tried variations of the URLs and found the ones I posted for the subjects:

For Key: https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey/versions/latest/

Schema: {"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey","version":1,"id":422,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKey\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}

For Values: https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/

and https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue/versions/latest/

Schemas: {"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue","version":1,"id":423,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKickValue\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ballSpeed\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ballSpeedFloat\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"ballSpeedZone\",\"type\":{\"type\":\"enum\",\"name\":\"BallSpeedZone\",\"symbols\":[\"COLD\",\"MEDIUM\",\"HOT\",\"FIRE\",\"INVALID\"]}},{\"name\":\"confidence\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}

and: {"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue","version":1,"id":424,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutPlayerMotionValue\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"absoluteDistance\",\"type\":\"float\"},{\"name\":\"averageSpeed\",\"type\":\"float\"},{\"name\":\"peakSpeed\",\"type\":\"float\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"installationId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"averageSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"AverageSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null,\"aliases\":[\"speedZone\"]},{\"name\":\"peakSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"PeakSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}

MySQL table:

+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| Field            | Type                                                                 | Null | Key | Default | Extra |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| playerid         | varchar(100)                                                         | YES  |     | NULL    |       |
| timestamp        | mediumtext                                                           | YES  |     | NULL    |       |
| absoluteDistance | float                                                                | YES  |     | NULL    |       |
| avarageSpeed     | float                                                                | YES  |     | NULL    |       |
| peakSpeed        | float                                                                | YES  |     | NULL    |       |
| tagId            | varchar(50)                                                          | YES  |     | NULL    |       |
| installationId   | varchar(100)                                                         | YES  |     | NULL    |       |
| averageSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES  |     | NULL    |       |
| peakSpeedZone    | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES  |     | NULL    |       |
| ballSpeed        | int(11)                                                              | YES  |     | NULL    |       |
| ballSpeedFloat   | float                                                                | YES  |     | NULL    |       |
| ballSpeedZone    | enum('COLD','MEDIUM','HOT','FIRE','INVALID')                         | YES  |     | NULL    |       |
| confidence       | int(11)                                                              | YES  |     | NULL    |       |
| ingestionTime    | mediumtext                                                           | YES  |     | NULL    |       |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+

Data expected in MySQL:

+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| playerid                             | timestamp     | absoluteDistance | avarageSpeed | peakSpeed | tagId          | installationId | averageSpeedZone | peakSpeedZone | ballSpeed | ballSpeedFloat | ballSpeedZone | confidence | ingestionTime |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641495873505 |          5.76953 |       1.1543 |   1.22363 | 0104FLHBN009XD | null           | WALK             | WALK          |      NULL |           NULL | NULL          |       NULL | 1641496586458 |
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641484677624 |             NULL |         NULL |      NULL | 0104FLHBN009XD | NULL           | NULL             | NULL          |        37 |        37.0897 | COLD          |         77 | 1641484896747 |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+

The avro-console output for these DB entries looks like this:

{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641484677624,"tagId":{"string":"0104FLHBN009XD"},"ballSpeed":{"int":37},"ballSpeedFloat":{"float":37.08966},"ballSpeedZone":"COLD","confidence":{"int":77},"ingestionTime":{"long":1641484896747}}

{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641495873505,"absoluteDistance":5.7695312,"averageSpeed":1.1542969,"peakSpeed":1.2236328,"tagId":{"string":"0104FLHBN009XD"},"installationId":null,"averageSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.AverageSpeedZone":"WALK"},"peakSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.PeakSpeedZone":"WALK"},"ingestionTime":{"long":1641496586458}}
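To relate this console output to the expected rows: in Avro's JSON encoding, a non-null value of a union field is wrapped in a single-key object naming its type, such as {"string": "..."} or {"int": 37}. The sketch below unwraps those wrappers to get flat column values. It assumes flat records like the two above, where every single-key object is a union wrapper, and the helper unwrap_unions is made up. It is only a reading aid for the console output, not a description of what the sink connector does internally.

```python
import json


def unwrap_unions(record: dict) -> dict:
    """Flatten Avro-JSON union wrappers like {"int": 37} to plain values."""
    row = {}
    for field, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            # Assumption: a single-key object is a union wrapper {"<type>": value}.
            (value,) = value.values()
        row[field] = value
    return row


# First console record from the post, copied verbatim.
line = (
    '{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641484677624,'
    '"tagId":{"string":"0104FLHBN009XD"},"ballSpeed":{"int":37},'
    '"ballSpeedFloat":{"float":37.08966},"ballSpeedZone":"COLD",'
    '"confidence":{"int":77},"ingestionTime":{"long":1641484896747}}'
)

row = unwrap_unions(json.loads(line))
for column, value in row.items():
    print(f"{column} = {value}")
```

Fields that are absent from a record (for example absoluteDistance in a kick record) simply do not appear in the result, which corresponds to the NULL columns in the expected table.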

This is a fresh, up-to-date Confluent installation. Just a few hours ago I updated the Avro converter to kafka-connect-avro-converter:7.0.1


Solution

  • The company changed the schemas with regard to RecordNameStrategy. Everything is working now.

    Thanks