My data pipeline is following: Kafka
=> perform some calculations => load resulting pairs into Ignite cache
=> print it out
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("MainApplication");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(10));
JavaIgniteContext<String, Float> igniteContext = new JavaIgniteContext<>(sc, PATH, false);
JavaDStream<Message> dStream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Message>
Subscribe(Collections.singletonList(TOPIC), kafkaParams)
)
.map(ConsumerRecord::value);
JavaPairDStream<String, Message> pairDStream =
dStream.mapToPair(message -> new Tuple2<>(message.getName(), message));
JavaPairDStream<String, Float> pairs = pairDStream
.combineByKey(new CreateCombiner(), new MergeValue(), new MergeCombiners(), new HashPartitioner(10))
.mapToPair(new ToPairTransformer());
JavaIgniteRDD<String, Float> myCache = igniteContext.fromCache(new CacheConfiguration<>());
// I know that we put something here:
pairDStream.foreachRDD((VoidFunction<JavaPairRDD<String, Float>>) myCache::savePairs);
// But I can't see anything here:
myCache.foreach(tuple2 -> System.out.println("In cache: " + tuple2._1() + " = " + tuple2._2()));
streamingContext.start();
streamingContext.awaitTermination();
streamingContext.stop();
sc.stop();
But this code prints nothing.. Why?
Why Ignite cache
is empty even after savePairs
?
What can be wrong here?
Thanks in advance!
For me, it looks like that pairDStream.foreachRDD(...)
is a lazy operation and has no any affect at least before you start streaming context streamingContext.start()
.
On the other hand, myCache.foreach(...)
is eager operation and you perform it on actually empty cache.
So, try to put myCache.foreach(...)
after streaming context start. Or even after termination.