As part of a security product I have a high-scale cloud service (Azure worker role) that reads events from an Event Hub, batches them into groups of ~2000 and stores them in blob storage. Each event has a MachineId (the machine that sent it). Events arrive from the Event Hub in random order and I store them in random order in blob storage. The throughput is up to 125K events/sec, each event is ~2K, so we have up to 250MB/sec of traffic. We have ~1M machines...
Later, another cloud service downloads the blobs and runs some detection logic on the events. It groups the events by MachineId and tries to understand something from the machine's timeline.
The problem is that today events from the same machine end up in different blobs. If I could somehow group the events by their MachineId and make sure that some time window of a machine lands in the same blob, this would increase the detections I could do in the cloud.
We also write the events to another map-reduce system where we do much more complex detections, but those of course have high latency. If I could group the events better in the cloud I could catch more in real time.
Is there any technology that might help me with that?
Thanks in advance
tl;dr: Introducing another EventHub between the original EventHub and the blob storage, one which re-partitions data per MachineID, is the best way to go.
In general, have one INGESTING EVENTHUB, which is just an entry point to your monitoring system. Use the EventHubClient.send(eventData_without_partitionKey) approach to send to this INGESTING EVENTHUB. This will allow you to send with very low latency and high availability, as it will send to whichever partition is currently taking less load and is available.
--------------                   ------------             ----------
|            |   ===========>    |          |   ======    |        |
| INGESTING  |   RE-PARTITION    | INTERIM  |    BLOB     |  BLOB  |
| EVENTHUB   |   ===========>    | EVENTHUB |    PUMP     |        |
|            |                   |          |   ======    ----------
--------------                   ------------
Most importantly, refrain from partitioning data directly on the Ingesting EventHub, for this reason: EventHubs partition data across containers (partitions). When you provide a PartitionKey on your EventData, that PartitionKey is hashed to a specific partition, and the Send operation's latency becomes tied to that single partition's availability; events like a Windows OS upgrade or our service upgrade etc. could impact it. Instead, if you stick to EventHubClient.send(without_PartitionKey), we will route the EventData as soon as possible to an available partition, so your ingestion pipeline is guaranteed to be highly available.
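To make this concrete, here is a minimal sketch of the ingestion-side send, assuming the com.microsoft.azure:azure-eventhubs Java client; the IngestingSender class, the payload bytes and the "machineId" property name are placeholders for whatever your senders actually use:

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;

// Ingestion side: no PartitionKey is supplied, so the service routes each event to
// whichever partition is currently available - one slow partition cannot stall senders.
public class IngestingSender {
    private final EventHubClient ingestingClient;   // client connected to the INGESTING EVENTHUB

    public IngestingSender(EventHubClient ingestingClient) {
        this.ingestingClient = ingestingClient;
    }

    public void send(byte[] payload, String machineId) throws Exception {
        EventData event = EventData.create(payload);           // the ~2K event body
        event.getProperties().put("machineId", machineId);     // carried as a property, NOT as the PartitionKey
        ingestingClient.sendSync(event);                        // no partition affinity on the ingesting hub
    }
}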
Use the Interim EventHub as the way to partition data. I.e., in the RE-PARTITION module you are simply replaying the original stream to the INTERIM EVENTHUB, swapping one property (the machineId) into EventData.PARTITION_KEY, which was originally empty.
// pseudo-code: RE-PARTITION EVENTS from the ingesting hub into the interim hub
foreach (eventData in receivedFromIngestingEventHub)
{
    var newEventData = clone(eventData);                          // copy body + properties
    var machineId = eventData.Properties.get("machineId");        // the key to re-partition by
    interimEventHubClient.send(newEventData, machineId);          // machineId becomes the PartitionKey
}
What this makes sure is that all EventData with a specific MachineID are available on one and only one EventHubs partition. You do not need to create 1M EventHubs partitions: each partition can hold a virtually unlimited number of PartitionKeys. You could use EventProcessorHost to host this per-partition logic, or an Azure Stream Analytics job.
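For illustration, here is a rough sketch of that per-partition logic hosted as an EventProcessorHost processor, assuming the com.microsoft.azure.eventprocessorhost package and an already-created client for the INTERIM EVENTHUB (interimClient); treat the exact callback signatures as version-dependent:

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;

// RE-PARTITION module: reads one partition of the INGESTING EVENTHUB and replays
// every event into the INTERIM EVENTHUB, using machineId as the PartitionKey.
public class RepartitionProcessor implements IEventProcessor {

    private final EventHubClient interimClient;   // client connected to the INTERIM EVENTHUB

    public RepartitionProcessor(EventHubClient interimClient) {
        this.interimClient = interimClient;
    }

    @Override
    public void onOpen(PartitionContext context) {
    }

    @Override
    public void onClose(PartitionContext context, CloseReason reason) {
    }

    @Override
    public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception {
        for (EventData event : events) {
            String machineId = (String) event.getProperties().get("machineId");
            EventData copy = EventData.create(event.getBytes());     // clone the body
            copy.getProperties().putAll(event.getProperties());      // clone the properties
            interimClient.sendSync(copy, machineId);                 // machineId is hashed to one interim partition
        }
        context.checkpoint();                                        // record progress on the ingesting hub
    }

    @Override
    public void onError(PartitionContext context, Throwable error) {
    }
}

Since this processor takes a constructor argument, you would register it through the host's processor-factory mechanism (e.g. an IEventProcessorFactory) rather than by class name.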
Also, this is your chance to filter and produce an optimized stream that is consumable by the downstream processing pipeline.
In the BLOB PUMP module (your downstream processing pipeline), when you consume events from a specific INTERIM EVENTHUB partition, you are now guaranteed to have all events from a specific machineId on that partition. Aggregate the events to your required batch size (~2000) per machineId; since a machine's events will not all arrive contiguously, you will need to build in-memory aggregation logic for this (using EventProcessorHost or an Azure Stream Analytics job).
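A rough sketch of that in-memory aggregation (BATCH_SIZE, writeBlob and the "machineId" property name are assumptions for illustration, not part of any SDK):

import com.microsoft.azure.eventhubs.EventData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// BLOB PUMP: buffer events per machineId and flush to a blob once a machine's
// buffer reaches the target batch size (~2000 events in this scenario).
public class BlobPump {
    private static final int BATCH_SIZE = 2000;                        // assumption: matches the ~2000-event blobs

    private final Map<String, List<EventData>> buffers = new HashMap<>();

    // Called for every event consumed from one INTERIM EVENTHUB partition (e.g. from
    // IEventProcessor.onEvents); all events of a given machineId arrive on the same partition.
    public void accept(EventData event) {
        String machineId = (String) event.getProperties().get("machineId");
        List<EventData> buffer = buffers.computeIfAbsent(machineId, id -> new ArrayList<>());
        buffer.add(event);
        if (buffer.size() >= BATCH_SIZE) {
            writeBlob(machineId, buffer);                              // hypothetical blob-storage writer
            buffers.remove(machineId);
        }
    }

    private void writeBlob(String machineId, List<EventData> events) {
        // Upload the batch to blob storage, e.g. named by machineId + time window.
    }
}

In practice you would also flush each buffer on a time window, so low-traffic machines do not sit in memory indefinitely - which also gives you the "time window per machine" grouping you asked about.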