
Azure Data Explorer: how to update a table from a raw JSON event using Kusto and update policies?


I ingest raw telemetry data as JSON records into a single-column table called RawEvents, in the column called Event. This is what a record/event looks like:

{
  "clientId": "myclient1",
  "IotHubDeviceId": "myiothubdevice1",
  "deviceId": "mydevice1",
  "timestamp": "2022-04-12T10:29:00.123",
  "telemetry": [
    {
        "telemetryId": "total.power",
        "value": 123.456
    },
    {
        "telemetryId": "temperature",
        "value": 34.56
    },
    ...
  ]
}

The RawEvents table is created and set up like this:

.create table RawEvents (Event: dynamic)
.create table RawEvents ingestion json mapping 'MyRawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'

There is also a Telemetry table that will be used for queries and analysis. Its strongly typed columns match the structure of the raw records in the RawEvents table. It is created like this:

.create table Telemetry (ClientId:string, IotHubDeviceId:string, DeviceId:string, Timestamp:datetime, TelemetryId:string, Value: real)

To have the Telemetry table updated whenever a new raw event is ingested into RawEvents, I tried to define a data transformation function and use it in an update policy attached to the Telemetry table.

To that end, I have used the following script to verify that my data transformation logic works as expected:

datatable (event:dynamic)
[
    dynamic(
        {
            "clientId": "myclient1",
            "IotHubDeviceId": "myiothubdevice1",
            "deviceId": "mydevice1", 
            "timestamp": "2022-04-12T10:29:00.123",
            "telemetry": [
                {
                    "telemetryId": "total.power",
                    "value": 123.456
                },
                {
                    "telemetryId": "temperature",
                    "value": 34.56
                }
            ]
        }
    )
]
| evaluate bag_unpack(event)
| mv-expand telemetry
| evaluate bag_unpack(telemetry)

Executing that script gives me the desired output which matches the Telemetry table structure:

clientId   deviceId   IotHubDeviceId   timestamp                 telemetryId  value
myclient1  mydevice1  myiothubdevice1  2022-04-12T10:29:00.123Z  total.power  123.456
myclient1  mydevice1  myiothubdevice1  2022-04-12T10:29:00.123Z  temperature  34.56

Next, I have created a function called ExpandTelemetryEvent which contains that same data transformation logic applied to RawEvents.Event:

.create function ExpandTelemetryEvent() {
    RawEvents
        | evaluate bag_unpack(Event)
        | mv-expand telemetry
        | evaluate bag_unpack(telemetry)
}

And as a final step, I have tried to create an update policy for the Telemetry table which would use RawEvents as a source and ExpandTelemetryEvent() as the transformation function:

.alter table Telemetry policy update @'[{"Source": "RawEvents", "Query": "ExpandTelemetryEvent()", "IsEnabled": "True"}]'

This is where I got the following error message:

Error during execution of a policy operation: Caught exception while validating query for Update Policy: 'IsEnabled = 'True', Source = 'RawEvents', Query = 'ExpandTelemetryEvent()', IsTransactional = 'False', PropagateIngestionProperties = 'False''. Exception: Request is invalid and cannot be processed: Semantic error: SEM0100: 'mvexpand' operator: Failed to resolve scalar expression named 'telemetry'

I sort of understand why the policy cannot be applied. In the sample script, the transformation worked because the inline data gave the engine enough information to infer what telemetry is. In this case, nothing in RawEvents.Event describes the structure of the raw events that will be stored in the Event column.

How can this be solved? Is this the right approach at all?


Solution

  • As the bag_unpack plugin documentation indicates:

    The plugin's output schema depends on the data values, making it as "unpredictable" as the data itself. Multiple executions of the plugin, using different data inputs, may produce different output schema.

    Use a well-defined transformation instead. Note that the table is RawEvents and the column is Event (identifiers are case-sensitive), so the auto-generated name for Event.telemetry is Event_telemetry:

    RawEvents
    | project clientId = Event.clientId, deviceId = Event.deviceId, IotHubDeviceId = Event.IotHubDeviceId, timestamp = Event.timestamp, Event.telemetry
    | mv-expand Event_telemetry
    | extend telemetryId = Event_telemetry.telemetryId, value = Event_telemetry.value
    | project-away Event_telemetry
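
    The query above yields dynamic-typed columns, while an update policy generally expects the query output to line up with the target table's column names, order, and types. A possible variant is sketched below, assuming the RawEvents and Telemetry tables and the ExpandTelemetryEvent function name are exactly as defined in the question. It casts each property explicitly and wires the function into the policy. Treat it as a starting point rather than a verified setup.

    ```kql
    // Run each management command separately.

    // Transformation with a fixed, explicitly typed output schema.
    // Column names, order, and types follow the Telemetry table from the question.
    .create-or-alter function ExpandTelemetryEvent() {
        RawEvents
        // One output row per element of the telemetry array
        | mv-expand telemetry = Event.telemetry
        | project
            ClientId       = tostring(Event.clientId),
            IotHubDeviceId = tostring(Event.IotHubDeviceId),
            DeviceId       = tostring(Event.deviceId),
            Timestamp      = todatetime(Event.timestamp),
            TelemetryId    = tostring(telemetry.telemetryId),
            Value          = toreal(telemetry.value)
    }

    // Attach the function to the Telemetry table as its update policy.
    .alter table Telemetry policy update
    @'[{"IsEnabled": true, "Source": "RawEvents", "Query": "ExpandTelemetryEvent()", "IsTransactional": false, "PropagateIngestionProperties": false}]'
    ```

    Because every output column is named and cast in the query text, the schema no longer depends on the ingested data, which is what the policy validation needs.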