
Stream Analytics: 'Invalid Avro Format, drop invalid record.'


I am trying to serialize my C# classes to Avro using the Microsoft Avro Library and send them to an event hub. However, when I try to read the data through Stream Analytics, the logs show this error: 'Invalid Avro Format, drop invalid record'.

More details: I am using the reflection method shown in https://azure.microsoft.com/en-in/documentation/articles/hdinsight-dotnet-avro-serialization/ to serialize to Avro format and send the result to the event hub.

//Create a new AvroSerializer instance and specify a custom serialization strategy AvroDataContractResolver
        //for serializing only properties attributed with DataContract/DataMember
        var avroSerializer = AvroSerializer.Create<SensorData>();

        //Create a memory stream buffer
        using (var buffer = new MemoryStream())
        {
            //Create a data set by using sample class and struct
            var expected = new SensorData { Value = new byte[] { 1, 2, 3, 4, 5 }, Position = new Location { Room = 243, Floor = 1 } };

            //Serialize the data to the specified stream
            avroSerializer.Serialize(buffer, expected);
            var bytes = buffer.ToArray();
            var data = new EventData(bytes) {PartitionKey = "deviceId"};
            // send to event hub client
            eventHubClient.Send(data);
        }

Events are published to the event hub without problems. I created a worker role that consumes these events and is able to deserialize them.

However, when I set this event hub as the input to my Stream Analytics job and set the event serialization format to 'Avro', it gives the errors below:

Message: Invalid Avro Format, drop invalid record.

Message: IncorrectSerializationFormat errors are occuring too rapidly. They are being suppressed temporarily

I guess I have to include the Avro schema as well. Can anyone please guide me to the correct way to serialize a C# class to Avro so that Stream Analytics can understand it?

Thanks for your time.


Solution

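  • You will have to include the schema. AvroSerializer writes only the encoded record bytes, whereas an Avro container embeds the schema in its header, which Stream Analytics needs in order to decode the events. Below is an example of how you can send events along with the schema. It uses an AvroContainer.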

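            // Requires: using System.IO; using System.Linq;
            // using Microsoft.Hadoop.Avro.Container; using Microsoft.ServiceBus.Messaging;
            var eventHubClient = EventHubClient.CreateFromConnectionString("ReplaceConnectionString","ReplaceEventHubPath");
            int numberOfEvents = 10;
            using (var memoryStream = new MemoryStream())
            // The container writer emits the Avro container header, which embeds the schema
            using (var avroWriter = AvroContainer.CreateWriter<SensorData>(memoryStream, Codec.Null))
            // Second argument: number of objects written per block before a sync marker
            using (var sqWriter = new SequentialWriter<SensorData>(avroWriter, numberOfEvents))
            {
                // Note: this SensorData has an Id and a numeric Value, unlike the class in the question
                Enumerable.Range(0, numberOfEvents)
                    .Select(i => new SensorData() { Id = "DeviceId", Value = i })
                    .ToList()
                    .ForEach(data => sqWriter.Write(data));
                // Flush any buffered block so the stream holds the complete container
                sqWriter.Flush();
                // ToArray copies the whole buffer regardless of the stream position
                var eventData = new EventData(memoryStream.ToArray());
                eventHubClient.Send(eventData);
            }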
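
As a quick sanity check before sending, it may help to confirm that the payload really is an Avro container and not a bare serialized record. According to the Avro specification, an object container file starts with the four bytes 'O', 'b', 'j', 1. The sketch below tests only for that header. The class and method names (AvroPayloadCheck, LooksLikeAvroContainer) are hypothetical helpers made up for illustration and are not part of the Microsoft Avro Library. It does not validate the schema or the records that follow.

```csharp
using System;

public static class AvroPayloadCheck
{
    // Avro object container files begin with ASCII 'O', 'b', 'j' followed by the byte 0x01.
    private static readonly byte[] ContainerMagic = { (byte)'O', (byte)'b', (byte)'j', 1 };

    // Hypothetical helper: returns true when the payload starts with the container magic bytes.
    public static bool LooksLikeAvroContainer(byte[] payload)
    {
        if (payload == null || payload.Length < ContainerMagic.Length)
        {
            return false;
        }

        for (int i = 0; i < ContainerMagic.Length; i++)
        {
            if (payload[i] != ContainerMagic[i])
            {
                return false;
            }
        }

        return true;
    }
}
```

Calling AvroPayloadCheck.LooksLikeAvroContainer(memoryStream.ToArray()) on the container output above should return true, while the bytes produced by AvroSerializer.Serialize in the question would be expected to fail the check, since that output carries no container header.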