python, amazon-web-services, csv, apache-spark, aws-glue

AWS Glue, output one file with partitions


I have a Glue ETL script that takes a partitioned Athena table and outputs it to CSV. The table is partitioned on two columns, unit and site. When the Glue job runs, it creates a separate CSV file for every combination of unit and site partitions. Instead, I'd like a single output file with all the partitions included, similar to how the Athena table is structured.

I've monkeyed around with "datasource0.toDF().repartition(1)" a bit, but I'm not sure how it interfaces with the AWS-provided script. I've done this with Parquet tables, but this script is structured differently.

Note: in the script below I've removed most of the tag mappings.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "testdata-2018-2019", table_name = "testdata", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "formatted-test-2018-2019", table_name = "testdata", transformation_ctx = "datasource0")
datasource0.toDF().repartition(1)
## @type: ApplyMapping
## @args: [mapping = [("time", "string", "time", "string"), ("unit", "string", "unit", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("time", "string", "time", "string"), ("site", "string", "site", "string"), ("unit", "string", "unit", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://testbucket/ParsedCSV-Data"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://buckettest/ParsedCSV-Data"}, format = "csv", transformation_ctx = "datasink2").repartition(1)
job.commit()

I'd like to modify the above script to output only one CSV file with the partitioned columns included. How can I do this?


Solution

  • Repartition the DynamicFrame before writing it, and pass the repartitioned frame to the writer:

    repartitioned1 = applymapping1.repartition(1)
    datasink2 = glueContext.write_dynamic_frame.from_options(frame = repartitioned1, connection_type = "s3", connection_options = {"path": "s3://20182019testdata/ParsedCSV-Data"}, format = "csv", transformation_ctx = "datasink2")
    

    As for including the partition column in the output file, I don't think that's possible. As a workaround, you can copy the column into a new one with a different name and partition on the copy.

    from awsglue.dynamicframe import DynamicFrame

    # "column1" is a placeholder for the real partition column name
    df = applymapping1.toDF()
    repartitioned_with_new_column_df = df.withColumn("_column1", df["column1"]).repartition(1)
    dyf = DynamicFrame.fromDF(repartitioned_with_new_column_df, glueContext, "enriched")
    datasink2 = glueContext.write_dynamic_frame.from_options(frame = dyf, connection_type = "s3", connection_options = {"path": "s3://20182019testdata/ParsedCSV-Data", "partitionKeys": ["_column1"]}, format = "csv", transformation_ctx = "datasink2")
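
To tie the first fix back to the script in the question: the two attempts there most likely had no effect because repartition returns a new frame rather than changing the one it is called on. The result of the datasource0.toDF().repartition(1) line was never assigned, and the call chained after write_dynamic_frame.from_options only runs once the data has already been written. A minimal sketch of the working order, wrapped in a helper, might look like this. The name write_single_csv and its parameters are hypothetical, not part of the Glue API; the caller is assumed to pass in the job's GlueContext and a DynamicFrame such as applymapping1.

```python
def write_single_csv(glue_context, frame, path):
    """Write a DynamicFrame to `path` as CSV from a single partition.

    Hypothetical helper: `glue_context` is assumed to be the job's
    GlueContext, `frame` a DynamicFrame, and `path` an S3 URI.
    """
    # repartition() returns a NEW DynamicFrame; `frame` itself is unchanged,
    # so the result has to be captured and handed to the writer.
    single = frame.repartition(1)

    # The write is the last step. Anything chained after this call runs
    # too late to influence how many files are produced.
    return glue_context.write_dynamic_frame.from_options(
        frame=single,
        connection_type="s3",
        connection_options={"path": path},
        format="csv",
    )
```

In the script above this would be called as write_single_csv(glueContext, applymapping1, "s3://buckettest/ParsedCSV-Data"). Keep in mind that a single partition means a single task writes all the data, so this is only practical when the output is reasonably small.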