Tags: go, apache-kafka, jsonschema, confluent-schema-registry

Kafka schema registry - Broker: Broker failed to validate record


I'm validating record schemas using the Kafka Schema Registry. The problem is that even though I registered the correct schema, I still get the error Broker: Broker failed to validate record.

confluent.value.schema.validation is set to true on the topic so that each record's value schema is validated at the broker level.
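For reference, here is a minimal sketch of how that topic property can be set with confluent-kafka-go's AdminClient; the broker address and topic name are assumptions, and the property is only honored by Confluent Server:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
    admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        fmt.Printf("Failed to create admin client: %s\n", err)
        return
    }
    defer admin.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // Enable broker-side schema validation for the topic's values.
    // Note: non-incremental AlterConfigs replaces the topic's dynamic
    // config set, so include any other overrides you want to keep.
    results, err := admin.AlterConfigs(ctx, []kafka.ConfigResource{{
        Type: kafka.ResourceTopic,
        Name: "schema_test",
        Config: kafka.StringMapToConfigEntries(map[string]string{
            "confluent.value.schema.validation": "true",
        }, kafka.AlterOperationSet),
    }})
    if err != nil {
        fmt.Printf("AlterConfigs failed: %s\n", err)
        return
    }
    for _, res := range results {
        fmt.Printf("%s: %s\n", res.Name, res.Error)
    }
}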

The schema I registered and the data I send are as follows.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "properties": {
    "test": {
      "type": "string"
    }
  },
  "title": "schema_test",
  "type": "object"
}
{"test": "test1"}

Additionally, I am sending the data using Go; the producer code is as follows.

// main.go
func main() {
    kafka.ProduceData("schema_test", []byte(`{"test": "test1"}`))
}

// kafka.go
import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka" // v1 path: github.com/confluentinc/confluent-kafka-go/kafka
)

func ProduceData(topic string, data []byte) {

    conf := ReadConfig() // helper that loads the client configuration (bootstrap.servers, etc.)

    p, err := kafka.NewProducer(&conf)

    if err != nil {
        fmt.Printf("Failed to create producer: %s", err)
        os.Exit(1)
    }

    defer p.Close()

    // Goroutine to handle message delivery reports and
    // possibly other event types (errors, stats, etc.)
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
                        *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
                }
            }
        }
    }()

    // The value is the raw JSON bytes, with no Schema Registry framing.
    err = p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          data,
    }, nil)
    if err != nil {
        fmt.Printf("Failed to enqueue message: %s\n", err)
    }

    // Wait for all messages to be delivered
    p.Flush(15 * 1000)
}

Solution

  • There seems to be a misunderstanding of how data is actually validated by the broker. It's working as expected: you need a schema ID, but you're sending plain JSON to the topic with no ID. The schema in the registry doesn't really matter by itself; only its ID does.

    From the docs:

    enables the broker to verify that data produced to a Kafka topic is using a valid schema ID in Schema Registry

    More specifically, the schema you've added to the registry is just one of many "versions" that can exist on a subject (such as topic-value). Each version has a unique ID, and the validation doesn't just use the latest version; the ID is encoded client-side into every record.

    See the Confluent example of producing with a JSON Schema serializer (which also validates records client-side); a minimal sketch based on it follows at the end of this answer.

    https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/json_producer_example/json_producer_example.go

    The broker-side validation exists precisely to reject incorrectly serialized data, or "poison pills", which is effectively what you're producing now.
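    Here's a minimal sketch along the lines of that example, assuming confluent-kafka-go v2 with a broker at localhost:9092 and Schema Registry at localhost:8081 (both addresses and the TestRecord struct are illustrative):

    package main

    import (
        "fmt"
        "os"

        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
        "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
        "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/jsonschema"
    )

    // TestRecord mirrors the registered schema: an object with one string field.
    type TestRecord struct {
        Test string `json:"test"`
    }

    func main() {
        topic := "schema_test"

        client, err := schemaregistry.NewClient(schemaregistry.NewConfig("http://localhost:8081"))
        if err != nil {
            fmt.Printf("Failed to create Schema Registry client: %s\n", err)
            os.Exit(1)
        }

        // The JSON Schema serializer frames each payload with the Confluent
        // wire format: magic byte 0x00 + 4-byte big-endian schema ID + JSON.
        // That ID is what the broker-side validation checks for.
        ser, err := jsonschema.NewSerializer(client, serde.ValueSerde, jsonschema.NewSerializerConfig())
        if err != nil {
            fmt.Printf("Failed to create serializer: %s\n", err)
            os.Exit(1)
        }

        payload, err := ser.Serialize(topic, &TestRecord{Test: "test1"})
        if err != nil {
            fmt.Printf("Failed to serialize record: %s\n", err)
            os.Exit(1)
        }

        p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
        if err != nil {
            fmt.Printf("Failed to create producer: %s\n", err)
            os.Exit(1)
        }
        defer p.Close()

        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          payload,
        }, nil)
        if err != nil {
            fmt.Printf("Failed to enqueue message: %s\n", err)
            os.Exit(1)
        }
        p.Flush(15 * 1000)
    }

    Note that by default the serializer generates a JSON Schema from the Go struct and auto-registers it under the subject schema_test-value, so it may create a new version next to the one you registered manually; any registered ID under that subject passes the broker's check.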