I am trying to read a big JSON file (approx. 87 GB) from S3 in my Glue PySpark job. I have to remove duplicates from this file, break it into multiple smaller files, and then save it back to S3. When I try to do so by running the job below, I run into resource issues. Is there a way I can optimize this?
Thanks for all the help in advance.
from pyspark.sql import SparkSession

if __name__ == '__main__':
    app_name = "test"
    spark = SparkSession.builder.appName(app_name).getOrCreate()

    DATA_FILE_PATH = 's3://test//ids_20200606_173121.gz'
    output_FILE_PATH = 's3://output/data/y=2020/m=06/d=10'

    # Read the JSON, drop duplicate IPs, split into 40 files, write as Parquet
    device_graph_df = spark.read.json(DATA_FILE_PATH)
    distinct_device_graph_df = device_graph_df.dropDuplicates(['ip'])
    repartitioned_df = distinct_device_graph_df.repartition(40)
    repartitioned_df.write.parquet(output_FILE_PATH)
Error
Command failed with exit code 1 - Yarn resource manager killed the Spark application, please refer to Spark driver logs/metrics for the diagnosis. [Executor task launch worker for task 5] client.YarnClient (YarnClient.java:makeRestApiRequest(66)) - The GET request failed for the URL http://0.0.0.0:8088/ws/v1/cluster/apps/application_1591879099247_0001 com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.HttpHostConnectException: Connect to 0.0.0.0:8088 [/0.0.0.0] failed: Connection refused (Connection refused) at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:158)
You have several issues:
Your very big JSON file is compressed with gzip, which makes it non-splittable: the whole file has to be processed by a single executor, no matter how many workers your job is configured with. To solve this you can uncompress the file. If you need it compressed in order to handle it, you can try BZIP2 or LZO; they are standard in Hadoop, although I haven't used them in Glue. This is almost mandatory just to be able to read the data: processing 87 GB of compressed data on a single node would require a huge amount of memory that Glue won't provide.
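For example, a one-off step could read the gzipped file as plain text (still a single-executor read, but without the JSON parsing cost) and rewrite it with a splittable codec such as BZIP2. This is only a sketch under that assumption; the output path below is made up:

# Hypothetical one-off recompression step; the output path is a placeholder.
raw = spark.read.text('s3://test//ids_20200606_173121.gz')
# BZip2 output is block-splittable, so later reads can be spread across executors.
raw.write.option("compression", "bzip2").text('s3://test/ids_bzip2/')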
Spark needs to read the JSON file twice: once to infer the schema and a second time to process the data. With 87 GB of badly distributed data, that can be a challenge. To minimize this you have two choices:
a) If you know the schema of the JSON records, you can provide it up front (see the StructType sketch after these options):
device_graph_df = spark.read.schema(<your schema>).json(DATA_FILE_PATH)
b) Only read a fraction of the data to infer the schema (1/10th for example):
device_graph_df = spark.read.option("samplingRatio", "0.1").json(DATA_FILE_PATH)
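As a minimal sketch for option a), a schema could look like the following. Only the ip field is certain (it appears in your dropDuplicates call); the other field is a placeholder for whatever your JSON actually contains:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema: 'ip' comes from the dropDuplicates call in your job,
# 'device_id' is just a placeholder for the rest of your JSON fields.
device_graph_schema = StructType([
    StructField("ip", StringType(), True),
    StructField("device_id", StringType(), True),
])

device_graph_df = spark.read.schema(device_graph_schema).json(DATA_FILE_PATH)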