
JavaRDD from IgniteRDD taking a long time


I have created an Apache Ignite application with Spark:

  • Ignite Version - 1.6.0
  • Spark Version - 1.5.2 (Built on Scala 2.11)

The application stores two tuples in an IgniteRDD.

When retrieve is called, the collect call takes more than 3 minutes.

More than 1000 jobs are submitted.

Code snippet:

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.ignite.spark.IgniteContext;
import org.apache.ignite.spark.IgniteRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CopyOfMainIgnite {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Demo").setMaster(
                "spark://169.254.228.183:7077");
        System.out.println("Spark conf initialized.");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.addJar("./target/IgnitePOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar");
        System.out.println("Spark context initialized.");
        IgniteContext ic = new IgniteContext(sc.sc(),
                "ignite/client-default-config.xml");
        System.out.println("Ignite Context initialized.");
        String cacheName = "demo6";
        save(sc, ic, cacheName);

        retrieve(ic, cacheName);
        ic.close(false);
        sc.close();

    }

    private static void retrieve(IgniteContext ic, String cacheName) {
        System.out.println("Getting saved IgniteRDD.");
        IgniteRDD<String, String> javaIRDDRet = ic.fromCache(cacheName);
        JavaRDD<Tuple2<String, String>> javardd = javaIRDDRet.toJavaRDD();

        // isEmpty() is implemented via take(1), which may launch several
        // small jobs while scanning partitions for a first element.
        long isEmptyStart = System.currentTimeMillis();
        System.out.println("javardd.isEmpty(): " + javardd.isEmpty());
        long isEmptyEnd = System.currentTimeMillis();

        // collect() runs one task per RDD partition and returns all tuples.
        long collectStart = System.currentTimeMillis();
        javardd.collect().forEach(System.out::println);
        long collectEnd = System.currentTimeMillis();

        System.out.println("isEmpty took " + (isEmptyEnd - isEmptyStart)
                + " ms, collect and print took "
                + (collectEnd - collectStart) + " ms");
    }

    private static void save(JavaSparkContext sc, IgniteContext ic,
            String cacheName) {
        IgniteRDD<String, String> igniteRDD = ic.fromCache(cacheName);
        System.out.println("IgniteRDD from cache initialized.");
        Map<String, String> tempMap = new HashMap<String, String>();
        tempMap.put("Aditya", "Jain");
        tempMap.put("Pranjal", "Jaju");
        Tuple2<String, String> tempTuple1 = new Tuple2<String, String>(
                "Aditya", "Jain");
        Tuple2<String, String> tempTuple2 = new Tuple2<String, String>(
                "Pranjal", "Jaju");
        List<Tuple2<String, String>> list = new LinkedList<Tuple2<String, String>>();
        list.add(tempTuple1);
        list.add(tempTuple2);
        JavaPairRDD<String, String> jpr = sc.parallelizePairs(list, 4);
        System.out.println("Pair RDD created.");
        igniteRDD.savePairs(jpr.rdd(), false);
        System.out.println("IgniteRDD saved.");
    }
}

So my question is: should it really take 3-4 minutes to fetch two RDD tuples from Ignite and collect them in my process?

Or is my expectation that it will respond in milliseconds wrong?

After debugging, I found that the IgniteRDD is created with 1024 partitions, which causes 1024 jobs to be fired. I cannot find any way to control the number of partitions.


Solution

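  • You can decrease the number of partitions in the CacheConfiguration. IgniteRDD creates one Spark partition per cache partition, and RendezvousAffinityFunction uses 1024 partitions by default: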

    <bean class="org.apache.ignite.configuration.CacheConfiguration">
        <!-- Name of the cache used in the question. -->
        <property name="name" value="demo6"/>
        <property name="affinity">
            <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
                <property name="partitions" value="32"/>
            </bean>
        </property>
    </bean>
    

    You can also use the IgniteRDD.sql(..) and IgniteRDD.objectSql(..) methods to retrieve data directly from Ignite using fast indexed search. For details on how to configure SQL in Ignite, see this page: https://apacheignite.readme.io/docs/sql-queries
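The same partition setting can presumably also be built in Java instead of XML. The sketch below only constructs a CacheConfiguration with a RendezvousAffinityFunction limited to 32 partitions; it does not start an Ignite node. The class and method names (FewerPartitionsConfig, cacheConfig) are hypothetical, and passing the configuration to IgniteContext.fromCache(CacheConfiguration) is an assumption about the ignite-spark API, shown only as a comment. The partition count is fixed when a cache is first created, so an existing demo6 cache would likely need to be destroyed (or a new cache name used) before the change takes effect.

```java
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;

public class FewerPartitionsConfig {

    // Hypothetical helper: builds a cache configuration whose affinity
    // function uses `partitions` partitions instead of the default 1024.
    static CacheConfiguration<String, String> cacheConfig(String name, int partitions) {
        CacheConfiguration<String, String> cfg = new CacheConfiguration<>();
        cfg.setName(name);
        // Arguments: excludeNeighbors = false, number of partitions.
        cfg.setAffinity(new RendezvousAffinityFunction(false, partitions));
        return cfg;
    }

    public static void main(String[] args) {
        // Only builds the configuration object; no Ignite node is started.
        CacheConfiguration<String, String> cfg = cacheConfig("demo6", 32);
        System.out.println(cfg.getName() + " partitions: "
                + cfg.getAffinity().partitions());

        // Assumed usage in the question's code (not verified against 1.6):
        //   IgniteRDD<String, String> rdd = ic.fromCache(cfg);
    }
}
```

With 32 partitions, collect() over the resulting IgniteRDD would run 32 tasks instead of 1024, which is where most of the per-task overhead in the question comes from.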