Tags: apache-spark, amazon-s3, pyspark

I am trying to write a DataFrame to a single file in S3 with a desired file name in PySpark. I am able to do this in Scala, but PySpark gives the error below.


My PySpark code creates a DataFrame and writes it to an S3 location. After that I have one file named part-***, which I try to rename using the Hadoop file utility libraries, but I always get the error below. Is this operation possible in PySpark? Note: I cannot use boto3 here since I will be running this on EMR.

The code I am using in PySpark is below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test_rename").getOrCreate()
sc = spark.sparkContext

l = [['Column1', 'Column2', 'Column3'], ['Col1Value1', 'Col2Value1', 'Col3Value1'], ['Col1Value2', 'Col2Value2', 'Col3Value2']]
# Create a DataFrame from the list; the first row is the header
df = spark.createDataFrame(l[1:],l[0])

df.coalesce(1) \
    .write.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("s3://<bucket>/<prefix>")

from py4j.java_gateway import java_import
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
fs.rename(sc._jvm.Path('s3://<bucket>/<prefix>/' + file), sc._jvm.Path('mydata.csv'))
fs.delete(sc._jvm.Path('s3://<bucket>/<prefix>'), True)

ERROR MESSAGE:

File "/mnt/tmp/spark-471166fb-d7c7-4839-a308-2e3f5c01c185/test_rename.py", line 20, in <module>
    file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.IllegalArgumentException: Wrong FS: s3://<bucket>/<prefix>, expected: hdfs://<emr-ip>:8020

Solution

  • Hadoop's FileSystem needs a URI to determine the client implementation. You can provide it when initialising the FileSystem object, either via the uri constructor parameter or via the fs.defaultFS configuration parameter.

    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jvm.java.net.URI("s3://bucket/"), spark._jsc.hadoopConfiguration())
    fs.listStatus(sc._jvm.org.apache.hadoop.fs.Path("s3://bucket/prefix/"))
    
    # Or
    spark._jsc.hadoopConfiguration().set("fs.defaultFS", "s3://bucket/")
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    

    It's the same for both Python and Scala; through Py4J you are accessing the same Java objects either way. If the same code worked in Scala, you probably had a different fs.defaultFS value in the Hadoop configuration.
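
    For instance, you can check what your job currently resolves as the default filesystem; on EMR this is typically the cluster's HDFS, which matches the "expected: hdfs://..." part of the error:

    # Print the current default filesystem from the Hadoop configuration
    print(spark._jsc.hadoopConfiguration().get("fs.defaultFS"))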

    Please see FileSystem's source code and the description of fs.defaultFS:

    The name of the default file system. A URI whose scheme and authority determine the FileSystem implementation. The uri's scheme determines the config property (fs.SCHEME.impl) naming the FileSystem implementation class. The uri's authority is used to determine the host, port, etc. for a filesystem.
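
Putting it together with the original snippet, here is a minimal sketch of the rename flow with the FileSystem initialised against the S3 URI. The <bucket>, <prefix>, and mydata.csv names are the question's placeholders; renaming to a full s3:// path under the same bucket is an assumption on my part, since the original relative Path('mydata.csv') would not resolve to the location you expect:

    from py4j.java_gateway import java_import

    java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
    Path = spark._jvm.org.apache.hadoop.fs.Path

    # Build an S3-backed FileSystem instead of the cluster's default HDFS one
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark._jvm.java.net.URI("s3://<bucket>/"),
        spark._jsc.hadoopConfiguration())

    # Locate the single part-* file written by coalesce(1) and rename it
    src = fs.globStatus(Path("s3://<bucket>/<prefix>/part*"))[0].getPath()
    fs.rename(src, Path("s3://<bucket>/mydata.csv"))

    # Recursively remove the now-unneeded output directory
    fs.delete(Path("s3://<bucket>/<prefix>"), True)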