Search code examples
.net-coredeserializationazure-functionsazure-eventhubazure-stream-analytics

Azure Function (v2, .NET Core): Deserialize Event Hub input coming from Stream Analytics Job


I have an Azure Stream Analytics job that outputs events as "Line separated" and encoded in UTF8.

I created an Azure Function (v2, .NET Core) that gets triggered by events on the Event Hub. In this context I process the events in batches:

public static async Task Run(
    [EventHubTrigger(
        "xxx",
        Connection = "xxx",
        ConsumerGroup = "xxx")
    ] EventData[] events, 
    ILogger log)

My goal is to deserialize the input into a POCO class based object. What I currently always get is an error of "Unexpected character...".

What is the correct way of deserialization in that context?


Solution

  • Finally the following approach worked:

    private static List<a> DeserializeInput(EventData[] events, ILogger log)
    { 
        try
        {
            List<a> inputsDeserializedAsList = new List<a>();
    
            foreach (EventData eventData in events)
            {
                // Deserialize Input
                string payload = Encoding.UTF8.GetString(eventData.Body);
                log.LogInformation($"DeserializeInput: payload retrieved: {payload}...");
    
                DateTime enqueuedTimeUtc = (DateTime)eventData.SystemProperties.EnqueuedTimeUtc;
    
                using (StringReader reader = new StringReader(payload))
                {
                    List<a> inputEventsAsList = new List<a>();
    
                    string line;
                    while ((line = reader.ReadLine()) != null)
                    {
                        a parsedData = JsonConvert.DeserializeObject<a>(line);
                        parsedData.EventEnqueuedUtcTime = enqueuedTimeUtc;
                        inputEventsAsList.Add(parsedData);
                    }
                    inputsDeserializedAsList.AddRange(inputEventsAsList);
                }
            }
    
            return inputsDeserializedAsList;
        }
        catch (Exception e) {
            throw new Exception("DeserializeInput: failed deserializing input...", e);
        }
    }