Tags: json, amazon-web-services, apache-spark, pyspark, bigdata

How do I split / chunk large JSON files with AWS glueContext before converting them to Parquet?


I'm trying to convert a 20GB JSON gzip file to parquet using AWS Glue.

I've set up a job using PySpark with the code below.

I got this log WARN message:

LOG.WARN: Loading one large unsplittable file s3://aws-glue-data.json.gz with only one partition, because the file is compressed by unsplittable compression codec.

I was wondering if there was a way to split / chunk the file? I know I can do it with pandas, but unfortunately that takes far too long (12+ hours).

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
import pyspark.sql.functions
from pyspark.sql.functions import col, concat, reverse, translate
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

test = glueContext.create_dynamic_frame_from_catalog(
             database="test_db",
             table_name="aws-glue-test_table")


# On the Glue DynamicFrame: remove the timestamp field and rename the other fields
reconfigure = test.drop_fields(['timestamp']).rename_field('name', 'FirstName').rename_field('LName', 'LastName').rename_field('type', 'record_type')

# Create pyspark DF
spark_df = reconfigure.toDF()
# Filter and only return 'a' record types 
spark_df = spark_df.where("record_type == 'a'")
# Once filtered, remove the record_type column
spark_df = spark_df.drop('record_type')
spark_df = spark_df.withColumn("LastName", translate("LastName", "LName:", ""))
spark_df = spark_df.withColumn("FirstName", reverse("FirstName"))

spark_df.write.parquet("s3a://aws-glue-bucket/parquet/test.parquet")


Solution

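  • Spark does not parallelize reading a single gzip file, because gzip is not a splittable compression codec. However, you can split the file into smaller chunks yourself.

    Spark is also really slow at reading gzip files (since the read is not parallelized). With a list of chunk files, you can speed it up by distributing the file names and decompressing each file in its own task: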

    import gzip
    # list_of_files: gzip paths that every executor can open; sc is the SparkContext
    file_names_rdd = sc.parallelize(list_of_files, 100)
    lines_rdd = file_names_rdd.flatMap(lambda path: gzip.open(path).readlines())
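
    For the splitting step itself, here is a minimal sketch using only the Python standard library. It assumes the file is newline-delimited JSON (one record per line) and that it sits on a local disk. The function name split_gzip_by_lines and the part-00000.json.gz naming are made up for illustration, not part of Glue or Spark. The file is streamed, so the 20GB is never held in memory at once. The decompression is still single-threaded, but it only has to happen once.

```python
import gzip
import itertools
import os


def split_gzip_by_lines(src_path, out_dir, lines_per_chunk):
    """Stream a gzip file of newline-delimited records into smaller gzip files.

    Returns the list of chunk paths in the order they were written.
    Hypothetical helper: the name and the chunk naming scheme are assumptions.
    """
    os.makedirs(out_dir, exist_ok=True)
    chunk_paths = []
    with gzip.open(src_path, "rb") as src:
        for index in itertools.count():
            # Pull at most lines_per_chunk lines; the rest of the file stays unread.
            lines = list(itertools.islice(src, lines_per_chunk))
            if not lines:
                break
            chunk_path = os.path.join(out_dir, f"part-{index:05d}.json.gz")
            with gzip.open(chunk_path, "wb") as dst:
                dst.writelines(lines)
            chunk_paths.append(chunk_path)
    return chunk_paths
```

    The returned list can be used as list_of_files above, provided the chunks are first copied somewhere every executor can read them. gzip.open only understands local paths, so for S3 you would either upload the chunks and let Spark read the folder, or fetch each object inside the task.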