scala, apache-spark, ignite

Ignite instance name thread local must be set or this method should be accessed under org.apache.ignite.thread.IgniteThread


I am trying to access Ignite cache values from a Spark map operation.

Ignite grid name thread local must be set or this method should be accessed under org.apache.ignite.thread.IgniteThread

I have exactly the same problem as in the question above, and I tried the approach suggested by the person who asked it:

val cache = ignite.getOrCreateCache[String,String]("newCache")
val cache_value = cache.get("key")
val myTransformedRdd = myRdd.map { x => println(cache_value) }.take(2)

This is my sample code. I understand that when we start Ignite (Ignition.start()), it may start only on the Spark driver, while the Spark tasks run on executors. So on some executors Ignite may not have been started.

So I also tried this:

val myTransformedRdd = myRdd.map { x =>
  if (Ignition.state.toString == "STOPPED") {
    Ignition.start("/etc/ignite/examples/config/example-ignite1.xml")
  }
  println(cache_value)
}

With this I got the same error.


Solution

  • It seems that ignite in your sample is taken from an outer scope, somewhere outside the mapper function. Make sure that you don't try to send this object over the network.

    In your example you use cache_value taken from the driver's context. Your mapper function should look something like this:

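    import org.apache.ignite.Ignition
    import org.apache.ignite.configuration.IgniteConfiguration

    val myTransformedRdd = rdd.map { _ =>
      // Everything below runs on the executor, so no Ignite object is shipped from the driver.
      // The explicit type parameter keeps Scala from inferring Nothing for the Java generic method.
      val igniteCfg = Ignition.loadSpringBean[IgniteConfiguration]("/etc/ignite/examples/config/example-ignite1.xml", "ignite.cfg")
      // Reuses the node already running in this JVM, or starts one if there is none.
      val ignite = Ignition.getOrStart(igniteCfg)
      val cache = ignite.getOrCreateCache[String,String]("newCache")
      val cacheValue = cache.get("key")
      println(cacheValue)
    }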
    

    Note that the example-ignite1.xml file should contain a definition of an ignite.cfg bean of type IgniteConfiguration.
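
The mapper above loads the Spring bean and looks up the Ignite node for every record. A possible refinement, sketched here under some assumptions, is to keep the node in a lazily initialized singleton so the work happens at most once per executor JVM, and to fetch the cache once per partition. The names IgniteHolder, CacheLookup and lookupPerPartition are hypothetical helpers, not part of Ignite or Spark. The sketch also assumes a compiled application rather than spark-shell, and an RDD[String] of cache keys.

```scala
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.spark.rdd.RDD

// Hypothetical helper: a Scala object is initialized separately in each JVM,
// so it is never serialized with the closure and each executor gets its own node.
object IgniteHolder {
  // Path and bean name are the ones used in the answer above.
  private val ConfigPath = "/etc/ignite/examples/config/example-ignite1.xml"
  private val BeanName = "ignite.cfg"

  // Evaluated on first access inside a task, i.e. on the executor.
  lazy val ignite: Ignite = {
    val cfg = Ignition.loadSpringBean[IgniteConfiguration](ConfigPath, BeanName)
    Ignition.getOrStart(cfg)
  }
}

// Hypothetical usage: look up every key of the RDD in "newCache".
object CacheLookup {
  def lookupPerPartition(keys: RDD[String]): RDD[(String, Option[String])] =
    keys.mapPartitions { partition =>
      // Obtained on the executor, once per partition rather than once per record.
      val cache = IgniteHolder.ignite.getOrCreateCache[String, String]("newCache")
      // cache.get returns null for a missing key, so wrap the result in Option.
      partition.map(key => (key, Option(cache.get(key))))
    }
}
```

Calling CacheLookup.lookupPerPartition(myRdd).take(2) on the driver would then trigger the lookups on the executors. Whether each executor should join the cluster as a server or as a client node depends on the configuration in example-ignite1.xml and is not addressed by this sketch.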