Does the use of ThreadLocal for singleton variables make them thread-safe for use in the Spark DataFrame processing framework? The Breeze fourierTr function uses ThreadLocal internally, and it seems to cause problems for me.
I am building an application to assemble multi-dimensional tables and compute FFTs along various dimensions. The relevant part of the code looks roughly like this:
import breeze.linalg.DenseVector
import breeze.math.Complex
import breeze.signal.fourierTr

val r = df.rdd.flatMap { row =>
  // scrub the input, format the data into coordinates with a value,
  // and create a key corresponding to a slice through the data
  // that will get processed in the next step
}
.groupByKey.flatMap { case (sliceKey, coordinateList) =>
  // note the vector length is variable
  val buf = new Array[Complex](lengthOfVector)
  // fill the buffer with values from the data-structure slice
  fourierTr(new DenseVector(buf))
}
Note, this is just pseudocode; I've cut out much of the real code to keep the example concise.
The key point is the call to fourierTr. When I ran this on my local development machine everything was fine and I got the expected result. However, when I moved to a bigger, multi-core machine I got the following exception:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 10.0 failed 1 times, most recent failure: Lost task 2.0 in stage 10.0 (TID 36, localhost): java.lang.ArrayIndexOutOfBoundsException: 12
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.passfg(DoubleFFT_1D.java:3843)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.cfftf(DoubleFFT_1D.java:3390)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:189)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:161)
at breeze.signal.fourierTr$$anon$5.apply(fourierTr.scala:69)
at breeze.signal.fourierTr$$anon$5.apply(fourierTr.scala:62)
at breeze.generic.UFunc$class.apply(UFunc.scala:48)
at breeze.signal.fourierTr$.apply(fourierTr.scala:25)
At first I thought this might be due to a package versioning difference between my development machine and the cluster (i.e. on AWS). After making sure all the relevant jar versions matched, I still had the same problem. Then I determined that the application ran fine if I launched it with
spark-submit --master local[1] ...
However, if I launched it with
spark-submit --master local[2] ...
or with any worker-thread count of two or more, I would get the exception. This made me suspect that some memory was getting corrupted somehow, so I started digging into the library source.
The entry point is in fourierTr.scala
implicit val dvComplex1DFFT : fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] = {
  new fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] {
    def apply(v: DenseVector[Complex]) = {
      //reformat for input: note difference in format for input to real fft
      val tempArr = denseVectorCToTemp(v)
      //actual action
      val fft_instance = getD1DInstance(v.length)
      fft_instance.complexForward( tempArr ) //does operation in place
      //reformat for output
      tempToDenseVector(tempArr)
    }
  }
}
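For reference, this is the code path my call goes through. A plain single-threaded call like the following (my own illustrative snippet, not from the Breeze source) works fine, just as the full job does on my development machine or with local[1]:

import breeze.linalg.DenseVector
import breeze.math.Complex
import breeze.signal.fourierTr

// A small complex-valued vector transformed in a single thread; this call
// resolves to the dvComplex1DFFT Impl shown above.
val v = DenseVector(Complex(1, 0), Complex(0, 0), Complex(1, 0), Complex(0, 0))
val spectrum = fourierTr(v) // DenseVector[Complex] of the same length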
Pushing into getD1DInstance in JTransformsSupport.scala, I find:
object JTransformsSupport {
  //maintain instance of transform to eliminate repeated initialization
  private val fft_instD1D = new ThreadLocal[(Int, DoubleFFT_1D)]

  def getD1DInstance(length: Int): DoubleFFT_1D = {
    if (fft_instD1D.get != null && length == fft_instD1D.get._1) fft_instD1D.get._2
    else {
      fft_instD1D.set((length, new DoubleFFT_1D(length)))
      fft_instD1D.get()._2
    }
  }
Note that it is modifying a shared variable, fft_instD1D. I'm not very familiar with the ThreadLocal type, but it seems this is intended to make the class thread-safe. To test this, I changed my code to instantiate a DoubleFFT_1D object as a local variable and then called the low-level routines directly (e.g., rather than calling fourierTr, I called DoubleFFT_1D.complexForward).
After making this change the exception no longer occurred, regardless of the number of worker threads used by Spark. So it seems like the use of ThreadLocal by the Fourier transform library was the culprit.
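For completeness, the replacement looks roughly like this (my own sketch; localFourierTr is a name I made up, and in the real code the buffer is filled from my data structures):

import breeze.linalg.DenseVector
import breeze.math.Complex
import edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D

// Workaround sketch: build a fresh DoubleFFT_1D per slice instead of going
// through Breeze's ThreadLocal-cached instance.
def localFourierTr(v: DenseVector[Complex]): DenseVector[Complex] = {
  val n = v.length
  // JTransforms' complexForward works on an interleaved (re, im) array of length 2*n
  val tmp = new Array[Double](2 * n)
  var i = 0
  while (i < n) {
    tmp(2 * i) = v(i).real
    tmp(2 * i + 1) = v(i).imag
    i += 1
  }
  new DoubleFFT_1D(n).complexForward(tmp) // transforms in place
  DenseVector.tabulate(n)(k => Complex(tmp(2 * k), tmp(2 * k + 1)))
}

The second flatMap then calls localFourierTr(new DenseVector(buf)) instead of fourierTr(new DenseVector(buf)).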
I would like to know whether others who consider themselves experts in Scala/Breeze/Spark agree with my conclusion.
If it is not correct, please suggest how to use Breeze (specifically fourierTr) properly in the context of Spark DataFrame processing.
If it is the correct conclusion, then I have some follow-up questions...
Please file a bug with Breeze: github.com/scalanlp/breeze/issues
Breeze strives to be thread-safe (that's why we use ThreadLocal to have one instance of the transform per thread), but something went wrong here. (That ThreadLocal code looks totally fine to me, but then again, I wrote it.)
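To illustrate what "one instance per thread" means, here is a tiny standalone sketch (illustration only, not the actual Breeze code; the length 16 is just an arbitrary example value):

import edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D

// Each thread that calls get() on the ThreadLocal receives its own
// DoubleFFT_1D, created lazily by initialValue().
val perThreadFft = new ThreadLocal[DoubleFFT_1D] {
  override def initialValue(): DoubleFFT_1D = new DoubleFFT_1D(16)
}

def reportInstance(label: String): Thread = new Thread(new Runnable {
  def run(): Unit = println(s"$label -> instance ${System.identityHashCode(perThreadFft.get())}")
})

val t1 = reportInstance("thread 1")
val t2 = reportInstance("thread 2")
t1.start(); t2.start()
t1.join(); t2.join()

The two threads print different identity hashes, i.e. the cached transform instances themselves are never shared across threads.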