I am trying to serialize my C# classes to Avro using the Microsoft Avro Library and send them to an event hub. However, when I try to read the data through Stream Analytics, the logs show this error: 'Invalid Avro Format, drop invalid record'.
More details: I am using the reflection method shown in https://azure.microsoft.com/en-in/documentation/articles/hdinsight-dotnet-avro-serialization/ to serialize to Avro format before sending to the event hub:
    // Create a new AvroSerializer instance. With the default settings it uses
    // AvroDataContractResolver, which serializes only members attributed with DataContract/DataMember.
    var avroSerializer = AvroSerializer.Create<SensorData>();
    // Create a memory stream buffer
    using (var buffer = new MemoryStream())
    {
        // Create a data set by using the sample class and struct
        var expected = new SensorData { Value = new byte[] { 1, 2, 3, 4, 5 }, Position = new Location { Room = 243, Floor = 1 } };
        // Serialize the data to the specified stream (record bytes only, no schema)
        avroSerializer.Serialize(buffer, expected);
        var bytes = buffer.ToArray();
        var data = new EventData(bytes) { PartitionKey = "deviceId" };
        // Send to the event hub
        eventHubClient.Send(data);
    }
The events are published to the event hub without problems. I have created a worker role that consumes these events and is able to deserialize them.
However, when I set this event hub as the input to my Stream Analytics job and set the event serialization format to 'avro', it gives the errors below:
Message: Invalid Avro Format, drop invalid record.
Message: IncorrectSerializationFormat errors are occuring too rapidly. They are being suppressed temporarily
I guess I have to include the Avro schema as well. Can anyone please show me the correct way to serialize a C# class to Avro so that Stream Analytics can understand it?
Thanks for your time.
You will have to include the schema. Below is an example of how you can send events along with the schema. It uses an AvroContainer, which writes the schema into the header of the stream ahead of the serialized records.
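    // Requires: using System.IO; using System.Linq;
    //           using Microsoft.Hadoop.Avro.Container; using Microsoft.ServiceBus.Messaging;
    // SensorData here is assumed to be a data contract class with Id and Value members.
    var eventHubClient = EventHubClient.CreateFromConnectionString("ReplaceConnectionString", "ReplaceEventHubPath");
    int numberOfEvents = 10;
    using (var memoryStream = new MemoryStream())
    // The container writer emits the schema header; Codec.Null means no compression
    using (var avroWriter = AvroContainer.CreateWriter<SensorData>(memoryStream, Codec.Null))
    // The second argument is the number of objects written per block
    using (var sqWriter = new SequentialWriter<SensorData>(avroWriter, numberOfEvents))
    {
        Enumerable.Range(0, numberOfEvents)
            .Select(i => new SensorData() { Id = "DeviceId", Value = i })
            .ToList()
            .ForEach(data => sqWriter.Write(data));
        memoryStream.Seek(0, SeekOrigin.Begin);
        // Send the whole container (header plus all records) as a single event
        var eventData = new EventData(memoryStream.ToArray());
        eventHubClient.Send(eventData);
    }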
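If you want a quick sanity check before sending, you can test whether the payload looks like an Avro object container at all. The Avro specification says a container starts with the four bytes 'O', 'b', 'j', 1, followed by the metadata that carries the schema. The sketch below only checks for those four bytes. The class and method names (AvroContainerCheck, HasContainerHeader) are made up for illustration and are not part of the Microsoft Avro Library. It also assumes that the container format is what Stream Analytics expects, and it does not validate the schema or the records.

```csharp
using System;

public static class AvroContainerCheck
{
    // Magic bytes at the start of an Avro object container: 'O', 'b', 'j', 1.
    private static readonly byte[] Magic = { (byte)'O', (byte)'b', (byte)'j', 1 };

    // Hypothetical helper: returns true when the payload begins with the magic bytes.
    // This is a header check only; it does not parse the schema or the data blocks.
    public static bool HasContainerHeader(byte[] payload)
    {
        if (payload == null || payload.Length < Magic.Length)
        {
            return false;
        }

        for (int i = 0; i < Magic.Length; i++)
        {
            if (payload[i] != Magic[i])
            {
                return false;
            }
        }

        return true;
    }
}
```

For example, you could call AvroContainerCheck.HasContainerHeader(memoryStream.ToArray()) right before creating the EventData. Bytes produced by AvroSerializer.Serialize alone contain only the encoded record, so they would normally fail this check.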