I have multiple tables (CSV files per table) loaded in Azure Data Lake and would like to use Auto Loader to load every table into a Databricks Delta table.
I have Python code where I use a for loop to create the schema per table, create the df, and then writeStream the df.
I also have a function update_insert, where I do some data manipulation and also call merge to upsert the Delta tables.
This is my function code:
import re
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def update_insert(df, epochId, cdm):
    # clean only 100% identical rows
    print("------------------- " + cdm)
    df = df.dropDuplicates()
    # keep only the latest record per Id based on modifiedon
    w = Window.partitionBy("Id").orderBy(F.col("modifiedon").desc())
    df = df.withWatermark("modifiedon", "1 day").withColumn("rn", F.row_number().over(w)).where(F.col("rn") == 1).drop('rn')
    # final = df.join(agg, on=["id", "modifiedon"], how="right")
    dfUpdates = df.withColumnRenamed("id", "BK_id")

    # collect the business-key (BK_) columns and build the merge condition
    p = re.compile('^BK_')
    list_of_columns = dfUpdates.columns
    list_of_BK_columns = [s for s in dfUpdates.columns if p.match(s)]

    string = ''
    for column in list_of_BK_columns:
        string += f'table.{column} = newData.{column} and '

    string_insert = ''
    for column in list_of_BK_columns:
        string_insert += f'table.{column} = newData.{column} and '
    string_insert[:-4]

    # map every column to the incoming value for the update/insert clauses
    dictionary = {}
    for key in list_of_columns:
        dictionary[key] = f'newData.{key}'

    print("printing " + cdm + " columns")
    print(dfUpdates.columns)

    deltaTable = DeltaTable.forPath(spark, f"abfss://[email protected]/D365/{cdm}" + "_autoloader_nodups")
    deltaTable.alias('table') \
        .merge(dfUpdates.alias("newData"), string) \
        .whenMatchedUpdate(set=dictionary) \
        .whenNotMatchedInsert(values=dictionary) \
        .execute()
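For context, here is a small standalone sketch (with hypothetical column names, not from my pipeline) of what the string-building loops above produce; it also shows that slicing only trims the trailing " and " if the result is reassigned:
list_of_columns = ["BK_id", "name", "modifiedon"]
list_of_BK_columns = [c for c in list_of_columns if c.startswith("BK_")]

condition = ""
for column in list_of_BK_columns:
    condition += f"table.{column} = newData.{column} and "
print(condition)        # "table.BK_id = newData.BK_id and "  -- note the trailing " and "
print(condition[:-4])   # "table.BK_id = newData.BK_id "      -- trimmed, but only if reassigned

mapping = {key: f"newData.{key}" for key in list_of_columns}
print(mapping)          # {'BK_id': 'newData.BK_id', 'name': 'newData.name', 'modifiedon': 'newData.modifiedon'}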
The function above is used below in the Auto Loader's foreachBatch:
from pyspark.sql.types import (StructType, StructField, IntegerType, TimestampType,
                               StringType, LongType, DecimalType)

for entity in manifest.collect()[0]['entities']:
    cdm = entity.asDict()['name']
    print(cdm)

    # Build the schema for this table from the manifest attributes
    schema = StructType()
    length = len(entity.asDict()['attributes']) - 1
    for index1, attribute in enumerate(entity.asDict()['attributes']):
        if (attribute.asDict()['dataType'] in ('int32', 'time')) and (index1 != length):
            field = StructField(attribute.asDict()['name'], IntegerType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] in ('dateTime') and (index1 != length):
            field = StructField(attribute.asDict()['name'], TimestampType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] in ('string') and (index1 != length):
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] in ('int64') and (index1 != length):
            field = StructField(attribute.asDict()['name'], LongType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] in ('decimal') and (index1 != length):
            field = StructField(attribute.asDict()['name'], DecimalType(38, 20), True)
            schema.add(field)
        elif index1 == length:
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)
            LastColumnName = attribute.asDict()['name']
            LastColumnDataType = attribute.asDict()['dataType']
        else:
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)

    # Define variables
    checkpoint_directory = f"abfss://[email protected]/D365/checkpoints/{cdm}"
    data_source = f"abfss://[email protected]/*/{cdm}/*.csv"
    source_format = "csv"

    # Configure Auto Loader to ingest csv data to a Delta table
    print("schema for " + cdm)
    # print(schema)
    df = (
        spark.readStream
        .option("delimiter", ",")
        .option("quote", '"')
        .option("mode", "permissive")
        .option("lineSep", "\r\n")
        .option("multiLine", "true")
        .format("cloudFiles")
        .option("cloudFiles.format", source_format)
        # .option("cloudFiles.schemaLocation", checkpoint_directory)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("header", "false")
        .option("escape", '"')
        .schema(schema)
        .load(data_source)
    )

    print("writing " + cdm)
    # print(df.columns)
    df.writeStream.format("delta").foreachBatch(lambda df, epochId: update_insert(df, epochId, cdm)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()
The problem is that the for loop is not working the way it is supposed to. I added the print statements to the code to see which df is created for which table.
For example: print(cdm) (cdm is the name of the table) outputs msdyn_workorder, print("schema for " + cdm) outputs schema for msdyn_workorder, and print("writing " + cdm) outputs writing msdyn_workorder.
This is where it goes wrong: the next print should be the one inside the function, print("------------------- " + cdm). Instead it prints the next table name, print(cdm), which is nrq_customerassetproperty, so the for loop starts again (I have only two tables, so the for loop should run twice).
Then it continues with the same sequence of print statements: print("schema for " + cdm) outputs schema for nrq_customerassetproperty and print("writing " + cdm) outputs writing nrq_customerassetproperty. Only now does it start to print the statements inside the def, like print("------------------- " + cdm), and print("printing " + cdm + " columns") outputs printing nrq_customerassetproperty columns.
The next print is where it gets interesting: when I print(dfUpdates.columns), which should be the df I read in the foreachBatch loop, it prints the columns of the previous df, in this case the columns of msdyn_workorder.
I don't know where it goes wrong. Does streaming data have some problem with for loops?
Screenshot of print statements. Note that it prints printing nrq_customerassetproperty columns, but the columns correspond to the msdyn_workorder table.
Pass cdm into the foreachBatch function like below:
lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm)
When you pass it without binding cdm in the lambda, the lambda looks cdm up in the outer scope at call time, so every batch sees whatever value cdm holds by then (the last table of the loop). Binding it as a default argument (cdm=cdm) captures the value cdm has at the time the lambda is created.
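This is ordinary Python closure behaviour, not something specific to Spark or Auto Loader. A minimal standalone sketch (hypothetical table names) of the difference:
# Late binding: both lambdas read `name` when they are CALLED, so both see the last value.
callbacks = []
for name in ["msdyn_workorder", "nrq_customerassetproperty"]:
    callbacks.append(lambda: print(name))
for cb in callbacks:
    cb()   # prints "nrq_customerassetproperty" twice

# Default argument: the current value of `name` is captured when each lambda is DEFINED.
callbacks = []
for name in ["msdyn_workorder", "nrq_customerassetproperty"]:
    callbacks.append(lambda name=name: print(name))
for cb in callbacks:
    cb()   # prints "msdyn_workorder", then "nrq_customerassetproperty"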
Below is the update function I used:
def update_insert(df, epochId, cdm):
    print(epochId)
    df.show()
    print("------------------- " + cdm)
    print("printing " + cdm + " columns")
    print(df.columns)
And I ran your writeStream code.
It seems the print statements from the next loop iteration appear first because the streaming query runs asynchronously: start() returns immediately, and foreachBatch is invoked later, once for each batch of streaming data. The print statements outside foreachBatch run in the driver program as the loop advances, so they are printed right away. Below is the output:
msdyn_workorder
schema for msdyn_workorder
writing msdyn_workorder
Next Df
nrq_customerassetproperty
schema for nrq_customerassetproperty
writing nrq_customerassetproperty
Next Df
0
------------------- nrq_customerassetproperty
printing nrq_customerassetproperty columns
['Col4', 'Col5', 'Col6']
0
------------------- msdyn_workorder
printing msdyn_workorder columns
['Col1', 'Col2', 'Col3']
To make it synchronous, you need to use awaitTermination:
df.writeStream.format("delta").foreachBatch(lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start().awaitTermination()
Output:
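As a side note (my own suggestion, not part of the answer above): calling awaitTermination() inside the loop blocks until each table finishes before the next stream starts. If you would rather let the tables load in parallel, one option is to start all the streams first, collect the query handles, and wait on them after the loop. A rough sketch, reusing the names from the code above:
queries = []
for entity in manifest.collect()[0]['entities']:
    cdm = entity.asDict()['name']
    # ... build schema, checkpoint_directory, data_source and df exactly as above ...
    query = (
        df.writeStream.format("delta")
        .foreachBatch(lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm))
        .option("checkpointLocation", checkpoint_directory)
        .trigger(availableNow=True)
        .start()
    )
    queries.append(query)

# Block only after every stream has been started
for query in queries:
    query.awaitTermination()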