apache-spark apache-zeppelin

Zeppelin: multiple SparkContexts issue


I tried to run the following simple code in Zeppelin:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")

def maxWaitTimeMillis: Int = 20000
def actuallyWait: Boolean = false

val conf = new SparkConf().setMaster("local[2]").setAppName("Streaming test")
var sc = new SparkContext(conf)

def batchDuration: Duration = Seconds(1)
val ssc = new StreamingContext(sc, batchDuration)

This is the output in Zeppelin:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
calculateRMSE: (output: org.apache.spark.streaming.dstream.DStream[(Double, Double)], n: org.apache.spark.streaming.dstream.DStream[Long])Double
res50: String = null
res51: String = null
maxWaitTimeMillis: Int
actuallyWait: Boolean
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@1daf4e42
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.zeppelin.spark.SparkInterpreter.createSparkContext(SparkInterpreter.java:356)
org.apache.zeppelin.spark.SparkInterpreter.getSparkContext(SparkInterpreter.java:150)
org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:525)
org.apache.zeppelin.interpreter.ClassloaderInterpreter.open(ClassloaderInterpreter.java:74)
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:68)
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:92)
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:345)
org.apache.zeppelin.scheduler.Job.run(Job.java:176)
org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2257)
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2239)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2239)
    at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2312)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:91)

Why does it say that I have multiple SparkContexts running? If I leave out the line var sc = new SparkContext(conf), then sc is null, so no context seems to be created anywhere in my code.


Solution

  • You can't use multiple SparkContexts in Zeppelin. This is one of its limitations, since Zeppelin creates the SparkContext itself and every notebook is effectively hooked up to that single shared context.
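
    If your code also has to run outside Zeppelin, one option (my illustration, not part of the original answer) is SparkContext.getOrCreate, available since Spark 1.4, which returns the already-running context instead of throwing:

    // Sketch: reuse the existing SparkContext when one is already running.
    // The conf is only honoured if a brand-new context has to be created;
    // inside Zeppelin this simply returns the context Zeppelin created.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("Streaming test")
    val sc = SparkContext.getOrCreate(conf)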

    If you wish to set up your SparkConf in Zeppelin, the easiest way is to set those properties in the Interpreter menu and then restart the interpreter so that they are applied to its SparkContext.
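
    For example (the values here are purely illustrative), you could set in the Interpreter menu:

    master                  local[2]
    spark.app.name          Streaming test
    spark.executor.memory   1g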

    Now you can go back to your notebook and test your code:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.{Logging, SparkConf, SparkContext}
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.dstream.DStream
    
    def maxWaitTimeMillis: Int = 20000
    def actuallyWait: Boolean = false
    
    def batchDuration: Duration = Seconds(1)
    val ssc = new StreamingContext(sc, batchDuration)  // reuses the sc that Zeppelin already provides
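
    The exception itself mentions spark.driver.allowMultipleContexts = true as an escape hatch. The flag does exist, but multiple contexts in one JVM are unsupported and error-prone, so the sketch below only shows what the message refers to; it is not a recommendation:

    // Discouraged: allow a second SparkContext in the same JVM.
    // This is unsupported by Spark and a common source of subtle failures.
    val conf2 = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Streaming test")
      .set("spark.driver.allowMultipleContexts", "true")
    val sc2 = new SparkContext(conf2)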
    

    More on that here.