Tags: scala, apache-spark, apache-kafka, spark-streaming, singlestore

Spark Streaming from Kafka and comparison with MemSQL records (count is not correct)


We are receiving records from Kafka, extracting the card number from each Kafka record in Spark Streaming, and comparing that card number against MemSQL records by selecting the count and card number grouped by card number. However, the count does not come out correctly in Spark Streaming.

For example, when we execute the query at the MemSQL command prompt, the count comes out as follows:

memsql> select card_number,count(*) from cardnumberalert5 where 
inserted_time <= now() and inserted_time >= NOW() - INTERVAL 10 MINUTE group 
by card_number;
+------------------+----------+
| card_number      | count(*) |
+------------------+----------+
| 4556655960290527 |        2 |
| 6011255715328120 |        4 |
| 4532133676538232 |        2 |
| 6011614607071620 |        2 |
| 4024007117099605 |        2 |
| 347138718258304  |        4 |
+------------------+----------+

We notice that in Spark Streaming the count gets distributed across multiple rows.

For example, the MemSQL output when we execute the query from the MemSQL command prompt:

+------------------+----------+
| card_number      | count(*) |
+------------------+----------+
| 4556655960290527 |        2 |
+------------------+----------+

When the same SQL is executed from Spark Streaming, it prints the output as:

RECORDS FOUNDS ****************************************
CARDNUMBER KAFKA ############### 4024007117099605
CARDNUMBER MEMSQL ############### 4556655960290527
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 4556655960290527
COUNT MEMSQL ############### 1

Here the count should be 2, but instead we get two rows for the card number, each with a count of 1.

The full output printed in Spark Streaming:

RECORDS FOUNDS ****************************************
CARDNUMBER KAFKA ############### 4024007117099605
CARDNUMBER MEMSQL ############### 4556655960290527
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 6011255715328120
COUNT MEMSQL ############### 2
CARDNUMBER MEMSQL ############### 4532133676538232
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 6011614607071620
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 4024007117099605
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 347138718258304
COUNT MEMSQL ############### 2
CARDNUMBER MEMSQL ############### 4556655960290527
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 6011255715328120
COUNT MEMSQL ############### 2
CARDNUMBER MEMSQL ############### 4532133676538232
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 6011614607071620
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 4024007117099605
COUNT MEMSQL ############### 1
CARDNUMBER MEMSQL ############### 347138718258304
COUNT MEMSQL ############### 2

Spark Streaming Program

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

class SparkKafkaConsumer11(
    val ssc: StreamingContext,
    val sc: SparkContext,
    val spark: SparkSession,
    val topics: Array[String],
    val kafkaParam: Map[String, Object]) {

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParam)
  )

  // Keep only the value of each (key, value) pair for processing
  val recordStream = stream.map(record => record.value)

  recordStream.foreachRDD { rdd =>

    val brokers = "174.24.154.244:9092" // Kafka broker
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "SparkKafkaConsumer__11")
    val producer = new KafkaProducer[String, String](props)

    // Card-number counts for the last 10 minutes, read via the MemSQL connector
    val result = spark.read
      .format("com.memsql.spark.connector")
      .options(Map(
        "query" -> ("select card_number,count(*) from cardnumberalert5 " +
          "where inserted_time <= now() and inserted_time >= NOW() - INTERVAL 10 MINUTE " +
          "group by card_number"),
        "database" -> "fraud"))
      .load()

    // Split each pipe-delimited record into an array of fields
    val record = rdd.map(line => line.split("\\|"))

    record.collect().foreach { fields =>
      val now = new java.sql.Timestamp(System.currentTimeMillis)
      val cardnumberKafka = fields(13)
      val sessionID = fields(1)
      println("RECORDS FOUNDS ****************************************")
      println("CARDNUMBER KAFKA ############### " + cardnumberKafka)

      result.collect().foreach { t =>
        val cardnumberMemsql = t.getAs[String]("card_number")
        println("CARDNUMBER MEMSQL ############### " + cardnumberMemsql)
        val countMemsql = t.getAs[Long]("count(*)")
        println("COUNT MEMSQL ############### " + countMemsql)

        // Alert on the third or later occurrence of the same card number
        if (cardnumberMemsql.equals(cardnumberKafka) && countMemsql > 2) {
          println("INSIDE IF CONDITION FOR THIRD OR MORE OCCURRENCE " + now)
          val messageToKafka = "---- THIRD OR MORE OCCURRENCE ---- " + cardnumberKafka
          val message = new ProducerRecord[String, String]("output1", 0, sessionID, messageToKafka)
          try {
            producer.send(message)
          } catch {
            case e: Exception =>
              e.printStackTrace()
              System.exit(1)
          }
        }
      }
    }

    producer.close()
  }
}

Not sure how to fix this; any suggestion or help is highly appreciated.

Thanks in advance


Solution

  • We were able to solve this issue by setting the following property in the Spark configuration. With partition pushdown enabled (the default), the MemSQL connector runs the query separately on each database partition, so the GROUP BY returns partial per-partition counts and the same card number appears several times; disabling pushdown makes the query run as a whole and return a single row per card number.

    Code:

     .set("spark.memsql.disablePartitionPushdown","true")