pyspark, aws-glue

Glue Job Writes Multiple Partitions to the Same File


I'm trying to write a Glue job that converts multiple CSV files to separate JSON files, using each row of a CSV as its own file. When the job finishes, the correct number of files shows up in S3, but some are empty and some contain multiple JSON objects in the same file.

After I apply the mapping, this is how I create the partitions and write the files:

numEntities = applyMapping1.toDF().count()
partitions = applyMapping1.repartition(numEntities)
partitions.toDF().write.mode("ignore").format("json").option("header", "true").save("s3://location/test")

With this, some files contain two JSON objects one after the other, some are correct, and some are empty.

Is there any way I can ensure that each partition creates a separate file with only its data?


Solution

  • I think the Partitioner behind repartition does not exactly do what you intend:

    It creates as many partitions as you requested, so far so good. But it does not place exactly one row into each partition: the HashPartitioner can compute the same hash value for more than one row, so some partitions end up with two rows while others end up with none. That matches exactly what you are seeing: files with two JSON objects alongside empty files.
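
    You can observe the effect of hash-based partitioning in plain PySpark by counting the rows that land in each partition. A minimal sketch, assuming an existing spark session (repartitioning on a column forces hash partitioning; the printed sizes are illustrative and vary):

    df = spark.createDataFrame([(i,) for i in range(10)], ["id"])
    # hash-partition 10 rows into 10 partitions, then count the rows per partition
    sizes = df.repartition(10, "id").rdd.glom().map(len).collect()
    print(sizes)  # e.g. [0, 2, 1, 0, 1, 2, 1, 1, 1, 1] -- collisions and empties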

    As an alternative to repartition(...).save(...), you could use foreachPartition and iterate over each row yourself, saving the partition to a local file (e.g. under /tmp) and uploading it to S3. I wouldn't repartition to one row per partition before doing that, because the function executed by foreachPartition is rather expensive, so you should minimize the number of invocations.

    Here is an example that worked for me. It's written in Scala, though:

    import java.io.{BufferedOutputStream, File, FileOutputStream}
    import java.util.zip.GZIPOutputStream
    import com.amazonaws.regions.Regions
    import com.amazonaws.services.s3.AmazonS3ClientBuilder
    import com.amazonaws.services.s3.model.PutObjectRequest
    import com.amazonaws.services.s3.transfer.TransferManagerBuilder

    dynamicFrame.
      repartition(1).
      toDF().
      foreachPartition(p => {
        val out = new BufferedOutputStream(new GZIPOutputStream(new FileOutputStream("/tmp/temp.xsv.gz")))
        p.foreach(r => {
          // serialize the row however your target format requires; CSV shown here
          val row = (r.mkString(",") + "\n").getBytes("UTF-8")
          out.write(row)
        })
        out.close() // finish the gzip stream before uploading
        val s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.EU_CENTRAL_1).build()
        val tm = TransferManagerBuilder.standard().withS3Client(s3).build()
        // bucket and key are placeholders for your target location
        val rq = new PutObjectRequest(bucket, key, new File("/tmp/temp.xsv.gz"))
        tm.upload(rq).waitForCompletion()
      })
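
    Since the question uses PySpark, here is a rough Python sketch of the same approach. It assumes boto3 is available on the Glue workers (it ships with Glue); dynamic_frame, bucket, and key are placeholders:

    import gzip

    import boto3

    def upload_partition(rows):
        # write every row of this partition to one local gzip file ...
        path = "/tmp/temp.csv.gz"
        with gzip.open(path, "wt") as out:
            for r in rows:
                out.write(",".join(str(v) for v in r) + "\n")
        # ... then upload that file to S3 (bucket and key are placeholders)
        s3 = boto3.client("s3", region_name="eu-central-1")
        s3.upload_file(path, bucket, key)

    dynamic_frame.repartition(1).toDF().foreachPartition(upload_partition)

    As in the Scala version, keep the partition count low: every invocation pays for creating an S3 client and performing a full upload.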