Search code examples
c#apache-kafkakafka-consumer-apikafka-producer-api

Kafka Consumer messages are not receiving after changing schema registry format


I am new to Kafka and somehow able to run kafka Avro consumer and Producer. Producer is producing the message and i am successfully getting it in consumer. Here is my producer code snippet:

static async void AvroProducer()
{
    string bootstrapServers = "localhost:9092";
    string schemaRegistryUrl = "Production163:8081"; 
    string topicName = "player";
    string groupName = "avro-generic-example-group";


     var s = (RecordSchema)RecordSchema.Parse(
        @"{
            ""namespace"": ""Confluent.Kafka.Examples.AvroSpecific"",
            ""type"": ""record"",
            ""name"": ""User"",
            ""fields"": [
                {""name"": ""name"", ""type"": ""string""},
                {""name"": ""favorite_number"",  ""type"": [""int"", ""null""]},
                {""name"": ""favorite_color"", ""type"": [""string"", ""null""]}
            ]
          }"
    );

    using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
    using (var producer =
        new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
            .SetKeySerializer(new AsyncAvroSerializer<string>(schemaRegistry))
            .SetValueSerializer(new AsyncAvroSerializer<GenericRecord>(schemaRegistry))
            .Build())
    {
        Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

        int i = 0;
        string text;
        while ((text = Console.ReadLine()) != "q")
        {
            var record = new GenericRecord(s);
            record.Add("name", text);
            record.Add("favorite_number", i++);
            record.Add("favorite_color", "blue");

            producer.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
                .ContinueWith(task => task.IsFaulted
                    ? $"error producing message: {task.Exception.Message}"
                    : $"produced to: {task.Result.TopicPartitionOffset}");
        }
    }
    Console.ReadLine();

}

As you can see in above code i am using a record scheme, But I am trying this scehema:

//this is the new schema try
        var s = (RecordSchema)RecordSchema.Parse(
            @"{
                ""type"": ""record"",
                ""name"": ""TestingMsg"",
                ""doc"": ""Sample"",
                ""fields"": [
                  {
                   ""name"": ""key"",
                   ""type"": ""string""
                  },
                  {
                   ""name"": ""Time"",
                   ""type"": ""long""
                  },
                  {
                   ""name"": ""sourceSeconds"",
                   ""type"": ""long""
                  },
                  {
                   ""name"": ""serverT"",
                   ""type"": ""long""
                  },

                  {
                   ""name"": ""statusCode"",
                   ""type"": ""int""
                  }
                ]
                }"
            );

The new one which I am trying to use, but it not working as I am not getting messages in consumer. And here is the consumer :

void KafkaReader(CancellationToken cancellationToken)
    {
        Debug.Log("kafka reader started. . .");
        // Set up your Kafka connection here.

        while (_keepThreadRunning)
        {
            using (CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
            using (IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
            //using (IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers})
                    .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                    .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
                    .SetErrorHandler((_, e) => Debug.Log($"Error: {e.Reason}"))
                    .Build())
            {
                Debug.Log("subscribe" );
                consumer.Subscribe(topicName);


                while (true)
                {
                    ConsumeResult<string, GenericRecord> consumeResult = consumer.Consume(cancellationToken);//TimeSpan.FromMilliseconds(50000)//new TimeSpan(0,0,1)

                    _stringsReceived.Enqueue(consumeResult.Value.ToString());


                    if (consumeResult != null)
                    {
                        Debug.Log($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");


                    }
                    else
                    {
                        Debug.Log("consumer Result is null");
                    }

                    //yield return new WaitForSeconds(1);
                }
            }


        }

        GetComponent<KafkaServerConfigUI>().KafkaDisconnected();

        // Disconnect and clean up your connection here.


    }

Remember I just running the default apache Kafka registry using a batch file.

D:\ApachKafka\confluent\confluent-5.2.1\bin\windows\schema-registry-start.bat D:\ApachKafka\confluent\confluent-5.2.1\etc\schema-registry\schema-registry.properties

What i am doing wrong? Do i need to register the schema to anywhere?


Solution

  • For making any change or using new schema, you have to register the schema. I was missing this thing therefore I was not getting the messages in consumer. Here is the short python script that help you to register the schema.

    Using the script, You have to provide the URL of the Schema Registry (starting with http://, not just a hostname and port), the topic for which the schema should be registered, and the path to the schema.

    Here is the way i registrer my schema

    enter image description here

    Thank to Ref: Avro and Schema registry