
Defining an EXTRACT range from a SELECT statement


I intend to process, in batches, a dataset captured from Event Hubs into Azure Data Lake and processed with ADLA. It seems logical to process time intervals: each run should pick up the events whose date falls between the previous execution timestamp and the current one.

I thought about saving the execution timestamps in a table so I can keep track of them, and doing the following:

DECLARE @my_file string = @"/data/raw/my-ns/my-eh/{date:yyyy}/{date:MM}/{date:dd}/{date:HH}/{date:mm}/{date:ss}/{*}.avro";

DECLARE @max_datetime DateTime = DateTime.Now;

@min_datetime =
    SELECT (DateTime) MAX(execution_datetime) AS min_datetime
    FROM my_adldb.dbo.watermark;

@my_json_bytes =
    EXTRACT Body byte[],
            date DateTime
    FROM @my_file
    USING new Microsoft.Analytics.Samples.Formats.ApacheAvro.AvroExtractor(@"{""type"":""record"",""name"":""EventData"",""namespace"":""Microsoft.ServiceBus.Messaging"",""fields"":[{""name"":""SequenceNumber"",""type"":""long""},{""name"":""Offset"",""type"":""string""},{""name"":""EnqueuedTimeUtc"",""type"":""string""},{""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},{""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"",""null""]}},{""name"":""Body"",""type"":[""null"",""bytes""]}]}");

How do I properly apply this interval to my EXTRACT? A WHERE clause with a hard-coded interval works, but I cannot use @min_datetime the same way, because it is a rowset rather than a scalar value.

I thought about filtering in a subsequent query, but I am afraid that @my_json_bytes would then extract the whole dataset and filter it afterwards, which would make the query inefficient.

Thanks in advance.


Solution

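  • You should be able to apply the filter in a later SELECT. Because @min_datetime is a one-row rowset rather than a scalar, CROSS JOIN it to the extracted rows and compare against its column. U-SQL can push predicates down into the EXTRACT under certain conditions, but I haven't been able to test whether it does so here. Try something like this: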

    // One-row rowset holding the timestamp of the previous run.
    @min_datetime =
        SELECT (DateTime) MAX(execution_datetime) AS min_datetime
        FROM my_adldb.dbo.watermark;
    
    @my_json_bytes =
        EXTRACT Body byte[],
                date DateTime
        FROM @my_file
        USING new Microsoft.Analytics.Samples.Formats.ApacheAvro.AvroExtractor(@"{""type"":""record"",""name"":""EventData"",""namespace"":""Microsoft.ServiceBus.Messaging"",""fields"":[{""name"":""SequenceNumber"",""type"":""long""},{""name"":""Offset"",""type"":""string""},{""name"":""EnqueuedTimeUtc"",""type"":""string""},{""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},{""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"",""null""]}},{""name"":""Body"",""type"":[""null"",""bytes""]}]}");
    
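    // Attach the one-row watermark to every extracted row so each date can be
    // compared against it; the upper bound reuses @max_datetime from the question.
    @working =
        SELECT j.Body,
               j.date
        FROM @my_json_bytes AS j
             CROSS JOIN
                 @min_datetime AS t
        WHERE j.date > t.min_datetime
              AND j.date <= @max_datetime;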
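If the join does not prune the input, another option (a sketch I haven't run either) is to avoid the rowset entirely and pass the lower bound in as a script parameter. U-SQL's DECLARE EXTERNAL gives a parameter a default value that a caller, for example an Azure Data Factory U-SQL activity, can override; because the value is then known at compile time, the filter on the virtual date column becomes a candidate for file-set elimination. The default date is a hypothetical placeholder, and the sketch assumes the orchestrator reads the last execution_datetime from my_adldb.dbo.watermark and supplies it, and that the table has an execution_datetime column of type DateTime.

    // ASSUMPTION: the calling pipeline reads the last execution_datetime from
    // my_adldb.dbo.watermark and overrides this hypothetical default.
    DECLARE EXTERNAL @min_datetime DateTime = new DateTime(2018, 1, 1);

    DECLARE @max_datetime DateTime = DateTime.Now;

    DECLARE @my_file string = @"/data/raw/my-ns/my-eh/{date:yyyy}/{date:MM}/{date:dd}/{date:HH}/{date:mm}/{date:ss}/{*}.avro";

    @my_json_bytes =
        EXTRACT Body byte[],
                date DateTime
        FROM @my_file
        USING new Microsoft.Analytics.Samples.Formats.ApacheAvro.AvroExtractor(@"{""type"":""record"",""name"":""EventData"",""namespace"":""Microsoft.ServiceBus.Messaging"",""fields"":[{""name"":""SequenceNumber"",""type"":""long""},{""name"":""Offset"",""type"":""string""},{""name"":""EnqueuedTimeUtc"",""type"":""string""},{""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},{""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"",""null""]}},{""name"":""Body"",""type"":[""null"",""bytes""]}]}");

    // Both bounds are compile-time constants, so the predicate on the virtual
    // date column can be considered for file-set elimination.
    @working =
        SELECT Body,
               date
        FROM @my_json_bytes
        WHERE date > @min_datetime AND date <= @max_datetime;

    // ... process @working and OUTPUT the results here ...

    // Record this run's upper bound; the next run uses it as its lower bound.
    INSERT INTO my_adldb.dbo.watermark (execution_datetime)
    VALUES (@max_datetime);

Because this script never reads the watermark table, the INSERT at the end does not conflict with a read of the same table in the same job.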