Tags: apache-spark, apache-spark-sql, ignite

How to save & read a Spark DataFrame/Dataset in Apache Ignite?


How do I save and read a Spark DataFrame/Dataset in Apache Ignite? I tried various solutions given in other similar questions, but nothing works with the latest Ignite and Spark versions. (I am using Scala 2.11.) Thanks.

Update (adding code):

<?xml version="1.0" encoding="UTF-8"?>        
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="cacheConfiguration">
        <!-- SharedRDD cache example configuration (Atomic mode). -->
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <!-- Set a cache name. -->
            <property name="name" value="sharedRDD"/>
            <!-- Set a cache mode. -->
            <property name="cacheMode" value="PARTITIONED"/>

            <!-- Set atomicity mode. -->
            <property name="atomicityMode" value="ATOMIC"/>
            <!-- Configure a number of backups. -->
            <property name="backups" value="1"/>
        </bean>
    </property>

    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                <!--
                    Ignite provides several options for automatic discovery that can be used
                    instead of static IP-based discovery. For information on all options, refer
                    to our documentation: http://apacheignite.readme.io/docs/cluster-config
                -->
                <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                    <property name="addresses">
                        <list>
                            <!-- In distributed environment, replace with actual host IP address. -->
                            <value>127.0.0.1:47500..47509</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>
</beans>

IgniteCache code (this puts the DataFrame into the cache and then tries to read it back by converting it into an RDD):

object SparkIgniteCache {
  private val CONFIG = "config/cache.xml"

  import org.apache.ignite.binary.BinaryObject
  import org.apache.ignite.cache.CacheAtomicityMode
  import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction
  import org.apache.ignite.configuration.CacheConfiguration
  import org.apache.ignite.spark.IgniteContext
  import org.apache.spark.SparkContext
  import org.apache.spark.sql.{DataFrame, Row}

  private[sample] def set(sc: SparkContext, df: DataFrame, KEY: String): Unit = {
    val ic = new IgniteContext(sc, CONFIG, false)

    // FAILED ATTEMPT AT SETTING THE CONFIG: 1
    //    val cacheConfiguration: CacheConfiguration[String, Row] = new CacheConfiguration[String, Row](KEY)
    //      .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
    //      .setAffinity(new RendezvousAffinityFunction(false, 2))
    //      .setIndexedTypes(classOf[String], classOf[Row])
    //
    //    val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)

    // FAILED ATTEMPT AT SETTING THE CONFIG: 2
    //    val cacheConfiguration: CacheConfiguration[String, BinaryObject] = new CacheConfiguration[String, BinaryObject](KEY)
    //      .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
    //      .setAffinity(new RendezvousAffinityFunction(false, 2))
    //      .setIndexedTypes(classOf[String], classOf[BinaryObject])
    //
    //    val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)

    val sharedRDD = ic.fromCache[String, Row](KEY)
    sharedRDD.saveValues(df.rdd)
  }

  private[sample] def get(sc: SparkContext, KEY: String) = {
    val ic = new IgniteContext(sc, CONFIG, false)
    //    val cacheConfiguration: CacheConfiguration[String, Row] = new CacheConfiguration[String, Row](KEY)
    //      .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
    //      .setAffinity(new RendezvousAffinityFunction(false, 2))
    //      .setIndexedTypes(classOf[String], classOf[Row])
    //
    //    val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)
    ic.fromCache[String, Row](KEY)
  }
}

Solution

  • I was able to resolve the above issue in the following way:

    I added the following XML snippet to the configuration file, under the CacheConfiguration node:

    <property name="indexedTypes">
         <list>
             <value>java.lang.String</value>
             <value>org.apache.spark.sql.Row</value>
         </list>
    </property>
    

    I wanted to store a dataframe of type DataFrame[Row], and that is not yet possible with Ignite. However, an RDD[Row] can be saved, and to save it you must store it in pair format, so the RDD[Row] has to be converted to an RDD[(String, Row)]. To represent that pair type in the CacheConfiguration, I added the indexedTypes property as above.
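
    If you would rather control the keys yourself instead of relying on the keys that saveValues generates, here is a minimal sketch of writing explicit pairs (the zipWithUniqueId-based key is illustrative only, not part of my solution; df and sharedRDD are the same names used in the code below):

        import org.apache.spark.rdd.RDD
        import org.apache.spark.sql.Row

        // Illustrative: derive a String key for each Row and store explicit
        // (String, Row) pairs via IgniteRDD.savePairs.
        val pairs: RDD[(String, Row)] = df.rdd.zipWithUniqueId().map {
            case (row, id) => (id.toString, row)
        }
        sharedRDD.savePairs(pairs)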

    You also need to save the dataframe's schema along with the data so that it can be converted back into a dataframe later.
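
    The schema is required because a plain RDD[Row] carries no column names or types. A minimal sketch of why rebuilding needs the saved StructType (names here are illustrative):

        import org.apache.spark.rdd.RDD
        import org.apache.spark.sql.{DataFrame, Row, SparkSession}
        import org.apache.spark.sql.types.StructType

        // Row alone has no column metadata, so createDataFrame needs the
        // StructType that was stored alongside the data.
        def rebuild(spark: SparkSession, rows: RDD[Row], schema: StructType): DataFrame =
            spark.createDataFrame(rows, schema)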

    Here is the code to save/read the DF:

    object SparkIgniteCache {
        import org.apache.ignite.cache.CacheAtomicityMode
        import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction
        import org.apache.ignite.configuration.CacheConfiguration
        import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
        import org.apache.spark.SparkContext
        import org.apache.spark.sql.{DataFrame, Row}
        import org.apache.spark.sql.types.StructType

        private val CONFIG = "config/cache.xml"
        private val schemaCacheConfig = makeSchemaCacheConfig("schemas")

        private[sample] def set(sc: SparkContext, df: DataFrame, KEY: String): Unit = {
            val ic = new IgniteContext(sc, CONFIG, false)
            val sharedRDD = ic.fromCache[String, Row](KEY)

            // Store the schema next to the data so the dataframe can be rebuilt on read.
            val rddSchemaCache = ic.ignite.getOrCreateCache(schemaCacheConfig)
            rddSchemaCache.put(KEY + "_schema", df.schema)

            sharedRDD.saveValues(df.rdd)
        }

        private[sample] def get(sc: SparkContext, KEY: String)
                            : (StructType, IgniteRDD[String, Row]) = {
            val ic = new IgniteContext(sc, CONFIG, false)
            val rddSchemaCache = ic.ignite.getOrCreateCache(schemaCacheConfig)
            (rddSchemaCache.get(KEY + "_schema"), ic.fromCache[String, Row](KEY))
        }

        private def makeSchemaCacheConfig(name: String) =
            new CacheConfiguration[String, StructType](name)
                .setAtomicityMode(CacheAtomicityMode.ATOMIC)
                .setBackups(1)
                .setAffinity(new RendezvousAffinityFunction(false, 1))
    }

    In the above code, I also create a CacheConfiguration dynamically to store the dataframe's schema, which is of type StructType.

    Now you just have to call the set and get methods as shown below:

        // Set the data/dataframe for KEY=input_data
        SparkIgniteCache.set(spark.sparkContext, df, "input_data")

        // Get the dataframe back
        val (schema, igniteRDD) = SparkIgniteCache.get(spark.sparkContext, "input_data")
        val rdd1: RDD[Row] = igniteRDD.map(_._2) // Extract the Row from each (String, Row) pair
        val restoredDf = spark.sqlContext.createDataFrame(rdd1, schema)
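
    One caveat, based on my own observation rather than the Ignite documentation: saveValues generates the cache keys itself, so the String keys read back from the cache are not meaningful identifiers. If you need stable keys, store explicit pairs with savePairs as sketched earlier.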
    

    Thanks.