Tags: apache-kafka, confluent-platform, ksqldb

Ignoring Byte Order Mark or binary trash in front of JSON when using KSQLDB


I am using io.confluent.connect.json.JsonSchemaConverter as my value converter to generate the payload on a topic with the schema in the Schema Registry.

This leads to an error when I try to create a ksqlDB stream. The log entry reads:

Invalid UTF-32 character 0x17a2249 (above 0x0010ffff) at char #1, byte #7)

Here are the first few hex values of the message:

[Image: hex dump of the first bytes of the message]

Keep in mind that I have a bunch of messages; the screenshot and the error message are from two different offsets.

ksqlDB seems to think this might be a BOM, given that the error message mentions 0x0010ffff. JSON (RFC 4627) does not allow for a BOM, and to me this just looks like obvious binary trash. I think the JsonSchemaConverter may be prefacing the data with the version of the schema that was written to the Schema Registry, in this case 1.

Is there a way to tell ksqlDB to skip part of a JSON message, either:

  • until the structure starts to appear valid (first occurrence of pattern match [\d[{"])?
  • straight up skip 5 bytes?

Or am I doing something fundamentally wrong that has nothing to do with what I am seeing on a binary level?

EDIT: It seems worth mentioning that I do want a schema. I indicate that to ksqlDB with VALUE_FORMAT='JSON_SR', which seems to work, and I am using JsonSchemaConverter so that the schema is written.


Solution

  • This isn't a plain JSON record. It's a Schema Registry framed message that carries a reference to a JSON Schema along with the value: a magic byte (0x0), then a 4-byte integer (schema ID 1), then the remaining data.

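    To remove the prefix, you would need to switch the value converter to org.apache.kafka.connect.json.JsonConverter and set value.converter.schemas.enable=false in your Connect properties; JsonSchemaConverter always writes the prefix.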

    Otherwise, if you want to keep a schema, AvroConverter would be better in terms of compatibility. You would then automatically have a schema in ksqlDB rather than needing to parse out individual fields.

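    Alternatively, I think FORMAT='JSON' is not what you want in ksqlDB, because it assumes a record without a schema and therefore without this prefix. JSON_SR is the format meant for data written through the Schema Registry.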

    See the note here: https://docs.ksqldb.io/en/latest/developer-guide/serialization/#json
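
To make the framing described above concrete, here is a minimal Python sketch for inspecting such a message by hand. It assumes the layout from this answer (one magic byte of 0, a 4-byte big-endian schema ID, then the JSON payload). The function name split_wire_format and the sample bytes are made up for illustration. ksqlDB does not need anything like this, since JSON_SR handles the prefix itself.

```python
import json
import struct

MAGIC_BYTE = 0  # first byte of a Schema Registry framed message


def split_wire_format(raw: bytes):
    """Split a framed message into (schema_id, payload_bytes)."""
    if len(raw) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic!r}")
    return schema_id, raw[5:]


# Hypothetical message: header for schema ID 1 followed by a made-up JSON body.
sample = b"\x00" + (1).to_bytes(4, "big") + b'{"ID": 42}'

schema_id, payload = split_wire_format(sample)
record = json.loads(payload.decode("utf-8"))
print(schema_id, record)

# Bytes 4..7 read as a single big-endian 32-bit unit, the way a UTF-32
# decoder would read them: the result is far above the Unicode limit.
code_unit = int.from_bytes(sample[4:8], "big")
print(hex(code_unit), code_unit > 0x10FFFF)
```

The last two lines hint at why a plain JSON parser fails on this data. The leading zero bytes can look like UTF-32 text, and the schema ID combined with the first payload bytes then forms a value above 0x0010ffff, which is the kind of value the error in the question complains about.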