I am working in Azure Databricks with the Python API, attempting to read all .snappy.parquet files into a dataframe from Azure blob storage (hierarchical ADLS gen 2 storage account). I have tried the following:
spark.conf.set("fs.azure.account.key.*StorageAccount*.blob.core.windows.net", AccessKey)
df = spark.read.parquet("https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/")
I encounter the following error:
AnalysisException: Incompatible format detected.
A transaction log for Delta was found at `https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/_delta_log`,
but you are trying to read from `https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/` using format("parquet"). You must use
'format("delta")' when reading and writing to a delta table.
To disable this check, SET spark.databricks.delta.formatCheck.enabled=false
To learn more about Delta, see https://docs.microsoft.com/azure/databricks/delta/index
The hint seems straigtforward, so I ran:
df = spark.read.format("delta").load("https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/")
This gives an exception, see end of post:
How can one read .snappy.parquet files in Pyspark? As far as I understand its partition and as such the _delta_log is interferring with the load.
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
File <command-2997183058335204>:1
----> 1 df = spark.read.format("delta").load("https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/")
File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
46 start = time.perf_counter()
47 try:
---> 48 res = func(*args, **kwargs)
49 logger.log_success(
50 module_name, class_name, function_name, time.perf_counter() - start, signature
51 )
52 return res
File /databricks/spark/python/pyspark/sql/readwriter.py:300, in DataFrameReader.load(self, path, format, schema, **options)
298 self.options(**options)
299 if isinstance(path, str):
--> 300 return self._df(self._jreader.load(path))
301 elif path is not None:
302 if type(path) != list:
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:162, in capture_sql_exception.<locals>.deco(*a, **kw)
160 def deco(*a: Any, **kw: Any) -> Any:
161 try:
--> 162 return f(*a, **kw)
163 except Py4JJavaError as e:
164 converted = convert_exception(e.java_exception)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o1569.load.
: java.lang.UnsupportedOperationException
at org.apache.hadoop.fs.http.AbstractHttpFileSystem.listStatus(AbstractHttpFileSystem.java:95)
at org.apache.hadoop.fs.http.HttpsFileSystem.listStatus(HttpsFileSystem.java:23)
at com.databricks.tahoe.store.DefaultLogStore.listFrom(DefaultLogStore.scala:194)
at com.databricks.tahoe.store.DelegatingLogStore.$anonfun$listFromUnsafe$1(DelegatingLogStore.scala:277)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at com.databricks.tahoe.store.DelegatingLogStore.listFromUnsafe(DelegatingLogStore.scala:275)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.listFrom(SnapshotManagementEdge.scala:60)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.listFrom$(SnapshotManagementEdge.scala:57)
at com.databricks.sql.transaction.tahoe.DeltaLog.listFrom(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.SnapshotManagement.listFromOrNone(SnapshotManagement.scala:107)
at com.databricks.sql.transaction.tahoe.SnapshotManagement.listFromOrNone$(SnapshotManagement.scala:103)
at com.databricks.sql.transaction.tahoe.DeltaLog.listFromOrNone(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.$anonfun$listDeltaCompactedDeltaAndCheckpointFiles$1(SnapshotManagementEdge.scala:95)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag(DeltaLogging.scala:196)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag$(DeltaLogging.scala:183)
at com.databricks.sql.transaction.tahoe.DeltaLog.withOperationTypeTag(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$2(DeltaLogging.scala:160)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:265)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:263)
at com.databricks.sql.transaction.tahoe.DeltaLog.recordFrameProfile(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:159)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:559)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:654)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:675)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:415)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:196)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:413)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:407)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:23)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:459)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:444)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:23)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:649)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:568)
at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:23)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:559)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:529)
at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:23)
at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:63)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:145)
at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:72)
at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:59)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:104)
at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:433)
at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:412)
at com.databricks.sql.transaction.tahoe.DeltaLog.recordOperation(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:158)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:148)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:138)
at com.databricks.sql.transaction.tahoe.DeltaLog.recordDeltaOperation(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.listDeltaCompactedDeltaAndCheckpointFiles(SnapshotManagementEdge.scala:92)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.listDeltaCompactedDeltaAndCheckpointFiles$(SnapshotManagementEdge.scala:89)
at com.databricks.sql.transaction.tahoe.DeltaLog.listDeltaCompactedDeltaAndCheckpointFiles(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.getLogSegmentForVersion(SnapshotManagementEdge.scala:75)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.getLogSegmentForVersion$(SnapshotManagementEdge.scala:69)
at com.databricks.sql.transaction.tahoe.DeltaLog.getLogSegmentForVersion(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.SnapshotManagement.getLogSegmentFrom(SnapshotManagement.scala:85)
at com.databricks.sql.transaction.tahoe.SnapshotManagement.getLogSegmentFrom$(SnapshotManagement.scala:83)
at com.databricks.sql.transaction.tahoe.DeltaLog.getLogSegmentFrom(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.$anonfun$getSnapshotAtInit$5(SnapshotManagementEdge.scala:346)
at scala.Option.orElse(Option.scala:447)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.$anonfun$getSnapshotAtInit$1(SnapshotManagementEdge.scala:346)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:265)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:263)
at com.databricks.sql.transaction.tahoe.DeltaLog.recordFrameProfile(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.getSnapshotAtInit(SnapshotManagementEdge.scala:330)
at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.getSnapshotAtInit$(SnapshotManagementEdge.scala:329)
at com.databricks.sql.transaction.tahoe.DeltaLog.getSnapshotAtInit(DeltaLog.scala:92)
at com.databricks.sql.transaction.tahoe.SnapshotManagement.$init$(SnapshotManagement.scala:75)
at com.databricks.sql.transaction.tahoe.DeltaLog.<init>(DeltaLog.scala:99)
at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:1026)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:372)
at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:1018)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag(DeltaLogging.scala:196)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag$(DeltaLogging.scala:183)
at com.databricks.sql.transaction.tahoe.DeltaLog$.withOperationTypeTag(DeltaLog.scala:714)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$2(DeltaLogging.scala:160)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:265)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:263)
at com.databricks.sql.transaction.tahoe.DeltaLog$.recordFrameProfile(DeltaLog.scala:714)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:159)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:559)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:654)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:675)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:415)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:196)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:413)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:407)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:23)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:459)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:444)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:23)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:649)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:568)
at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:23)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:559)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:529)
at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:23)
at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:63)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:145)
at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:72)
at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:59)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:104)
at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:433)
at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:412)
at com.databricks.sql.transaction.tahoe.DeltaLog$.recordOperation(DeltaLog.scala:714)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:158)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:148)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:138)
at com.databricks.sql.transaction.tahoe.DeltaLog$.recordDeltaOperation(DeltaLog.scala:714)
at com.databricks.sql.transaction.tahoe.DeltaLog$.createDeltaLog$1(DeltaLog.scala:1018)
at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$apply$7(DeltaLog.scala:1049)
at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:124)
at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$apply$6(DeltaLog.scala:1049)
at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4724)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3522)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2315)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2278)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2193)
at com.google.common.cache.LocalCache.get(LocalCache.java:3932)
at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4721)
at com.databricks.sql.transaction.tahoe.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:1047)
at com.databricks.sql.transaction.tahoe.DeltaLog$.apply(DeltaLog.scala:1063)
at com.databricks.sql.transaction.tahoe.DeltaLog$.forTable(DeltaLog.scala:933)
at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.$anonfun$deltaLog$1(DeltaTableV2.scala:110)
at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$withAdditionalSnapshotInitializationUsageLogData$1(DeltaLog.scala:810)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1717)
at com.databricks.sql.transaction.tahoe.DeltaLog$.withAdditionalSnapshotInitializationUsageLogData(DeltaLog.scala:811)
at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:110)
at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.deltaLog(DeltaTableV2.scala:106)
at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.$anonfun$snapshot$6(DeltaTableV2.scala:158)
at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$withAdditionalSnapshotInitializationUsageLogData$1(DeltaLog.scala:810)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1717)
at com.databricks.sql.transaction.tahoe.DeltaLog$.withAdditionalSnapshotInitializationUsageLogData(DeltaLog.scala:811)
at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.$anonfun$snapshot$5(DeltaTableV2.scala:157)
at scala.Option.orElse(Option.scala:447)
at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.snapshot$lzycompute(DeltaTableV2.scala:157)
at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.snapshot(DeltaTableV2.scala:133)
at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.toBaseRelation(DeltaTableV2.scala:242)
at com.databricks.sql.transaction.tahoe.sources.DeltaDataSource.$anonfun$createRelation$5(DeltaDataSource.scala:251)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:265)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:263)
at com.databricks.sql.transaction.tahoe.sources.DeltaDataSource.recordFrameProfile(DeltaDataSource.scala:53)
at com.databricks.sql.transaction.tahoe.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:209)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:383)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:378)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:334)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:334)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
at sun.reflect.GeneratedMethodAccessor511.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
at java.lang.Thread.run(Thread.java:750)
Because it's a Delta Lake table, it's better to use delta
format, not the parquet
format, otherwise you can get data duplicates if you overwritten the data, for example.
You also need to use abfss
protocol, not https
. Just do:
df = spark.read.format("delta") \
.load("abfss://<container>@<StorageAccount>.dfs.core.windows.net/folder/")