Let's suppose we have 2 files, file#1 created at 12:55 and file#2 created at 12:58. While reading these two files I want to add a new column "creation_time". Rows belong to file#1 have 12:55 in "creation_time" column and Rows belong to file#2 have 12:58 in "creation_time".
new_data = spark.read.option("header", "true").csv("s3://bucket7838-1/input")
I'm using above code snippet to read the files in "input" directory.
Use input_file_name()
function to get the filename and then use hdfs file api
to get the file timestamp finally join both dataframes on filename
from pyspark.sql.types import *
from pyspark.sql.functions import *
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(URI("hdfs://<namenode_address>:8020"), Configuration())
status = fs.listStatus(Path('<hdfs_directory>'))
filestatus_df=spark.createDataFrame([[str(i.getPath()),i.getModificationTime()/1000] for i in status],["filename","modified_time"]).\
#join both dataframes on filename to get filetimestamp