I am running an SQL query on an IgniteRDD. RDD transformations and actions work fine, but when I invoke SQL on the IgniteRDD, it returns no results. Here is the exact code I've written:
private val conf = new SparkConf()
.setAppName("IgniteRDDExample")
.setMaster("local")
.set("spark.executor.instances", "2")
/** Spark context */
val sparkContext = new SparkContext(conf)
/** Defines spring cache Configuration path */
private val CONFIG = "examples/config/example-shared-rdd.xml"
/** Creates Ignite context with the above configuration */
val igniteContext = new IgniteContext(sparkContext, CONFIG, false)
/** Creates an Ignite RDD of Type (Int,Int) Integer Pair */
val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")
/** Fill IgniteRDD with Int pairs */
sharedRDD.savePairs(sparkContext.parallelize(1 to 1000, 10).map(i => (i, i)))
/** Transforming Pairs to contain their Squared value */
sharedRDD.mapValues(x => (x * x))
/** Retrieve sharedRDD back from the Cache */
val transformedValues: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")
transformedValues.take(5).foreach(println)
/** Performing SQL query on existing cache and
* collect result into a Spark Dataframe
* */
val rs = transformedValues.sql("select * from Integer where number > ? and number < ? ", 10, 100)
/** Show DataFrame results */
println("The count is::::::::::: "+rs.count)
Here, the .take(5) action on transformedValues returns results and prints them. But when I run the sql method on the same RDD, it returns a row count of 0. I do not know why. Please help.
Below are my cache configuration settings:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="cacheConfiguration">
<!-- SharedRDD cache example configuration (Atomic mode). -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="sharedRDD"/>
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<!-- set atomicity -->
<property name="atomicityMode" value="ATOMIC"/>
<!-- Number of backup nodes. -->
<property name="backups" value="1"/>
<property name="queryEntities">
<list>
<bean class="org.apache.ignite.cache.QueryEntity">
<!-- Setting indexed type's key class -->
<property name="keyType" value="Integer"></property>
<!-- Setting indexed type's value class -->
<property name="valueType" value="Integer"></property>
<property name="fields">
<map>
<entry key="number" value="Integer"/>
</map>
</property>
<!--Enable Index on the field -->
<property name="indexes">
<list>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg value="number"/>
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
</property>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
There is no number field in the Integer class, so it's always null from the SQL perspective. When using primitives as keys and/or values, you can utilize the predefined _key and _val fields. So the query can look like this:
select * from Integer where _val > ? and _val < ?
Also, there is not much reason to use queryEntities in this case. setIndexedTypes is enough and much simpler.
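As a rough sketch of the setIndexedTypes suggestion, assuming the same Spring XML file as in the question, the queryEntities property could be replaced with an indexedTypes list. The list holds key class and value class pairs, and the other cache properties are copied unchanged from the question. This has not been tested against the asker's setup.

```xml
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <!-- Same cache settings as in the question. -->
    <property name="name" value="sharedRDD"/>
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="atomicityMode" value="ATOMIC"/>
    <property name="backups" value="1"/>
    <!-- Replaces queryEntities: a key class followed by its value class. -->
    <property name="indexedTypes">
        <list>
            <value>java.lang.Integer</value>
            <value>java.lang.Integer</value>
        </list>
    </property>
</bean>
```

With this configuration the SQL table is still named Integer, and the query should filter on _val (or _key) rather than on a number field.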