Good day
I am writing a Glue job on AWS to transform data. After doing a join on two sets of data (resulting in a dataframe of around 100MB in size), I get a Nullpointer exception when retrieving the count on the dataframe. What makes this bug difficult to trace is that it only happens sporadically - occasionally it succeeds.
The error is:
21/05/07 08:27:08 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
File "/tmp/transform.py", line 398, in <module>
main()
File "/tmp/transform.py", line 355, in main
extract_data(context, df1_trans, df2_trans)
File "/tmp/transform.py", line 264, in extract_data
joined_count = joined.count()
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 522, in count
return int(self._jdf.count())
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o689.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 13.0 failed 4 times, most recent failure: Lost task 7.3 in stage 13.0 (TID 8795, 172.35.98.112, executor 5): java.lang.NullPointerException
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:207)
at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:642)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:148)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:131)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:418)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:352)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The code in question:
print("Applying join.")
joined = (df1.join(df2, df1['ID'] == df2['ID'], how='inner')
.select(df1["*"], df2["*"])
.dropDuplicates(['ID', 'VAL']))
extract_data joined_count = joined.count() # Nullpointer exception here...
print(f"Joined data: {joined_count}.")
write_out(context, joined, 'joined', "s3://<some_bucket>", "csv")
# Retrieve data from joined data.
tmp = (joined
.withColumn('IDENT', joined['ID'])
.withColumn('V1', joined['SOME_VALUE'])
.withColumn('V2', joined['TIME'])
.withColumn('V3', sf.lit('BLAH'))
.withColumn('V4', sf.lit('3.14'))
.select(['IDENT', 'V1', 'V2', 'V3', 'V4']))
tmp_count = tmp.count()
and the write out code:
def write_out(context, out, name, destination, destination_format):
"""
Writes out the data as a single file.
:param context:
:param out:
:param name:
:param destination:
:param destination_format:
:return:
"""
print(f"Writing {name} to {destination}.")
glue_df = DynamicFrame.fromDF(out.repartition(1), context)
context.write_dynamic_frame.from_options(
frame=glue_df,
connection_type="s3",
connection_options={"path": destination},
format=destination_format)
...any assistance or just ideas on where to look would be helpful.
In case someone also runs into this. It can happen the moment the data gets "collected". So on writing out a partition of data, getting counts, etc.
The solution seems to be related to a Glue Dynamic Dataframe when loading the data. Change to a Spark Dataframe and you're waxed.