Search code examples
azureazure-eventhubazure-stream-analytics

In Azure Eventhub how to send incoming data to a sql database


I have some data being collected that is in an xml format. Something that looks like

<OLDI_MODULE xmlns="">
 <StStoHMI_IBE>
<PRack>0</PRack>
<PRackSlotNo>0</PRackSlotNo>
<RChNo>0</RChNo>
<RChSlotNo>0</RChSlotNo>

This data is sent to Azure Eventhub. I wanted to send this data to a SQL database. I created a stream in Azure Stream Analytics that takes this input and puts it in a SQL database. But when the input format is asked for the input stream, there are only JSON,CVS and Avro. Which of these formats can I use? Or which of the azure services should I use to move data from Eventhub to sql database?


Solution

  • By far the easiest option is to use Azure Stream Analytics as you intended to do. But yes, you will have to convert the xml to json or another supported format before you can use the data.

    The other options is more complex, requires some code and a way to host the code (using a worker role or web job for instance) but gives the most flexibility. That option is to use an EventProcessor to read the data from the Event Hub and put it in a database.

    See https://azure.microsoft.com/en-us/documentation/articles/event-hubs-csharp-ephcs-getstarted/ for how to set this up.

    The main work is done in the Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable messages) method. Based on the example it will be something like:

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            string xmlData = Encoding.UTF8.GetString(eventData.GetBytes());
    
            // Parse the xml and store the data in db using Ado.Net or whatever you're comfortable with
        }
    
        //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            this.checkpointStopWatch.Restart();
        }
    }