Search code examples
azureazure-eventhubazure-blob-storagedata-partitioning

Best Practices: to partition eventhub data & achieve high-scale, low-latency and high-throughput via azure eventhubs to external store (azure blobs)


As part of a security product I have high scale cloud service (azure worker role) that reads events from event hub, batches them to ~2000 and stores in blob storage. Each event has a MachineId (the machine that sent it). Events are coming from the event hub in random order and I store them in random order in blob storage. The throughput is up to 125K events/sec, each event is ~2K so we have up to 250MB/sec of traffic. We have ~1M machines...

Later, another cloud service downloads the blobs and runs some detection logic on the events. He groups the events by MachineId and tries to undestand something from the machine timeline

The problem is that today events from the same machine are populated to different blobs. If I could somehow group the events by their MachineId and make sure that some time window of a machine is populated to the same blob this would increase the detections I could do in the cloud.

We do write the event to another Map reduce system and there we are doing much complex detections, but those of course are having high latency. If I could group the events better in the cloud I could catch more in real time

I there any technology that might help me with that?

Thanks in advance


Solution

  • tl;dr: Introducing another EventHub - in between the original eventhub and the blob storage - which re-partitions data per MachineID - is the best way to go.

    In general, have one INJESTING EVENTHUB - which is just an entry point to your monitoring system. Use EventHubClient.send(eventData_without_partitionKey) approach to send to this INJESTING EVENTHUB. This will allow you to send with very low latency and high availability - as it will send to which ever partition that is currently taking less load and is available..

     --------------                     -----------                 ----------
    |              |    =========      |           |    ====       |          |
    |  INJESTING   |    RE-PARTITION > |  INTERIM  |    BLOB \     |   BLOB   |
    |  EVENTHUB    |    =========      |  EVENTHUB |    PUMP /     |          |
    |              |                   |           |    ====        ----------
     --------------                     -----------
    

    Most importantly, refrain from partitioning data directly on the Ingesting EventHub, for these factors:

    1. Highly available ingestion pipeline - Not associating events to a partition - will keep your ingestion pipeline highly available. Behind the scenes, we host each of your EventHubs Partition on a Container. When you provide PartitionKey on your EventData - that PartitionKey will be hash'ed to a specific partition. Now, the Send operation latency will be tied to that single Partition's availability - events like windows OS upgrade or our service upgrade etc., could impact them. Instead, if you stick to EventHubClient.send(without_PartitionKey) - we will route the EventData as soon as possible to the available partition - so, your ingestion pipeline is guaranteed to be Highly available.
    2. flexible data design - in distributed systems quite often you will soon need to re-partition data based on a different key. Be sure to - measure the probability in your system for this :).

    Use Interim EventHubs as a way to partition data. i.e., in the RE-PARTITION module - you are simply replaying the original stream to INTERIM EVENTHUB by swapping one property to EventData.PARTITION_KEY - which was originally empty.

    // pseudo-code RE-PARTITION EVENTS
    foreach(eventData receivedFromIngestingEventHubs)
    {
        var newEventData = clone(eventData);
        eventHubClient.send(newEventData, eventData.Properties.get("machineId"))
    }
    

    What this makes sure - is that - all EventData's with a Specific MachineID are available on 1 and 1 - EventHubs Partition. You do not need to create 1M EventHubs Partitions. Each partition can hold infinite number of PartitionKeys. You could use EventProcessorHost to host this per partition logic or an Azure Stream analytics Job.

    Also, this is your chance to filter and produce an optimal stream - which is consumable by the down-stream processing pipeline.

    In the BLOB PUMP module (your down-stream processing pipeline) - when you consume events from a specific INTERIM EVENTHUB's Partition - you are now guaranteed to have all Events from a specific machineid - on this partition. Aggregate the events per your required size - 2k - based on the PartitionId (machineId) - you will not have all the events continuously - you will need to build an in-memory aggregation logic for this (using EventProcessorHost or AzureStreamAnalytics Job.