apache-spark, pyspark, apache-spark-sql

Can I get metadata of files read by Spark?


Suppose we have two files: file#1 created at 12:55 and file#2 created at 12:58. While reading these two files I want to add a new column, "creation_time": rows belonging to file#1 should have 12:55 in the "creation_time" column, and rows belonging to file#2 should have 12:58.

new_data = spark.read.option("header", "true").csv("s3://bucket7838-1/input")

I'm using the above code snippet to read the files in the "input" directory.
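
For illustration, the expected result (with hypothetical data columns) would look like this:

+----+----+-------------+
|col1|col2|creation_time|
+----+----+-------------+
| ...| ...|        12:55|   <- rows from file#1
| ...| ...|        12:58|   <- rows from file#2
+----+----+-------------+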


Solution

  • Use the input_file_name() function to get the filename of each row, use the HDFS FileSystem API to get each file's modification timestamp, and finally join both DataFrames on the filename.

    Example:

    from pyspark.sql.functions import col, input_file_name, to_timestamp

    # Access the Hadoop FileSystem API through the JVM gateway.
    URI           = sc._gateway.jvm.java.net.URI
    Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

    fs = FileSystem.get(URI("hdfs://<namenode_address>:8020"), Configuration())

    # List every file in the directory along with its modification time
    # (getModificationTime() returns milliseconds, hence the division by 1000).
    status = fs.listStatus(Path("<hdfs_directory>"))

    filestatus_df = spark.createDataFrame(
        [(str(f.getPath()), f.getModificationTime() / 1000) for f in status],
        ["filename", "modified_time"]
    ).withColumn("modified_time", to_timestamp(col("modified_time")))

    # Tag each row with the file it came from.
    input_df = spark.read.csv("<hdfs_directory>") \
        .withColumn("filename", input_file_name())

    # Join both DataFrames on filename to attach the file timestamp to every row.
    df = input_df.join(filestatus_df, ["filename"], "left")
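
    Since the question actually reads from S3 rather than HDFS, the same approach works if you resolve the FileSystem from the path itself instead of hard-coding a namenode address (this assumes your cluster's S3 connector is already configured, which it must be for spark.read to work). The sketch below uses the bucket path from the question and renames the column to the creation_time the question asks for:

    from pyspark.sql.functions import col, input_file_name, to_timestamp

    conf = sc._jsc.hadoopConfiguration()
    path = sc._gateway.jvm.org.apache.hadoop.fs.Path("s3://bucket7838-1/input")
    fs = path.getFileSystem(conf)  # resolves the filesystem for the path's scheme

    # Build (filename, creation_time) pairs from the directory listing.
    filestatus_df = spark.createDataFrame(
        [(str(f.getPath()), f.getModificationTime() / 1000) for f in fs.listStatus(path)],
        ["filename", "creation_time"]
    ).withColumn("creation_time", to_timestamp(col("creation_time")))

    input_df = (spark.read.option("header", "true")
                .csv("s3://bucket7838-1/input")
                .withColumn("filename", input_file_name()))

    df = input_df.join(filestatus_df, ["filename"], "left")

    Note that input_file_name() can return a URL-encoded or scheme-normalized path (e.g. s3a:// instead of s3://), so compare both sides with a quick show() before trusting the join.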
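
    On newer Spark versions there is a simpler route: file-based sources expose a hidden _metadata column (added in Spark 3.3) with fields such as file_name and file_modification_time, which avoids the directory listing and the join entirely. A minimal sketch, assuming Spark >= 3.3:

    from pyspark.sql.functions import col

    # _metadata is hidden, so it must be selected explicitly.
    df = (spark.read.option("header", "true")
          .csv("s3://bucket7838-1/input")
          .select("*", col("_metadata.file_modification_time").alias("creation_time")))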