We are consuming records from Kafka in Spark Streaming, extracting the card number from each record, and comparing it against MemSQL records by selecting the card number and its count grouped by card number. However, the count does not come back correctly in Spark Streaming.
For example, when we execute the query directly at the MemSQL command prompt, it gives the following output:
memsql> select card_number,count(*) from cardnumberalert5 where
inserted_time <= now() and inserted_time >= NOW() - INTERVAL 10 MINUTE group
by card_number;
+------------------+----------+
| card_number      | count(*) |
+------------------+----------+
| 4556655960290527 |        2 |
| 6011255715328120 |        4 |
| 4532133676538232 |        2 |
| 6011614607071620 |        2 |
| 4024007117099605 |        2 |
| 347138718258304  |        4 |
+------------------+----------+
We notice that in Spark Streaming the count for a given card number gets split across multiple rows, with the partial counts adding up to the totals shown above.
For example, the MemSQL output when we execute the query from the MemSQL command prompt is:
+------------------+----------+
| card_number      | count(*) |
+------------------+----------+
| 4556655960290527 |        2 |
When the same SQL is executed from Spark Streaming, it prints the following output:
RECORDS FOUNDS ****************************************
CARDNUMBER KAFKA ############### 4024007117099605
CARDNUMBER MEMSQL ############### 4556655960290527
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 4556655960290527
COUNT MEMSQL ############### 1
Here the count should be 2, but instead we get two records for the same card number, each with count 1; the partial counts add up to the expected total of 2.
The full output printed in Spark Streaming:
RECORDS FOUNDS ****************************************
CARDNUMBER KAFKA ############### 4024007117099605
CARDNUMBER MEMSQL ############### 4556655960290527
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 6011255715328120
COUNT MEMSQL ############### 2
CARDNUMBER MEMSQL ############### 4532133676538232
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 6011614607071620
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 4024007117099605
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 347138718258304
COUNT MEMSQL ############### 2
CARDNUMBER MEMSQL ############### 4556655960290527
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 6011255715328120
COUNT MEMSQL ############### 2
CARDNUMBER MEMSQL ############### 4532133676538232
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 6011614607071620
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 4024007117099605
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 347138718258304
COUNT MEMSQL ############### 2
Spark Streaming Program
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Consumes transactions from Kafka, compares each Kafka card number against the
// grouped counts read from MemSQL, and publishes an alert to Kafka when a card
// number has occurred three or more times in the last 10 minutes.
class SparkKafkaConsumer11(val ssc: StreamingContext,
                           val sc: SparkContext,
                           val spark: org.apache.spark.sql.SparkSession,
                           val topics: Array[String],
                           val kafkaParam: scala.collection.immutable.Map[String, Object]) {

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParam)
  )

  // Take only the value from the (key, value) pair for processing
  val recordStream = stream.map(record => record.value)

  recordStream.foreachRDD { rdd =>
    val brokers = "174.24.154.244:9092" // Specify the broker
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "SparkKafkaConsumer__11")
    val producer = new KafkaProducer[String, String](props)

    // Count per card number over the last 10 minutes, read from MemSQL
    val result = spark.read
      .format("com.memsql.spark.connector")
      .options(Map(
        "query" -> "select card_number,count(*) from cardnumberalert5 where inserted_time <= now() and inserted_time >= NOW() - INTERVAL 10 MINUTE group by card_number",
        "database" -> "fraud"))
      .load()

    // Split each pipe-delimited record into an array of fields
    val record = rdd.map(line => line.split("\\|"))

    record.collect().foreach { recordRDD =>
      val now1 = System.currentTimeMillis
      val now = new java.sql.Timestamp(now1)
      val cardnumber_kafka = recordRDD(13).toString
      val sessionID = recordRDD(1).toString
      println("RECORDS FOUNDS ****************************************")
      println("CARDNUMBER KAFKA ############### " + cardnumber_kafka)

      result.collect().foreach { t =>
        val resm1 = t.getAs[String]("card_number")
        println("CARDNUMBER MEMSQL ############### " + resm1)
        val resm2 = t.getAs[Long]("count(*)")
        println("COUNT MEMSQL ############### " + resm2)
        if (resm1.equals(cardnumber_kafka)) {
          if (resm2 > 2) { // three or more occurrences
            println("INSIDE IF CONDITION FOR MORE THAN 3 COUNT" + now)
            val messageToKafka = "---- THIRD OR MORE OCCURANCE ---- " + cardnumber_kafka
            val message = new ProducerRecord[String, String]("output1", 0, sessionID, messageToKafka)
            try {
              producer.send(message)
            } catch {
              case e: Exception =>
                e.printStackTrace()
                System.exit(1)
            }
          }
        }
      }
    }
    producer.close()
  }
}
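For context, this is roughly how the class above might be wired up on the driver. This setup is not part of the question: the master URL, batch interval, group id, offset settings, and topic name below are assumptions, and the bootstrap server is simply reused from the producer configuration shown above.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder driver setup (app name, master, and batch interval are assumptions)
val conf  = new SparkConf().setAppName("SparkKafkaConsumer11").setMaster("local[*]")
val ssc   = new StreamingContext(conf, Seconds(10))
val sc    = ssc.sparkContext
val spark = SparkSession.builder().config(conf).getOrCreate()

// Placeholder Kafka consumer parameters and topic name
val kafkaParam = Map[String, Object](
  "bootstrap.servers"  -> "174.24.154.244:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "spark-kafka-consumer-11",
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("transactions")

new SparkKafkaConsumer11(ssc, sc, spark, topics, kafkaParam)
ssc.start()
ssc.awaitTermination()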
We are not sure how to fix this; any suggestion or help is highly appreciated.
Thanks in advance.
We were able to solve this issue by setting the following property in the Spark configuration. With partition pushdown enabled, the MemSQL connector apparently runs the GROUP BY on each leaf partition separately, which is why we were seeing partial counts per card number; disabling pushdown makes the query return the aggregated totals.
Code:
.set("spark.memsql.disablePartitionPushdown","true")