I intend to process a dataset from EventHub stored in ADLA, in batches. It seems logical to me to process it in intervals, taking the rows whose date falls between my last execution datetime and the current execution datetime.
I thought about saving the execution timestamps in a table so I can keep track of them, and then doing the following:
DECLARE @my_file string = @"/data/raw/my-ns/my-eh/{date:yyyy}/{date:MM}/{date:dd}/{date:HH}/{date:mm}/{date:ss}/{*}.avro";
DECLARE @max_datetime DateTime = DateTime.Now;
@min_datetime =
    SELECT (DateTime) MAX(execution_datetime) AS min_datetime
    FROM my_adldb.dbo.watermark;
@my_json_bytes =
    EXTRACT Body byte[],
            date DateTime
    FROM @my_file
    USING new Microsoft.Analytics.Samples.Formats.ApacheAvro.AvroExtractor(@"{""type"":""record"",""name"":""EventData"",""namespace"":""Microsoft.ServiceBus.Messaging"",""fields"":[{""name"":""SequenceNumber"",""type"":""long""},{""name"":""Offset"",""type"":""string""},{""name"":""EnqueuedTimeUtc"",""type"":""string""},{""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},{""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"",""null""]}},{""name"":""Body"",""type"":[""null"",""bytes""]}]}");
How do I properly add this interval to my EXTRACT query? I tested it using an ordinary WHERE clause with the interval defined by hand and it worked, but when I attempt to use @min_datetime it doesn't work, since its result is a rowset rather than a scalar value.
I thought about applying the filter in a subsequent query, but I am afraid this means @my_json_bytes will extract my whole dataset and only filter it afterwards, resulting in a suboptimal query.
Thanks in advance.
You should be able to apply the filter as part of a later SELECT. U-SQL can push predicates up under certain conditions, but I haven't been able to test this case yet. Try something like this:
@min_datetime =
    SELECT (DateTime) MAX(execution_datetime) AS min_datetime
    FROM my_adldb.dbo.watermark;
@my_json_bytes =
    EXTRACT Body byte[],
            date DateTime
    FROM @my_file
    USING new Microsoft.Analytics.Samples.Formats.ApacheAvro.AvroExtractor(@"{""type"":""record"",""name"":""EventData"",""namespace"":""Microsoft.ServiceBus.Messaging"",""fields"":[{""name"":""SequenceNumber"",""type"":""long""},{""name"":""Offset"",""type"":""string""},{""name"":""EnqueuedTimeUtc"",""type"":""string""},{""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},{""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"",""null""]}},{""name"":""Body"",""type"":[""null"",""bytes""]}]}");
@working =
    SELECT *
    FROM @my_json_bytes AS j
         CROSS JOIN
             @min_datetime AS t
    WHERE j.date > t.min_datetime;
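If you also want to cap the interval at @max_datetime and record the new watermark once the batch is done, a rough, untested sketch might look like the following. It assumes @max_datetime is declared as in your question and that my_adldb.dbo.watermark has a single DateTime column called execution_datetime (I'm guessing at the table layout from your query). The columns are listed explicitly here so the joined min_datetime column isn't carried along.

```
// Sketch only, not tested: keep rows inside (last watermark, current run].
// Assumes @my_json_bytes and @min_datetime are defined as above and
// @max_datetime is declared as in the question.
@working =
    SELECT j.Body,
           j.date
    FROM @my_json_bytes AS j
         CROSS JOIN
             @min_datetime AS t
    WHERE j.date > t.min_datetime
          AND j.date <= @max_datetime;

// Store this run's upper bound as the next watermark.
// Assumption: the table has exactly one column, execution_datetime.
INSERT INTO my_adldb.dbo.watermark (execution_datetime)
VALUES (@max_datetime);
```

Using a strict lower bound and an inclusive upper bound means consecutive runs shouldn't pick up the same row twice. Whether the optimizer turns either condition into file elimination on your file set is something I'd check in the job graph rather than assume.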