I'm trying to get the input file name (or path) for every file loaded through an S3 data catalog in AWS Glue.
I've read in a few places that input_file_name() should provide this information (with the caveat that it only works when calling from_catalog and not from_options, which I believe I am!).
So the code below seems like it should work, but it always returns empty values for every input_file_name.
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args['JOB_NAME'], args)
# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
    database='database_name',
    table_name='table_name',
    transformation_ctx='fm_source',
)
# Convert to a Spark DataFrame and attach the source path of each row
df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)
Resulting output:
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
Is there another way that I should be creating the frame that ensures input_file_name() is populated? I've now tried to build a source frame through create_dynamic_frame.from_catalog, create_dynamic_frame.from_options and getSource().getFrame(), but I get the same result of an empty input_file_name column for each.
I believe this to be impossible when using the groupFiles option, since behind the scenes Glue concatenates files to create the optimal number of inputs. The concept of input_file_name therefore does not make sense in this context, as the original file path is no longer a direct input.
However, the docs are slightly misleading here: even for inputs of fewer than 50,000 files, leaving the option at its default can still cause Glue to group inputs depending on their file size. In our case, thousands of tiny input files (<1 MB) triggered this behaviour.
You can easily verify this by explicitly disabling the grouping (note that this will have a severe performance impact for scenarios similar to ours):
ds_s3 = gc.getSource(
    connection_type='s3',
    paths=paths,  # the same S3 location the catalog table points at
    groupFiles='none',
)
fm_s3 = ds_s3.getFrame()
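With grouping disabled, the same withColumn call from the question returns populated paths. A minimal check, assuming paths above covers the same data as the catalog table:
df_s3 = fm_s3.toDF().withColumn('input_file_name', input_file_name())
df_s3.select('input_file_name').show(5, truncate=False)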
Of course, it is better not to depend on the input state or context at all, so we ended up writing an AWS Lambda, triggered on S3 PUT, which writes metadata (including the file name and path) into the file itself.
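For reference, here is a minimal sketch of such a Lambda, assuming the inputs are small JSON Lines objects; the source_file field name and the handler are illustrative rather than our exact implementation:
import json
import urllib.parse

import boto3

s3 = boto3.client('s3')

def handler(event, context):
    # Invoked by the S3 PUT event; rewrites each new object with its own path added
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])

        obj = s3.get_object(Bucket=bucket, Key=key)
        lines = obj['Body'].read().decode('utf-8').splitlines()
        rows = [json.loads(line) for line in lines if line.strip()]

        # Guard against re-triggering ourselves: skip objects that already carry the metadata
        if rows and 'source_file' in rows[0]:
            continue

        for row in rows:
            row['source_file'] = f's3://{bucket}/{key}'

        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body='\n'.join(json.dumps(row) for row in rows).encode('utf-8'),
        )
Downstream, the original path is then just another column in the table, independent of how Glue groups the files.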