I am trying to access Ignite cache values from a Spark map operation
I have the exact same problem, and I have tried the approach suggested by the person who asked the same question:
val cache = ignite.getOrCreateCache[String,String]("newCache")
val cache_value = cache.get("key")
val myTransformedRdd = myRdd.map { x => println(cache_value) }.take(2)
This is my sample code. I understand that when we start Ignite (Ignition.start()), it may start only in the Spark driver, whereas the map function runs in the executors. So in some executors Ignite may not have been started.
So I also tried this:
val myTransformedRdd = myRdd.map { x =>
  if (Ignition.state.toString == "STOPPED") {
    Ignition.start("/etc/ignite/examples/config/example-ignite1.xml")
  }
  println(cache_value)
}
From this I got the same error.
It seems that ignite in your sample is taken from the outer scope, somewhere outside the mapper function. Make sure that you don't try to send this object over the network.
In your example you use cache_value taken from the driver's context. Your mapper function should look something like this:
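val myTransformedRdd = rdd.map { _ =>
  // Everything below runs on the executor, so nothing is captured from the driver.
  // The explicit type parameter keeps Scala from inferring Nothing for the generic return type.
  val igniteCfg = Ignition.loadSpringBean[IgniteConfiguration]("/etc/ignite/examples/config/example-ignite1.xml", "ignite.cfg")
  val ignite = Ignition.getOrStart(igniteCfg)
  val cache = ignite.getOrCreateCache[String, String]("newCache")
  val cacheValue = cache.get("key")
  println(cacheValue)
}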
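If doing the lookup once per record turns out to be too expensive, the same idea can probably be applied once per partition instead. The following is only a sketch of that variant using Spark's mapPartitions, not something taken from the question: the helper name lookupOnExecutors and its configPath parameter are hypothetical, and it assumes the Ignite and Spark libraries are on the executor classpath and that the configuration file exists at the same path on every executor.

```scala
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.spark.rdd.RDD

// Hypothetical helper: pairs every record with a value read from the Ignite cache.
def lookupOnExecutors(rdd: RDD[String], configPath: String): RDD[(String, String)] =
  rdd.mapPartitions { records =>
    // This closure runs on an executor. Only configPath (a plain String) is
    // captured from the driver, so no Ignite object has to be serialized.
    val cfg = Ignition.loadSpringBean[IgniteConfiguration](configPath, "ignite.cfg")
    // getOrStart starts a node only if this JVM does not already have one.
    val ignite = Ignition.getOrStart(cfg)
    val cache = ignite.getOrCreateCache[String, String]("newCache")
    val cacheValue = cache.get("key")
    // The cache is read once per partition, then reused for every record in it.
    records.map(record => (record, cacheValue))
  }
```

It could be called as lookupOnExecutors(myRdd, "/etc/ignite/examples/config/example-ignite1.xml").take(2), assuming myRdd is an RDD[String].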
Note that the example-ignite1.xml file should have a definition of an ignite.cfg bean of type IgniteConfiguration.
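For illustration, a bean definition of that kind might look roughly like the fragment below. This is a minimal sketch and not the actual contents of example-ignite1.xml; it assumes a standard Spring beans file, and any discovery or cache settings the cluster needs would still have to be added as properties.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- The id must match the bean name passed to Ignition.loadSpringBean. -->
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Cluster-specific properties (discovery, caches, ...) go here. -->
    </bean>
</beans>
```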