
IgniteRDD doesn't return rows in the dataframe


I am running an SQL query on an IgniteRDD. RDD transformations and actions work fine, but when I invoke an SQL query on the IgniteRDD, it returns no rows. Here is the exact code I've written:

import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.spark.{SparkConf, SparkContext}

object IgniteRDDExample {

  private val conf = new SparkConf()
    .setAppName("IgniteRDDExample")
    .setMaster("local")
    .set("spark.executor.instances", "2")

  /** Path to the Spring cache configuration */
  private val CONFIG = "examples/config/example-shared-rdd.xml"

  def main(args: Array[String]): Unit = {
    /** Spark context */
    val sparkContext = new SparkContext(conf)

    /** Creates an Ignite context with the above configuration */
    val igniteContext = new IgniteContext(sparkContext, CONFIG, false)

    /** Creates an IgniteRDD of (Int, Int) pairs */
    val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")

    /** Fill the IgniteRDD with Int pairs */
    sharedRDD.savePairs(sparkContext.parallelize(1 to 1000, 10).map(i => (i, i)))

    /** Transform pairs to contain their squared values.
      * mapValues returns a new RDD; it does not write back to the cache. */
    sharedRDD.mapValues(x => x * x)

    /** Retrieve sharedRDD back from the cache */
    val transformedValues: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")

    transformedValues.take(5).foreach(println)

    /** Perform an SQL query on the existing cache and
      * collect the result into a Spark DataFrame
      */
    val rs = transformedValues.sql("select * from Integer where number > ? and number < ? ", 10, 100)

    /** Show DataFrame results */
    println("The count is::::::::::: " + rs.count())
  }
}

Here, the .take(5) action on transformedValues returns results and prints them. But when I run the sql method on the same RDD, it returns 0 as the row count, and I don't know why. Please help.

Below are my cache configuration settings:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="cacheConfiguration">
            <!-- SharedRDD cache example configuration (Atomic mode). -->
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <!-- Set a cache name. -->
                <property name="name" value="sharedRDD"/>

                <!-- Set cache mode. -->
                <property name="cacheMode" value="PARTITIONED"/>

                <!-- Set atomicity mode. -->
                <property name="atomicityMode" value="ATOMIC"/>

                <!-- Number of backup nodes. -->
                <property name="backups" value="1"/>

                <property name="queryEntities">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryEntity">
                            <!-- Setting indexed type's key class -->
                            <property name="keyType" value="Integer"/>

                            <!-- Setting indexed type's value class -->
                            <property name="valueType" value="Integer"/>

                            <property name="fields">
                                <map>
                                    <entry key="number" value="Integer"/>
                                </map>
                            </property>

                            <!-- Enable an index on the field -->
                            <property name="indexes">
                                <list>
                                    <bean class="org.apache.ignite.cache.QueryIndex">
                                        <constructor-arg value="number"/>
                                    </bean>
                                </list>
                            </property>
                        </bean>
                    </list>
                </property>
            </bean>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                        Ignite provides several options for automatic discovery that can be used
                        instead of static IP based discovery. For information on all options refer
                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Solution

  • There is no number field in the Integer class, so from the SQL perspective the number column is always null and the range condition never matches any row. When using primitives as keys and/or values, you can use the predefined _key and _val fields instead. So the query can look like this:

    select * from Integer where _val > ? and _val < ?
    

    Also, there is not much reason to use queryEntities in this case; setIndexedTypes is enough and much simpler.
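Why zero rows rather than an error: in SQL, any comparison with NULL evaluates to UNKNOWN, and WHERE keeps only rows whose condition is TRUE. The sketch below models that three-valued logic in plain Scala so the two queries can be compared side by side. It does not use Ignite at all; the names Row, gt, lt, and3 and where are hypothetical, and it assumes (as stated above) that the number column is NULL for every row while _val holds the values the question saves, 1 to 1000.

```scala
// Pure-Scala model of SQL three-valued logic; no Ignite involved.
// Assumption: "number" is NULL (None) for every row, _val holds the cached value.
object SqlNullFilterSketch {

  // TRUE = Some(true), FALSE = Some(false), UNKNOWN = None
  type Tri = Option[Boolean]

  // Hypothetical row shape: the declared "number" column plus the built-in _val.
  final case class Row(number: Option[Int], value: Int)

  // Any comparison involving NULL yields UNKNOWN.
  def gt(a: Option[Int], b: Int): Tri = a.map(_ > b)
  def lt(a: Option[Int], b: Int): Tri = a.map(_ < b)

  // SQL AND: FALSE dominates, TRUE only if both are TRUE, otherwise UNKNOWN.
  def and3(x: Tri, y: Tri): Tri = (x, y) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  // WHERE keeps a row only when the predicate is TRUE; UNKNOWN rows are dropped.
  def where(rows: Seq[Row])(pred: Row => Tri): Seq[Row] =
    rows.filter(r => pred(r).contains(true))

  // Same values the question saves into the cache: 1..1000.
  val rows: Seq[Row] = (1 to 1000).map(i => Row(number = None, value = i))

  // "where number > 10 and number < 100": every comparison is UNKNOWN.
  def countByNumber: Int =
    where(rows)(r => and3(gt(r.number, 10), lt(r.number, 100))).size

  // "where _val > 10 and _val < 100": ordinary comparisons on the value.
  def countByVal: Int =
    where(rows)(r => and3(gt(Some(r.value), 10), lt(Some(r.value), 100))).size

  def main(args: Array[String]): Unit = {
    println(s"number filter: $countByNumber rows") // 0
    println(s"_val filter: $countByVal rows")      // 89 (values 11 to 99)
  }
}
```

Running main prints 0 rows for the number filter and 89 rows for the _val filter, which mirrors the behaviour described above: the original query is valid SQL, it simply never evaluates to TRUE.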