I want to stream data from the MS Defender lakehouse using Auto Loader. The data has this folder structure in Blob storage: y=2023/m=06/d=27/h=23/m=00
The problem is that the partition columns are also included in the stream, and there is a column named "m" twice: once for month and once for minute. I tried to explicitly select columns for the stream read, but this is not working. Any idea how to either omit the partition columns or drop them before I get the error?
My code:
bronze_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("badRecordsPath", bad_record_path)
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(f"{defender_adls}")
.selectExpr("Tenant", "category", "operationName", "properties", "tenantId", "time")
)
Error message:
[STREAM_FAILED] Query [id = f87a50cb-97f4-450f-a59c-b296109a21aa, runId = 68b2056e-767c-4df0-9935-c55f39d7c0e0] terminated with exception: [AMBIGUOUS_REFERENCE] Reference m is ambiguous, could be: [m, m].
I have tried the approach of filtering to drop a specific partition:
from pyspark.sql.functions import col

df = spark.readStream.format('cloudFiles') \
    .option('cloudFiles.format', 'CSV') \
    .option('cloudFiles.schemaLocation', schema_loc) \
    .option('header', True) \
    .load(source_data_loc)

# Filter out the partition y=2023/m=06/d=27/h=23/m=00.
# Note: the second col('m') is meant to be the minute partition, but Spark cannot
# tell it apart from the month column, so this reference can raise the same
# AMBIGUOUS_REFERENCE error when both "m" partition columns are inferred.
df_filtered = df.filter((col('y') != '2023') &
                        (col('m') != '06') &
                        (col('d') != '27') &
                        (col('h') != '23') &
                        (col('m') != '00'))

# show() cannot be called on a streaming DataFrame; use display() in Databricks
df_filtered.display()
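Because col('m') cannot distinguish the month and minute partitions, another option worth checking is to stop Auto Loader from inferring the partition columns in the first place. This is only a sketch based on the Auto Loader cloudFiles.partitionColumns option (not something used in the snippets above), applied to the readStream from the question; defender_adls and checkpoint_path are the question's own variables:

# Sketch, assuming that an empty cloudFiles.partitionColumns tells Auto Loader
# not to infer any partition columns from the y=/m=/d=/h=/m= directories.
bronze_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.partitionColumns", "")
    .load(f"{defender_adls}")
    .selectExpr("Tenant", "category", "operationName", "properties", "tenantId", "time")
)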
I have used a CSV file as an example; the ADLS file path is the source_data_loc shown below. You can try the below approach:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
source_data_loc = "abfss://folder02@dileepstoragegen2.dfs.core.windows.net/actual_data_csv/y=2023/m=06/d=27/h=23/m=00"
target_data_loc = "abfss://folder02@dileepstoragegen2.dfs.core.windows.net/Autoloader/output.csv"
checkpoint_data_loc = "abfss://folder02@dileepstoragegen2.dfs.core.windows.net/checkpoints"
schema_loc = "abfss://folder02@dileepstoragegen2.dfs.core.windows.net/schema"
spark = SparkSession.builder.appName("AzureStorageExample").getOrCreate()
# Explicit schema for the data columns only; the partition columns are not included
customSchema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.readStream.format('cloudFiles') \
    .option('cloudFiles.format', "CSV") \
    .option('cloudFiles.schemaLocation', schema_loc) \
    .option('header', True) \
    .schema(customSchema) \
    .load(source_data_loc)

# Drop the partition columns inferred from the directory names; drop() removes
# every column with a matching name (so both "m" columns go) and silently
# ignores any name that is not present.
df = df.drop('y', 'm', 'd', 'h')

df.display()
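The snippet above defines target_data_loc and checkpoint_data_loc but never writes to them. A minimal write sketch, assuming you want the stream persisted as CSV to that target path (the file sink treats the path as a directory), could look like this:

# Sketch only: stream the result out as CSV using the locations defined above
query = (df.writeStream
    .format("csv")
    .option("header", True)
    .option("path", target_data_loc)
    .option("checkpointLocation", checkpoint_data_loc)
    .trigger(availableNow=True)  # process the files currently available, then stop
    .start())
query.awaitTermination()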