Search code examples
azureazure-cloud-servicesazure-eventhubazure-iot-hub

When the ProcessEventsAsync(PartitionContext context, ienumerable<EventData> messages) method will be fired


I am currently working on Internet Of Things, in my current project I was Created the One Azure Cloud Service Project in that I Created the Worker Role, inside the worker role i have wrote below lines of code.

 public class WorkerRole : RoleEntryPoint
{
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

    private static string connectionString;
    private static string eventHubName;
    public static ServiceClient iotHubServiceClient { get; private set; }
    public static EventHubClient eventHubClient { get; private set; }

    public override void Run()
    {
        Trace.TraceInformation("EventsForwarding Run()...\n");

        try
        {
            this.RunAsync(this.cancellationTokenSource.Token).Wait();
        }
        finally
        {
            this.runCompleteEvent.Set();
        }
    }

    public override bool OnStart()
    {
        // Set the maximum number of concurrent connections
        ServicePointManager.DefaultConnectionLimit = 12;

        // For information on handling configuration changes
        // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

        bool result = base.OnStart();

        Trace.TraceInformation("EventsForwarding OnStart()...\n");

        connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
        eventHubName = ConfigurationManager.AppSettings["Microsoft.ServiceBus.EventHubName"];

        string storageAccountName = ConfigurationManager.AppSettings["AzureStorage.AccountName"];
        string storageAccountKey = ConfigurationManager.AppSettings["AzureStorage.Key"];
        string storageAccountString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
            storageAccountName, storageAccountKey);

        string iotHubConnectionString = ConfigurationManager.AppSettings["AzureIoTHub.ConnectionString"];
        iotHubServiceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString);
        eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);

        var defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup();

        string eventProcessorHostName = "SensorEventProcessor";
        EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, defaultConsumerGroup.GroupName, connectionString, storageAccountString);
        eventProcessorHost.RegisterEventProcessorAsync<SensorEventProcessor>().Wait();

        Trace.TraceInformation("Receiving events...\n");

        return result;
    }

    public override void OnStop()
    {
        Trace.TraceInformation("EventsForwarding is OnStop()...");

        this.cancellationTokenSource.Cancel();
        this.runCompleteEvent.WaitOne();

        base.OnStop();

        Trace.TraceInformation("EventsForwarding has stopped");
    }

    private async Task RunAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            //Trace.TraceInformation("EventsToCommmandsService running...\n");
            await Task.Delay(1000);

        }
    }
}

Next I have wrote the below lines of code in SensorEventProcessor, for receiving the messages from event hub and send those messages to IoT hub.

class SensorEventProcessor : IEventProcessor
{
    Stopwatch checkpointStopWatch;
    PartitionContext partitionContext;

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Trace.TraceInformation(string.Format("EventProcessor Shuting Down.  Partition '{0}', Reason: '{1}'.", this.partitionContext.Lease.PartitionId, reason.ToString()));
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
        Trace.TraceInformation(string.Format("Initializing EventProcessor: Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset));
        this.partitionContext = context;
        this.checkpointStopWatch = new Stopwatch();
        this.checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        Trace.TraceInformation("\n");
        Trace.TraceInformation("........ProcessEventsAsync........");
        //string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}";
        //await WorkerRole.iotHubServiceClient.SendAsync("astranidevice", new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew)));
        foreach (EventData eventData in messages)
        {
            try
            {
                string jsonString = Encoding.UTF8.GetString(eventData.GetBytes());

                Trace.TraceInformation(string.Format("Message received at '{0}'. Partition: '{1}'",
                    eventData.EnqueuedTimeUtc.ToLocalTime(), this.partitionContext.Lease.PartitionId));

                Trace.TraceInformation(string.Format("-->Raw Data: '{0}'", jsonString));

                SimpleTemperatureAlertData newSensorEvent = this.DeserializeEventData(jsonString);

                Trace.TraceInformation(string.Format("-->Serialized Data: '{0}', '{1}', '{2}', '{3}', '{4}'",
                    newSensorEvent.Time, newSensorEvent.RoomTemp, newSensorEvent.RoomPressure, newSensorEvent.RoomAlt, newSensorEvent.DeviceId));

                // Issuing alarm to device.
                string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}";
                Trace.TraceInformation("Issuing alarm to device: '{0}', from sensor: '{1}'", newSensorEvent.DeviceId, newSensorEvent.RoomTemp);
                Trace.TraceInformation("New Command Parameter: '{0}'", commandParameterNew);
                await WorkerRole.iotHubServiceClient.SendAsync(newSensorEvent.DeviceId, new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew)));
            }
            catch (Exception ex)
            {
                Trace.TraceInformation("Error in ProssEventsAsync -- {0}\n", ex.Message);
            }
        }

        await context.CheckpointAsync();
    }
    private SimpleTemperatureAlertData DeserializeEventData(string eventDataString)
    {
        return JsonConvert.DeserializeObject<SimpleTemperatureAlertData>(eventDataString);
    }

}

When I was debug my code, the ProcessEventsAsync(PartitionContext context, IEnumerable messages) method will never call and just enter into OpenAsync() method then itstop the debugging.

Please tell me Where I did mistake in my project and tell me when the ProcessEventsAsync() method will call.

Regards,

Pradeep


Solution

  • IEventProcessor.ProcessEventsAsync is invoked when there are any unprocessed messages in the EventHub.

    An Event Hub contains multiple partitions. A partition is an ordered sequence of events. Within a partition, each event includes an offset. This offset is used by consumers (IEventProcessor) to show the location in the event sequence for a given partition. When an IEventProcessor connects (EventProcessorHost.RegisterEventProcessorAsync), it passes this offset to the Event Hub to specify the location at which to start reading. When there are unprocessed messages (events with higher offset), they are delivered to the IEventProcessor. Checkpointing is used to persist the offset of processed messages (PartitionContext.CheckpointAsync).

    You can find detailed information about the internals of EventHub: Azure Event Hubs overview

    Have you sent any messages to the EventHub (EventHubClient.SendAsync(EventData))?