I have a project where different kinds of jobs are run in Spark. For this we convert the data into Spark DataFrames and apply a foreach lambda on those DataFrames so they execute in parallel. Now, for one job, I need to store some variables at the start of the job and use them inside different API and DB calls, so I thought of using a ThreadLocal to store those variables and then reading from the same ThreadLocal wherever they are required.
But soon after, I came to realize that Spark's foreach lambda runs on different threads, so the ThreadLocal won't work. I then switched to InheritableThreadLocal, but there too I wasn't able to fetch the variables. Here is some demo code showing what I was doing:
import org.apache.spark.rdd.RDD

object Utils {
  // InheritableThreadLocal is supposed to copy its value into child threads
  val threadLocal = new InheritableThreadLocal[Map[String, String]]()
}

class Job {
  def runJob(): Unit = {
    Utils.threadLocal.set(Map(key1 -> value1))
    val list = someListData.asInstanceOf[RDD[Long]]
    printThreadInfo1()
    list.foreach { key =>
      doSomething()
    }
  }

  def doSomething(): Unit = {
    printThreadInfo2()
    Utils.threadLocal.get() // returns null here
  }
}
Here the InheritableThreadLocal returns null inside doSomething, so I tried printing the thread info. The result for printThreadInfo1() is threadId=1 and threadName=main, but printThreadInfo2() inside the lambda prints different information: threadId=34, threadName=Executor task launch worker for task 7.0 in stage 0.0 (TID 7).
I believed that the threads created under the lambda would be child threads of the main thread, and hence the InheritableThreadLocal should have worked, but it looks like they are different threads of sorts.
Is there anything I am doing wrong here, or is there any way to share those variables within a job between different Spark workers?
The different threads that you see are the expected behavior. The method runJob() is executed on the driver, while doSomething() runs inside a task on one of the worker nodes. This means the code may run in a different JVM entirely, so ThreadLocals (inheritable or not) won't work; the executor threads are not children of the driver's main thread.
There is a diagram in the Spark docs (the cluster mode overview) illustrating this setup.
Instead of using ThreadLocals you can use broadcast variables. This answer shows a simple example of how to broadcast a Java map.
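As a minimal sketch of that approach in Scala (the config map, key names, and local master are illustrative, not taken from your job):

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-demo").setMaster("local[*]"))

    // Set the job-level variables once, on the driver
    val jobConfig = Map("key1" -> "value1", "key2" -> "value2")

    // broadcast() ships a read-only copy of the value to every executor JVM
    val configBroadcast = sc.broadcast(jobConfig)

    sc.parallelize(1L to 5L).foreach { key =>
      // This closure runs on an executor; read the shared map via .value
      val value1 = configBroadcast.value("key1")
      println(s"key=$key value1=$value1")
    }

    sc.stop()
  }
}

Broadcasts are read-only on the executors, so set everything the tasks will need on the driver before kicking off the job.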