Tags: apache-spark, pyspark, azure-databricks, parquet, delta-lake

Read all .snappy.parquet files in Azure blob directory (Azure Databricks)


I am working in Azure Databricks with the Python API, attempting to read all .snappy.parquet files from Azure Blob Storage (a hierarchical-namespace ADLS Gen2 storage account) into a dataframe. I have tried the following:

spark.conf.set("fs.azure.account.key.*StorageAccount*.blob.core.windows.net", AccessKey)
df = spark.read.parquet("https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/")

I encounter the following error:

AnalysisException: Incompatible format detected. 
A transaction log for Delta was found at `https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/_delta_log`,
but you are trying to read from `https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/` using format("parquet"). You must use
'format("delta")' when reading and writing to a delta table.

To disable this check, SET spark.databricks.delta.formatCheck.enabled=false
To learn more about Delta, see https://docs.microsoft.com/azure/databricks/delta/index

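The error also points at a way to disable the check entirely. For reference, a minimal sketch of that route (the config name is taken verbatim from the error message; the abfss path form is the one from the solution below, since the https scheme does not support directory listing, and <container>, <StorageAccount>, <BlobFolder> are placeholders):

# Disable the Delta format check named in the error above.
spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")
# Caveat: reading a Delta table's files as plain parquet can pick up stale
# or duplicated files left behind by overwrites, so only do this deliberately.
df = spark.read.parquet("abfss://<container>@<StorageAccount>.dfs.core.windows.net/<BlobFolder>/")
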
The format("delta") hint seems straightforward, so I ran:

df = spark.read.format("delta").load("https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/")

This raises a different exception; the full stack trace is at the end of this post.

How can one read .snappy.parquet files in PySpark? As far as I understand, these files are the partitions of a Delta table, and the _delta_log directory is interfering with the load.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-2997183058335204>:1
----> 1 df = spark.read.format("delta").load("https://*StorageAccount*.blob.core.windows.net/*BlobFolder*/")

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:300, in DataFrameReader.load(self, path, format, schema, **options)
    298 self.options(**options)
    299 if isinstance(path, str):
--> 300     return self._df(self._jreader.load(path))
    301 elif path is not None:
    302     if type(path) != list:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:162, in capture_sql_exception.<locals>.deco(*a, **kw)
    160 def deco(*a: Any, **kw: Any) -> Any:
    161     try:
--> 162         return f(*a, **kw)
    163     except Py4JJavaError as e:
    164         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o1569.load.
: java.lang.UnsupportedOperationException
    at org.apache.hadoop.fs.http.AbstractHttpFileSystem.listStatus(AbstractHttpFileSystem.java:95)
    at org.apache.hadoop.fs.http.HttpsFileSystem.listStatus(HttpsFileSystem.java:23)
    at com.databricks.tahoe.store.DefaultLogStore.listFrom(DefaultLogStore.scala:194)
    at com.databricks.tahoe.store.DelegatingLogStore.$anonfun$listFromUnsafe$1(DelegatingLogStore.scala:277)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at com.databricks.tahoe.store.DelegatingLogStore.listFromUnsafe(DelegatingLogStore.scala:275)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.listFrom(SnapshotManagementEdge.scala:60)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.listFrom$(SnapshotManagementEdge.scala:57)
    at com.databricks.sql.transaction.tahoe.DeltaLog.listFrom(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.SnapshotManagement.listFromOrNone(SnapshotManagement.scala:107)
    at com.databricks.sql.transaction.tahoe.SnapshotManagement.listFromOrNone$(SnapshotManagement.scala:103)
    at com.databricks.sql.transaction.tahoe.DeltaLog.listFromOrNone(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.$anonfun$listDeltaCompactedDeltaAndCheckpointFiles$1(SnapshotManagementEdge.scala:95)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag(DeltaLogging.scala:196)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag$(DeltaLogging.scala:183)
    at com.databricks.sql.transaction.tahoe.DeltaLog.withOperationTypeTag(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$2(DeltaLogging.scala:160)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:265)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:263)
    at com.databricks.sql.transaction.tahoe.DeltaLog.recordFrameProfile(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:159)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:559)
    at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:654)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:675)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:415)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:196)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:413)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:407)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:23)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:459)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:444)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:23)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:649)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:568)
    at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:23)
    at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:559)
    at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:529)
    at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:23)
    at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:63)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:145)
    at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:72)
    at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:59)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:104)
    at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:433)
    at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:412)
    at com.databricks.sql.transaction.tahoe.DeltaLog.recordOperation(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:158)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:148)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:138)
    at com.databricks.sql.transaction.tahoe.DeltaLog.recordDeltaOperation(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.listDeltaCompactedDeltaAndCheckpointFiles(SnapshotManagementEdge.scala:92)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.listDeltaCompactedDeltaAndCheckpointFiles$(SnapshotManagementEdge.scala:89)
    at com.databricks.sql.transaction.tahoe.DeltaLog.listDeltaCompactedDeltaAndCheckpointFiles(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.getLogSegmentForVersion(SnapshotManagementEdge.scala:75)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.getLogSegmentForVersion$(SnapshotManagementEdge.scala:69)
    at com.databricks.sql.transaction.tahoe.DeltaLog.getLogSegmentForVersion(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.SnapshotManagement.getLogSegmentFrom(SnapshotManagement.scala:85)
    at com.databricks.sql.transaction.tahoe.SnapshotManagement.getLogSegmentFrom$(SnapshotManagement.scala:83)
    at com.databricks.sql.transaction.tahoe.DeltaLog.getLogSegmentFrom(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.$anonfun$getSnapshotAtInit$5(SnapshotManagementEdge.scala:346)
    at scala.Option.orElse(Option.scala:447)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.$anonfun$getSnapshotAtInit$1(SnapshotManagementEdge.scala:346)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:265)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:263)
    at com.databricks.sql.transaction.tahoe.DeltaLog.recordFrameProfile(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.getSnapshotAtInit(SnapshotManagementEdge.scala:330)
    at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.getSnapshotAtInit$(SnapshotManagementEdge.scala:329)
    at com.databricks.sql.transaction.tahoe.DeltaLog.getSnapshotAtInit(DeltaLog.scala:92)
    at com.databricks.sql.transaction.tahoe.SnapshotManagement.$init$(SnapshotManagement.scala:75)
    at com.databricks.sql.transaction.tahoe.DeltaLog.<init>(DeltaLog.scala:99)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:1026)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:372)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:1018)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag(DeltaLogging.scala:196)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag$(DeltaLogging.scala:183)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.withOperationTypeTag(DeltaLog.scala:714)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$2(DeltaLogging.scala:160)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:265)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:263)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.recordFrameProfile(DeltaLog.scala:714)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:159)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:559)
    at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:654)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:675)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:415)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:196)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:413)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:407)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:23)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:459)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:444)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:23)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:649)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:568)
    at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:23)
    at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:559)
    at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:529)
    at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:23)
    at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:63)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:145)
    at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:72)
    at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:59)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:104)
    at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:433)
    at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:412)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.recordOperation(DeltaLog.scala:714)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:158)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:148)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:138)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.recordDeltaOperation(DeltaLog.scala:714)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.createDeltaLog$1(DeltaLog.scala:1018)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$apply$7(DeltaLog.scala:1049)
    at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:124)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$apply$6(DeltaLog.scala:1049)
    at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4724)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3522)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2315)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2278)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2193)
    at com.google.common.cache.LocalCache.get(LocalCache.java:3932)
    at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4721)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:1047)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.apply(DeltaLog.scala:1063)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.forTable(DeltaLog.scala:933)
    at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.$anonfun$deltaLog$1(DeltaTableV2.scala:110)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$withAdditionalSnapshotInitializationUsageLogData$1(DeltaLog.scala:810)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1717)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.withAdditionalSnapshotInitializationUsageLogData(DeltaLog.scala:811)
    at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:110)
    at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.deltaLog(DeltaTableV2.scala:106)
    at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.$anonfun$snapshot$6(DeltaTableV2.scala:158)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.$anonfun$withAdditionalSnapshotInitializationUsageLogData$1(DeltaLog.scala:810)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1717)
    at com.databricks.sql.transaction.tahoe.DeltaLog$.withAdditionalSnapshotInitializationUsageLogData(DeltaLog.scala:811)
    at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.$anonfun$snapshot$5(DeltaTableV2.scala:157)
    at scala.Option.orElse(Option.scala:447)
    at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.snapshot$lzycompute(DeltaTableV2.scala:157)
    at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.snapshot(DeltaTableV2.scala:133)
    at com.databricks.sql.transaction.tahoe.catalog.DeltaTableV2.toBaseRelation(DeltaTableV2.scala:242)
    at com.databricks.sql.transaction.tahoe.sources.DeltaDataSource.$anonfun$createRelation$5(DeltaDataSource.scala:251)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:265)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:263)
    at com.databricks.sql.transaction.tahoe.sources.DeltaDataSource.recordFrameProfile(DeltaDataSource.scala:53)
    at com.databricks.sql.transaction.tahoe.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:209)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:383)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:378)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:334)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:334)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
    at sun.reflect.GeneratedMethodAccessor511.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)

Solution

  • Because it's a Delta Lake table, it's better to use the delta format, not the parquet format; otherwise you can get duplicate data, for example if the data was overwritten.

    You also need to use the abfss protocol, not https. Just do:

    df = spark.read.format("delta") \
      .load("abfss://<container>@<StorageAccount>.dfs.core.windows.net/folder/")
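
    For completeness, a minimal end-to-end sketch (all names are placeholders; assumes access-key authentication). Note that with abfss the account key must be registered against the dfs endpoint, not the blob endpoint used in the question:

    # Register the access key for the dfs endpoint, which backs abfss://
    spark.conf.set("fs.azure.account.key.<StorageAccount>.dfs.core.windows.net", AccessKey)

    # Read the folder as a Delta table; the .snappy.parquet files are its data files
    df = spark.read.format("delta") \
      .load("abfss://<container>@<StorageAccount>.dfs.core.windows.net/folder/")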