Tags: c#, apache-kafka, avro, confluent-platform

C# Confluent Kafka problem with Avro serialization


I'm using Docker to run Kafka and the other services from https://github.com/confluentinc/cp-all-in-one, together with the Confluent NuGet packages for Kafka, Avro, and Schema Registry, in my test project.

Sending JSON messages has given me no problems so far, but I'm struggling with sending Avro-serialized messages.

I followed the https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroSpecific example and tried to do the same thing, but I end up with the exception below:

Local: Value serialization error
   at Confluent.Kafka.Producer`2.<ProduceAsync>d__52.MoveNext()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Kafka_producer.KafkaService.<SendSpecificRecord>d__10.MoveNext() in C:\Users\lu95eb\source\repos\Kafka_playground\Kafka producer\KafkaService.cs:line 126

with inner exception

Object reference not set to an instance of an object.
   at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl`1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize)
   at Confluent.SchemaRegistry.Serdes.AvroSerializer`1.<SerializeAsync>d__6.MoveNext()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at Confluent.Kafka.Producer`2.<ProduceAsync>d__52.MoveNext()

Here's my SpecificRecord class

using Avro;
using Avro.Specific;

public class UserInfo : ISpecificRecord
{
    public string Name { get; set; }
    public int[] Numbers { get; set; }

    public Schema Schema => Schema.Parse(@"
        {
          ""name"": ""UserInfo"",
          ""type"": ""record"",
          ""namespace"": ""kafka"",
          ""fields"": [
            {
              ""name"": ""Name"",
              ""type"": ""string""
            },
            {
              ""name"": ""Numbers"",
              ""type"": {
                ""type"": ""array"",
                ""items"": ""int""
              }
            }
          ]
        }
        ");

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return Name;
            case 1: return Numbers;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: Name = (string)fieldValue; break;
            case 1: Numbers = (int[])fieldValue; break;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
        }
    }
}
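
For comparison, a class generated by avrogen has the same shape, except that it also caches the parsed schema in a public static field named _SCHEMA, which, as far as I can tell, Confluent's AvroSerializer<T> looks up via reflection; as it turned out, that difference matters here (see the solution below). A minimal sketch of the generated shape:

public class UserInfo : ISpecificRecord
{
    public string Name { get; set; }
    public int[] Numbers { get; set; }

    // avrogen convention: the parsed schema lives in a public static field
    // named _SCHEMA, and the Schema property simply returns it.
    public static Schema _SCHEMA = Schema.Parse(
        @"{""name"":""UserInfo"",""type"":""record"",""namespace"":""kafka"",
           ""fields"":[{""name"":""Name"",""type"":""string""},
                       {""name"":""Numbers"",""type"":{""type"":""array"",""items"":""int""}}]}");

    public Schema Schema => _SCHEMA;

    public object Get(int fieldPos) => fieldPos switch
    {
        0 => Name,
        1 => Numbers,
        _ => throw new AvroRuntimeException($"Bad index {fieldPos} in Get()")
    };

    public void Put(int fieldPos, object fieldValue)
    {
        if (fieldPos == 0) Name = (string)fieldValue;
        else if (fieldPos == 1) Numbers = (int[])fieldValue;
        else throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
    }
}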

And here's the method used to send the message:

private async Task SendSpecificRecord(UserInfo userInfo)
{
    using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
    using (var producer =
        new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
            .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
            .SetValueSerializer(new AvroSerializer<UserInfo>(schemaRegistry))
            .Build())
    {
        var message = new Message<string, UserInfo>
        {
            Key = userInfo.Name,
            Value = userInfo
        };

        await producer.ProduceAsync(SpecificTopic, message);
    }
}

KafkaService.cs line 126 from the stack trace is the await producer.ProduceAsync(SpecificTopic, message); call.

As I wrote at the start, I have no problems with the Schema Registry: I have schemas registered, and they work properly for JSON. I also have no problems with topics, the broker, the consumer, or anything else.

I'd be grateful if anyone can point out what I'm doing wrong. Thank you in advance.


Solution

  • If anybody is curious about the solution (I can't imagine why anyone would be ;)): I wrote a 'custom' Avro serializer and deserializer, and they work like a charm. My best guess at the cause: the built-in AvroSerializer<T> appears to resolve the writer schema by reflecting on a public static _SCHEMA field (the avrogen convention sketched above), which my hand-written class lacked, hence the NullReferenceException in SpecificSerializerImpl`1..ctor.

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Avro.IO;
    using Avro.Specific;
    using Confluent.Kafka;

    public class CustomAvroSerializer<T> : IAsyncSerializer<T>
        where T : class, ISpecificRecord
    {
        public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        {
            return Task.Run(() =>
            {
                using (var ms = new MemoryStream())
                {
                    // Write the record as plain binary Avro, using the schema
                    // carried by the ISpecificRecord instance itself.
                    var enc = new BinaryEncoder(ms);
                    var writer = new SpecificDefaultWriter(data.Schema);
                    writer.Write(data, enc);
                    return ms.ToArray();
                }
            });
        }
    }
    
    public class CustomAvroDeserializer<T> : IDeserializer<T>
        where T : class, ISpecificRecord
    {
        public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            // Null values (e.g. tombstones) carry no payload to decode.
            if (isNull) return null;

            using (var ms = new MemoryStream(data.ToArray()))
            {
                var dec = new BinaryDecoder(ms);

                // Create an empty instance and let the Avro reader populate it,
                // using the instance's own schema as both writer and reader schema.
                var regenObj = (T)Activator.CreateInstance(typeof(T));
                var reader = new SpecificDefaultReader(regenObj.Schema, regenObj.Schema);
                reader.Read(regenObj, dec);
                return regenObj;
            }
        }
    }
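
    To plug them in, pass instances to the builders in place of the registry-backed serdes. Here's a minimal sketch of how I use them (inside an async method; _brokerUrl and SpecificTopic are the fields from the question, and the consumer group id is a placeholder). One caveat: this writes plain binary Avro without the Confluent wire-format header (one magic byte plus a four-byte schema ID), so these messages bypass the Schema Registry entirely and won't be readable by registry-aware consumers; that was fine for my use case.

    // Producing with the custom serializer (string keys use the built-in UTF-8 serializer).
    var userInfo = new UserInfo { Name = "Alice", Numbers = new[] { 1, 2, 3 } };

    using (var producer =
        new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
            .SetValueSerializer(new CustomAvroSerializer<UserInfo>())
            .Build())
    {
        await producer.ProduceAsync(SpecificTopic, new Message<string, UserInfo>
        {
            Key = userInfo.Name,
            Value = userInfo
        });
    }

    // Consuming with the custom deserializer.
    using (var consumer =
        new ConsumerBuilder<string, UserInfo>(new ConsumerConfig
        {
            BootstrapServers = _brokerUrl,
            GroupId = "userinfo-test",          // placeholder group id
            AutoOffsetReset = AutoOffsetReset.Earliest
        })
        .SetValueDeserializer(new CustomAvroDeserializer<UserInfo>())
        .Build())
    {
        consumer.Subscribe(SpecificTopic);
        var result = consumer.Consume();        // blocks until a message arrives
        Console.WriteLine(result.Message.Value.Name);
    }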