Tags: python, for-loop, spark-streaming, azure-databricks, databricks-autoloader

Databricks Autoloader writing stream


I have multiple tables (one set of CSV files per table) in Azure Data Lake and would like to use Auto Loader to load every table into a Databricks Delta table.

I have Python code where I use a for loop to create the schema per table, create the DataFrame, and then writeStream the DataFrame.

I also have a function update_insert, where I do some data manipulation and then run a merge to upsert into the Delta tables.

This is my function code:

import re

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def update_insert(df, epochId, cdm):
    # drop only 100% identical rows
    print("-------------------   " + cdm)
    df = df.dropDuplicates()

    # keep only the latest version of each Id
    w = Window.partitionBy("Id").orderBy(F.col("modifiedon").desc())
    df = (
        df.withWatermark("modifiedon", "1 day")
        .withColumn("rn", F.row_number().over(w))
        .where(F.col("rn") == 1)
        .drop("rn")
    )
    dfUpdates = df.withColumnRenamed("id", "BK_id")

    # build the merge condition from the business-key (BK_) columns
    p = re.compile('^BK_')
    list_of_columns = dfUpdates.columns
    list_of_BK_columns = [s for s in list_of_columns if p.match(s)]

    string = ''
    for column in list_of_BK_columns:
        string += f'table.{column} = newData.{column} and '
    string = string[:-5]  # drop the trailing " and "

    # map every target column to the matching source column
    dictionary = {}
    for key in list_of_columns:
        dictionary[key] = f'newData.{key}'

    print("printing " + cdm + " columns")
    print(dfUpdates.columns)

    deltaTable = DeltaTable.forPath(
        spark, f"abfss://[email protected]/D365/{cdm}" + "_autoloader_nodups"
    )
    (
        deltaTable.alias("table")
        .merge(dfUpdates.alias("newData"), string)
        .whenMatchedUpdate(set=dictionary)
        .whenNotMatchedInsert(values=dictionary)
        .execute()
    )
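
For illustration, with a hypothetical column list (not the actual table columns), the merge condition and column mapping built above would look like this:

# with these hypothetical columns...
list_of_BK_columns = ['BK_id']
list_of_columns = ['BK_id', 'name', 'modifiedon']

# ...the loops above would produce:
string = 'table.BK_id = newData.BK_id'
dictionary = {
    'BK_id': 'newData.BK_id',
    'name': 'newData.name',
    'modifiedon': 'newData.modifiedon',
}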

The function above is used in Auto Loader's foreachBatch below:

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    LongType, TimestampType, DecimalType,
)

for entity in manifest.collect()[0]['entities']:
    cdm = entity.asDict()['name']
    print(cdm)
    schema = StructType()
    length = len(entity.asDict()['attributes']) - 1
    for index1, attribute in enumerate(entity.asDict()['attributes']):
        if attribute.asDict()['dataType'] in ('int32', 'time') and index1 != length:
            field = StructField(attribute.asDict()['name'], IntegerType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] == 'dateTime' and index1 != length:
            field = StructField(attribute.asDict()['name'], TimestampType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] == 'string' and index1 != length:
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] == 'int64' and index1 != length:
            field = StructField(attribute.asDict()['name'], LongType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] == 'decimal' and index1 != length:
            field = StructField(attribute.asDict()['name'], DecimalType(38, 20), True)
            schema.add(field)
        elif index1 == length:
            # last attribute: keep it as string and remember its name and original type
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)
            LastColumnName = attribute.asDict()['name']
            LastColumnDataType = attribute.asDict()['dataType']
        else:
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)

    # Define variables
    checkpoint_directory = f"abfss://[email protected]/D365/checkpoints/{cdm}"
    data_source = f"abfss://[email protected]/*/{cdm}/*.csv"
    source_format = "csv"
    # Configure Auto Loader to ingest csv data to a Delta table
    print("schema for " + cdm)
    # print(schema)
    df = (
        spark.readStream
        .option("delimiter", ",")
        .option("quote", '"')
        .option("mode", "permissive")
        .option("lineSep", "\r\n")
        .option("multiLine", "true")
        .format("cloudFiles")
        .option("cloudFiles.format", source_format)
        # .option("cloudFiles.schemaLocation", checkpoint_directory)
        .option("cloudFiles.inferColumnTypes","true")
        .option("header", "false")
        .option("escape", '"')
        .schema(schema)
        .load(data_source)
    )
    print("writing " + cdm)
    # print(df.columns)
    (
        df.writeStream.format("delta")
        .foreachBatch(lambda df, epochId: update_insert(df, epochId, cdm))
        .option("checkpointLocation", checkpoint_directory)
        .trigger(availableNow=True)
        .start()
    )

The problem is that the for loop is not working as it is supposed to. I have added print statements to the code to see which DataFrames are created for which tables.

For example:

  1. It starts by printing print(cdm) (cdm is the name of the table); the output is msdyn_workorder.
  2. Then it prints print("schema for " + cdm); the output is schema for msdyn_workorder.
  3. The next print is print("writing " + cdm); the output is writing msdyn_workorder.

This is where it goes wrong: the next output should come from the print inside the function, print("------------------- " + cdm). Instead it prints the next table name from print(cdm), which is nrq_customerassetproperty, so the for loop starts again (I have only two tables, so the for loop should run twice).

Then it continues with the same sequence of print statements:

  1. print("schema for " + cdm) and output is schema for nrq_customerassetproperty
  2. next print is print("writing " + cdm and out put is writing nrq_customerassetproperty

Only now does it start to print the statements inside the function, such as print("-------------------   " + cdm) and print("printing " + cdm + " columns"), with nrq_customerassetproperty in the output.

The next print is where it gets interesting: print(dfUpdates.columns), which should show the df read in this iteration, instead prints the columns of the previous df, in this case the columns of msdyn_workorder.

I don't know where it goes wrong. Does streaming data have some problem with for loops?

Screenshot of the print statements: note that it prints printing nrq_customerassetproperty columns, but the columns shown actually correspond to the msdyn_workorder table.



Solution

  • Pass cdm into the foreachBatch lambda as a default argument, like below.

    lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm)
    

    Without the cdm=cdm default, the lambda only captures a reference to the loop variable cdm and looks its value up when the batch actually runs, by which time the loop has already moved on to the next table. Binding it as a default argument freezes the value cdm has at the time the lambda is created.
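
    A minimal sketch of this late-binding behaviour in plain Python (the table names are reused only for illustration):

    # each lambda closes over the same loop variable, so every one of them
    # sees whatever value it holds when it is finally called
    callbacks = [lambda: print(name)
                 for name in ["msdyn_workorder", "nrq_customerassetproperty"]]
    for cb in callbacks:
        cb()    # prints "nrq_customerassetproperty" twice

    # a default argument captures the value at lambda-creation time instead
    callbacks = [lambda name=name: print(name)
                 for name in ["msdyn_workorder", "nrq_customerassetproperty"]]
    for cb in callbacks:
        cb()    # prints "msdyn_workorder", then "nrq_customerassetproperty"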

    Below is the update function used.

    def update_insert(df, epochId, cdm):
        print(epochId)
        df.show()
        print("------------------- " + cdm)
        print("printing " + cdm + " columns")
        print(df.columns)
    

    And I ran your writeStream code.


    It seems the print statements of the next loop iteration appear first because streams started by writeStream run asynchronously: the foreachBatch function runs on the stream-execution thread for each micro-batch of streaming data, while the print statements outside foreachBatch run in the driver's for loop and are printed immediately.
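
    One way to observe this from the driver (a sketch reusing df, cdm and checkpoint_directory from the loop above; isActive and spark.streams.active are standard Structured Streaming APIs):

    q = (
        df.writeStream.format("delta")
        .foreachBatch(lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm))
        .option("checkpointLocation", checkpoint_directory)
        .trigger(availableNow=True)
        .start()
    )
    print(q.isActive)            # True: start() returned immediately, the query runs on its own thread
    print(spark.streams.active)  # all queries started so far that are still running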

    Below is the output.

    msdyn_workorder
    schema for msdyn_workorder
    writing msdyn_workorder
    2023-09-05T09:03:10.400+0000: [GC (Allocation Failure) [PSYoungGen: 1857533K->64491K(1965056K)] 2088300K->295274K(6238720K), 0.0468967 secs] [Times: user=0.09 sys=0.02, real=0.05 secs] 
    Next Df
    nrq_customerassetproperty
    schema for nrq_customerassetproperty
    writing nrq_customerassetproperty
    Next Df
    2023-09-05 09:03:16,220 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'publicFile.rolling': The bufferSize is set to 8192 but bufferedIO is not true
    2023-09-05 09:03:16,222 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'privateFile.rolling': The bufferSize is set to 8192 but bufferedIO is not true
    2023-09-05 09:03:16,223 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.UsageLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
    2023-09-05 09:03:16,224 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.ProductLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
    2023-09-05 09:03:16,225 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.LineageLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
    2023-09-05 09:03:16,226 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.MetricsLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
    2023-09-05 09:03:16,227 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'dltExecution.rolling': The bufferSize is set to 8192 but bufferedIO is not true
    2023-09-05T09:03:16.237+0000: [GC (Metadata GC Threshold) [PSYoungGen: 782632K->62922K(1994240K)] 1013415K->293722K(6267904K), 0.0367179 secs] [Times: user=0.09 sys=0.01, real=0.04 secs] 
    2023-09-05T09:03:16.274+0000: [Full GC (Metadata GC Threshold) [PSYoungGen: 62922K->0K(1994240K)] [ParOldGen: 230799K->105180K(4273664K)] 293722K->105180K(6267904K), [Metaspace: 254605K->254253K(1290240K)], 0.2592507 secs] [Times: user=0.56 sys=0.01, real=0.25 secs] 
    2023-09-05T09:03:21.380+0000: [GC (Allocation Failure) [PSYoungGen: 1843200K->28324K(1985536K)] 1948380K->133525K(6259200K), 0.0179690 secs] [Times: user=0.04 sys=0.00, real=0.02 secs] 
    0
    -------------------   nrq_customerassetproperty
    printing nrq_customerassetproperty columns
    ['Col4', 'Col5', 'Col6']
    0
    -------------------   msdyn_workorder
    printing msdyn_workorder columns
    ['Col1', 'Col2', 'Col3']
    

    To make each iteration synchronous, call awaitTermination() on the query returned by start(), so one table's stream finishes before the loop moves on to the next.

    (
        df.writeStream.format("delta")
        .foreachBatch(lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm))
        .option("checkpointLocation", checkpoint_directory)
        .trigger(availableNow=True)
        .start()
        .awaitTermination()
    )
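
    If you would rather let all the tables load in parallel instead, one possible variation (a sketch, not from the original post) is to collect the StreamingQuery handles returned by start() and wait for all of them after the loop:

    queries = []
    for entity in manifest.collect()[0]['entities']:
        cdm = entity.asDict()['name']
        # ... build schema, df and checkpoint_directory as above ...
        q = (
            df.writeStream.format("delta")
            .foreachBatch(lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm))
            .option("checkpointLocation", checkpoint_directory)
            .trigger(availableNow=True)
            .start()
        )
        queries.append(q)

    # block the driver until every availableNow run has finished
    for q in queries:
        q.awaitTermination()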
    
