spark-streaming, rdd, ignite

Ignite cache is empty after save?


My data pipeline is as follows: Kafka => perform some calculations => load the resulting pairs into an Ignite cache => print them out.

    // PATH, TOPIC, kafkaParams, Message and the combiner classes are defined elsewhere.
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("MainApplication");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(10));
    JavaIgniteContext<String, Float> igniteContext = new JavaIgniteContext<>(sc, PATH, false);

    JavaDStream<Message> dStream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, Message>Subscribe(Collections.singletonList(TOPIC), kafkaParams)
    ).map(ConsumerRecord::value);

    JavaPairDStream<String, Message> pairDStream =
            dStream.mapToPair(message -> new Tuple2<>(message.getName(), message));

    JavaPairDStream<String, Float> pairs = pairDStream
            .combineByKey(new CreateCombiner(), new MergeValue(), new MergeCombiners(), new HashPartitioner(10))
            .mapToPair(new ToPairTransformer());

    JavaIgniteRDD<String, Float> myCache = igniteContext.fromCache(new CacheConfiguration<>());

    // I know that we put something here (the <String, Float> stream, so it matches the cast):
    pairs.foreachRDD((VoidFunction<JavaPairRDD<String, Float>>) myCache::savePairs);

    // But I can't see anything here:
    myCache.foreach(tuple2 -> System.out.println("In cache: " + tuple2._1() + " = " + tuple2._2()));

    streamingContext.start();
    streamingContext.awaitTermination();
    streamingContext.stop();
    sc.stop();

But this code prints nothing. Why?

Why is the Ignite cache empty even after savePairs?

What can be wrong here?

Thanks in advance!


Solution

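  • To me, it looks like the foreachRDD(...) call is lazy: it only registers an output operation, and that operation has no effect until you start the streaming context with streamingContext.start(). On the other hand, myCache.foreach(...) is an eager action: it runs immediately, on a cache that is still empty. So try moving myCache.foreach(...) after the streaming context has started, or even after it terminates. Keep in mind that start() returns immediately, so the first 10-second batch may not have been saved yet at that point; calling myCache.foreach(...) inside the foreachRDD callback, right after savePairs, prints the cache after every batch.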
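To see why the order matters, here is a toy model in plain Java (Java 9+ for List.of/Map.of). It does not use Spark or Ignite at all: ToyStreamingContext, foreachBatch and startAndProcessAll are hypothetical names standing in for JavaStreamingContext, foreachRDD and start(), and a HashMap stands in for the Ignite cache. It is only a sketch of the lazy-registration vs. eager-read distinction described above, not how Spark is actually implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class LazyOutputDemo {

    // Hypothetical stand-in for a streaming context (NOT the Spark API):
    // output operations are recorded when registered and only executed later.
    static class ToyStreamingContext {
        private final List<Consumer<Map<String, Float>>> outputOps = new ArrayList<>();
        private final List<Map<String, Float>> batches;

        ToyStreamingContext(List<Map<String, Float>> batches) {
            this.batches = batches;
        }

        // Plays the role of foreachRDD: stores the function, runs nothing yet.
        void foreachBatch(Consumer<Map<String, Float>> op) {
            outputOps.add(op);
        }

        // Plays the role of start() plus batch processing:
        // every registered operation is applied to every batch, in order.
        void startAndProcessAll() {
            for (Map<String, Float> batch : batches) {
                for (Consumer<Map<String, Float>> op : outputOps) {
                    op.accept(batch);
                }
            }
        }
    }

    // Returns the cache size observed before start and after processing.
    static int[] run() {
        Map<String, Float> cache = new HashMap<>(); // stands in for the Ignite cache
        ToyStreamingContext ctx = new ToyStreamingContext(
                List.of(Map.of("a", 1.0f), Map.of("b", 2.0f)));

        // Like foreachRDD(myCache::savePairs): registered, not executed.
        ctx.foreachBatch(cache::putAll);

        // Like myCache.foreach(...) before start(): reads the cache right now.
        int sizeBeforeStart = cache.size();

        ctx.startAndProcessAll();

        // Read again once the batches have actually been saved.
        int sizeAfterProcessing = cache.size();
        return new int[] {sizeBeforeStart, sizeAfterProcessing};
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("Before start: " + sizes[0] + " entries");
        System.out.println("After processing: " + sizes[1] + " entries");
    }
}
```

Running main prints 0 entries before start and 2 after processing, which mirrors the question: the eager read happens before any batch has been saved.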