I am using io.confluent.connect.json.JsonSchemaConverter
as my value converter to generate the payload on a topic with the schema in the Schema Registry.
This leads to an error message when I try to create a ksqldb stream. The log entry reads:
Invalid UTF-32 character 0x17a2249 (above 0x0010ffff) at char #1, byte #7)
I also looked at the first few hex values of the message (screenshot not reproduced here).
Keep in mind that I have a bunch of messages; the screenshot and the error message are from two different offsets.
ksqldb seems to think this might be a BOM, given that the error message mentions 0x0010ffff.
JSON (RFC 4627) does not allow a BOM, and to me this just looks like obvious binary trash.
I think what the JsonSchemaConverter may be doing is prefacing the data with the schema version that was written into the Schema Registry, in this case 1.
Is there a way to tell ksqldb to skip portions of a JSON message (say, everything before the first character matching `[\d[{"]`)? Or am I doing something fundamentally wrong that has nothing to do with what I am seeing on a binary level?
EDIT:
It seems worth mentioning that I do want a schema. I indicate that to ksqldb by using VALUE_FORMAT='JSON_SR', which seems to work, and I am also using the JsonSchemaConverter so that the schema is written.
This isn't a plain JSON record. It's a Schema Registry framed message: a magic byte (0x0), then a 4-byte big-endian int (the schema ID, here 1), then the remaining data. The ID references the JSON Schema registered in the Schema Registry.
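To make the framing concrete, here is a small sketch (not an official API, just illustrative Python) that splits such a message into the schema ID and the raw JSON bytes:

```python
import struct

def strip_sr_framing(payload: bytes) -> tuple[int, bytes]:
    """Split a Schema Registry framed message into (schema_id, data).

    Layout: byte 0 is the magic byte 0x00; bytes 1-4 are the schema ID
    as a big-endian 32-bit integer; everything after is the payload.
    """
    if len(payload) < 5 or payload[0] != 0x00:
        raise ValueError("not a Schema Registry framed message")
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]

# Example: magic byte + schema ID 1, followed by a JSON body
framed = b"\x00\x00\x00\x00\x01" + b'{"field":"value"}'
schema_id, body = strip_sr_framing(framed)
print(schema_id, body)  # 1 b'{"field":"value"}'
```

This is also why the first visible bytes look like "binary trash" to a plain JSON parser: it chokes on the magic byte and schema ID before it ever reaches the JSON text.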
To remove that framing, you would need to switch to the plain org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable=false in your Connect properties.
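As an illustration, the relevant lines in the Connect worker or connector properties would look something like this (a minimal fragment, not a complete config):

```
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

With these settings the topic carries bare JSON text, readable with VALUE_FORMAT='JSON', but no schema is registered.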
Otherwise, if you want to keep a schema, AvroConverter would be better in terms of compatibility. Then you would automatically have a schema in KSQL rather than need to parse out individual fields.
Alternatively, I think VALUE_FORMAT='JSON' is not what you want in KSQL, because that format assumes a record without a Schema Registry schema.
See the note here: https://docs.ksqldb.io/en/latest/developer-guide/serialization/#json
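If you keep the JsonSchemaConverter and its framing, a stream declaration along these lines (the stream and topic names are hypothetical) lets ksqlDB strip the magic byte and schema ID itself and infer the columns from the registered schema:

```sql
-- Columns are inferred from the JSON Schema in the Schema Registry
CREATE STREAM my_stream WITH (
  KAFKA_TOPIC = 'my-topic',
  VALUE_FORMAT = 'JSON_SR'
);
```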