python · azure · apache-spark · pyspark

Spark DataFrame not persisting to the ADLS Gen2 container


I have a VM provisioned on Azure with Spark installed on it. I am trying to persist a DataFrame from Spark on that VM to an ADLS Gen2 container using the account access key, but the write to the container fails.

pyspark --jars azure-storage-7.0.0.jar,hadoop-azure-3.1.2.jar,hadoop-common-3.1.2.jar,jetty-util-12.0.5.jar,jetty-util-ajax-11.0.12.jar --conf spark.hadoop.fs.AbstractFileSystem.wasb.Impl=org.apache.hadoop.fs.azure.Wasb --conf spark.hadoop.fs.azure.account.key.<storageAccount>.blob.core.windows.net=<accessKey>


data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]`
df = spark.createDataFrame(data)`
df.write.csv("wasbs://<container>@<storageAccount>.blob.core.windows.net/TestFolder/", mode = "append")

I keep getting this error:

24/01/30 20:43:13 ERROR Utils: Aborting task
java.lang.IllegalStateException: Error closing the output.
at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:1000)
at org.apache.spark.sql.catalyst.csv.UnivocityGenerator.close(UnivocityGenerator.scala:124)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CsvOutputWriter.scala:48)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseCurrentWriter(FileFormatDataWriter.scala:64)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:75)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:105)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:404)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: This request is not authorized to perform this operation.
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2748)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2614)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:1178)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:1047)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at java.base/sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:341)
at java.base/sun.nio.cs.StreamEncoder.close(StreamEncoder.java:161)
at java.base/java.io.OutputStreamWriter.close(OutputStreamWriter.java:255)
at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:996)
... 25 more
Caused by: com.microsoft.azure.storage.StorageException: This request is not authorized to perform this operation.
at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:735)
at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:691)
at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:434)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2687)

However, I do see temp directories created under the container: "<container>/_$azuretmpfolder$" contains CSV files, and there is also a directory named "<container>/TestFolder/_temporary/0/_temporary", but it contains no files.

Although the error says it is an authorization issue, I wrote a small Python snippet on the same machine and was able to create a file under the same container with the same access key I am using in my PySpark code.
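For reference, the standalone check was roughly along these lines (a minimal sketch only; it assumes the azure-storage-blob SDK and uses the same placeholder names as above):

# Sketch of the standalone check (assumes the azure-storage-blob SDK is installed).
# It uploads a small blob using the same account key that the Spark job uses.
from azure.storage.blob import BlobServiceClient

account_url = "https://<storageAccount>.blob.core.windows.net"
service = BlobServiceClient(account_url=account_url, credential="<accessKey>")

blob = service.get_blob_client(container="<container>", blob="TestFolder/sanity_check.txt")
blob.upload_blob(b"hello from the VM", overwrite=True)
print("wrote", blob.blob_name)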

Could someone please help me find the root cause and a fix for it?

Thanks, Jetinder


Solution

  • I have tried the below approach, writing through the abfss:// scheme against the dfs.core.windows.net endpoint (the ABFS driver intended for ADLS Gen2 accounts):

    spark.conf.set("fs.azure.account.key.<Storage Account>.dfs.core.windows.net","<Storage account Access Key>")
    csv_file_path = f"abfss://[email protected]/"
    data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
    df = spark.createDataFrame(data)
    df.write.csv("abfss://[email protected]/TestFolder/", mode = "append")
    df.display()
    

    Results:

    _1  _2
    Java    20000
    Python  100000
    Scala   3000
    

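    If the job has to run from the plain pyspark shell on the VM rather than a notebook, the same account key can be supplied at launch time instead of via spark.conf.set (a sketch, assuming the hadoop-azure ABFS driver and its dependencies are already on the classpath; placeholders as above):

    pyspark --conf spark.hadoop.fs.azure.account.key.<storageAccount>.dfs.core.windows.net=<accessKey>

    # Inside the shell the write is the same as above:
    df.write.csv("abfss://<container>@<storageAccount>.dfs.core.windows.net/TestFolder/", mode="append")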