My PySpark code creates a DataFrame and writes it to an S3 location. After the write I end up with a single file named part-***, which I am trying to rename using the Hadoop FileSystem/FileUtil libraries, but I always get the error below. Is this operation possible in PySpark? Note: I cannot use boto3 here since I will be running this on EMR.
The code I am using in PySpark is below:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test_rename").getOrCreate()
sc = spark.sparkContext
l = [['Column1', 'Column2', 'Column3'], ['Col1Value1', 'Col2Value1', 'Col3Value1'], ['Col1Value2', 'Col2Value2', 'Col3Value2']]
# Create a DataFrame from the list (first element is the header row)
df = spark.createDataFrame(l[1:],l[0])
df.coalesce(1) \
.write.format("com.databricks.spark.csv") \
.option("header", "true") \
.mode("overwrite") \
.save("s3://<bucket>/<prefix>")
from py4j.java_gateway import java_import
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
fs.rename(sc._jvm.Path('s3://<bucket>/<prefix>/' + file), sc._jvm.Path('mydata.csv'))
fs.delete(sc._jvm.Path('s3://<bucket>/<prefix>'), True)
ERROR MESSAGE:
File "/mnt/tmp/spark-471166fb-d7c7-4839-a308-2e3f5c01c185/test_rename.py", line 20, in <module>
file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.IllegalArgumentException: Wrong FS: s3://<bucket>/<prefix>, expected: hdfs://<emr-ip>:8020
Hadoop's FileSystem needs a URI to determine the client implementation. You can provide it when initialising the FileSystem object, either via the uri constructor parameter or via the fs.defaultFS configuration parameter.
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jvm.java.net.URI("s3://bucket/"), spark._jsc.hadoopConfiguration())
fs.listStatus(sc._jvm.org.apache.hadoop.fs.Path("s3://bucket/prefix/"))
# Or
spark._jsc.hadoopConfiguration().set("fs.defaultFS", "s3://bucket/")
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
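Applied to the rename from the question, a minimal sketch could look like the following. The bucket and prefix are placeholders copied from the question, and the target name mydata.csv is just an example; this is a sketch of the approach, not a definitive implementation:
from py4j.java_gateway import java_import

java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

# Get a FileSystem bound to S3 by passing the S3 URI explicitly
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
    spark._jvm.java.net.URI("s3://<bucket>/"),
    spark._jsc.hadoopConfiguration())

# Locate the single part file, move it to a stable name, then drop the old prefix
part_file = fs.globStatus(spark._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
fs.rename(spark._jvm.Path('s3://<bucket>/<prefix>/' + part_file),
          spark._jvm.Path('s3://<bucket>/mydata.csv'))
fs.delete(spark._jvm.Path('s3://<bucket>/<prefix>'), True)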
It's the same for both Python and Scala; through Py4J you are really just accessing the same Java objects. If the same code worked in Scala, you probably had a different fs.defaultFS value in the Hadoop configuration.
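If in doubt, you can print the value your cluster is actually using; on EMR it is typically the HDFS NameNode address shown in the error above:
# Show the configured default file system (e.g. hdfs://<emr-ip>:8020 on EMR)
print(spark._jsc.hadoopConfiguration().get("fs.defaultFS"))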
Please see FileSystem's source code and the description of fs.defaultFS:
The name of the default file system. A URI whose scheme and authority determine the FileSystem implementation. The uri's scheme determines the config property (fs.SCHEME.impl) naming the FileSystem implementation class. The uri's authority is used to determine the host, port, etc. for a filesystem.
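For example, for the s3 scheme the implementation class comes from fs.s3.impl. You can inspect what your cluster resolves it to; on EMR this is usually the EMRFS implementation, though the exact class name depends on the distribution and version:
# The "s3" scheme maps to the fs.s3.impl property, which names the FileSystem class
print(spark._jsc.hadoopConfiguration().get("fs.s3.impl"))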