I am trying to use Spark/Scala in order to "edit" multiple parquet files (potentially 50k+) efficiently. The only edit that needs to be done is deletion (i.e. deleting records/rows) based on a given set of row IDs.
The parquet files are stored in s3 as a partitioned DataFrame where an example partition looks like this:
s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet
Each partition can have upwards of 100 parquet files, each between 50 MB and 500 MB in size.
We are given a spark `Dataset[MyClass]` called `filesToModify` which has 2 columns:

- `s3path: String` = the complete s3 path to a parquet file in s3 that needs to be edited
- `ids: Set[String]` = a set of IDs (rows) that need to be deleted in the parquet file located at `s3path`

Example input dataset `filesToModify`:
| s3path | ids |
|---|---|
| s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet | Set("a", "b") |
| s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet | Set("b") |
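
For reference, a minimal sketch of how such a dataset could be built (the `MyClass` definition here is an assumption; only its two fields are described above):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Assumed shape of MyClass: it simply carries the two columns described above.
case class MyClass(s3path: String, ids: Set[String])

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val filesToModify: Dataset[MyClass] = Seq(
  MyClass("s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet", Set("a", "b")),
  MyClass("s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet", Set("b"))
).toDS()
```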
Given `filesToModify` I want to take advantage of parallelism in Spark and do the following for each row:

1. Read the parquet file located at `row.s3path`
2. Filter it so that we exclude any record whose `id` is in the set `row.ids`
3. Count the deleted rows for each `row.ids` (optional)
4. Save the filtered data back to `row.s3path` to overwrite the file

I have tried using `filesToModify.map(row => deleteIDs(row.s3path, row.ids))` where `deleteIDs` looks like this:
```scala
def deleteIDs(s3path: String, ids: Set[String]): Int = {
  import spark.implicits._
  import org.apache.spark.sql.functions.{col, not}

  // read the parquet file that needs to be edited
  val data = spark
    .read
    .parquet(s3path)
    .as[DataModel]

  // keep only the records whose id is NOT in the given set
  val clean = data
    .filter(not(col("id").isInCollection(ids)))

  // write to a temp directory and then upload to s3 with same
  // prefix as original file to overwrite it
  writeToSingleFile(clean, s3path)

  1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
}
```
However this leads to a `NullPointerException` when executed within the `map` operation. If I execute it alone outside of the `map` block then it works, but I can't understand why it doesn't inside it (something to do with lazy evaluation?).
You get a `NullPointerException` because you try to retrieve your Spark session from an executor.

It is not explicit, but to perform a Spark action your `deleteIDs` function needs to retrieve the active Spark session. To do so, it calls the `getActiveSession` method of the `SparkSession` object. But when called from an executor, `getActiveSession` returns `None`, as stated in SparkSession's source code:

> Returns the default SparkSession that is returned by the builder.
>
> Note: Return None, when calling this function on executors

And thus a `NullPointerException` is thrown when your code starts using this `None` Spark session.
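
As a quick illustration (a standalone sketch, not taken from your code), you can compare `SparkSession.getActiveSession` on the driver with the same call inside a `map`:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// On the driver the active session is defined...
println(SparkSession.getActiveSession.isDefined) // true

// ...but inside a Dataset transformation the lambda runs on executors,
// where getActiveSession returns None. (In local mode, where driver and
// executors share a JVM, you may not be able to reproduce this.)
Seq(1, 2, 3).toDS()
  .map(_ => SparkSession.getActiveSession.isDefined)
  .show()
```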
More generally, you can't recreate a dataset and use spark transformations/actions in transformations of another dataset.
So I see two solutions for your problem:

1. Rewrite the `deleteIDs` function's code without using Spark, and modify your parquet files by using parquet4s for instance.
2. Collect `filesToModify` to a Scala collection on the driver and use Scala's `map` instead of Spark's one (a minimal sketch follows).
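
A minimal sketch of the second option (names are taken from your question; `deleteIDs` stays as you wrote it, only the outer loop changes):

```scala
// Collect the (comparatively small) work list to the driver, then loop
// over it with Scala's map. Each deleteIDs call now runs on the driver
// and can safely use the driver's SparkSession; Spark still distributes
// the read/filter/write inside deleteIDs across the cluster.
val results: Seq[Int] = filesToModify
  .collect()   // Array[MyClass] on the driver
  .toSeq
  .map(row => deleteIDs(row.s3path, row.ids))
```

If the individual Spark jobs are small, you could also submit several of them concurrently from the driver (for example with Futures or a parallel collection), but that is an optimisation on top of the same idea.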