Search code examples
Tags: scala, apache-kafka, apache-kafka-streams

How to modify KStream key and values in Kafka word count program?


I am new to Kafka Streams and am stuck on a basic word count program. In the program below, I am trying to lower-case the values, but it's not working (`val wordCountInputProcessed = wordCountInput.mapValues(value => value.toLowerCase)`). Is there anything wrong here?

Kafka Streams version: 2.3.0

Scala version: 2.11.8

import java.util._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.{KafkaStreams,StreamsBuilder, StreamsConfig}
import org.apache.kafka.common.serialization.{StringDeserializer,LongDeserializer}

/**
 * Question's reproducer: a Kafka Streams job written against the Java API
 * (org.apache.kafka.streams.StreamsBuilder). It reads "streams-plaintext-input",
 * lower-cases each value and writes the result to "streams-plaintext-output".
 *
 * NOTE(review): as written this does not compile/run; see the inline notes and the
 * Scala-DSL version in the solution further down.
 */
object WordCount {
  def main(args: Array[String]): Unit = {

    val config = new Properties()

    config.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-example")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
    // NOTE(review): ConsumerConfig has no `AutoOffsetReset` member; the constant is
    // `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` (as used in the solution).
    config.put(ConsumerConfig.AutoOffsetReset,"earliest")
    // NOTE(review): the default key/value serde settings must name Serde classes.
    // StringDeserializer is only a Deserializer, so Kafka Streams will reject it when it
    // instantiates the default serde; use classOf[Serdes.StringSerde] instead.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,classOf[StringDeserializer])
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,classOf[StringDeserializer])



    val builder = new StreamsBuilder
    val wordCountInput = builder.stream[String,String]("streams-plaintext-input")

    // NOTE(review): this is the failing line. The Java KStream.mapValues is overloaded and
    // takes Java functional interfaces (ValueMapper / ValueMapperWithKey), not Scala
    // functions. Scala 2.11 does not convert Scala lambdas to Java SAM interfaces by
    // default, so the type of `value` is not inferred (the IDE shows `Nothing`).
    // Fix: pass an explicit `new ValueMapper[String, String] { ... }`, or switch to the
    // Kafka Streams Scala DSL (org.apache.kafka.streams.scala.StreamsBuilder).
    val wordCountInputProcessed = wordCountInput.mapValues(value => value.toLowerCase)

    wordCountInputProcessed.to("streams-plaintext-output")

    // NOTE(review): `streams` is never closed; consider registering a shutdown hook that
    // calls streams.close(...), as the solution does.
    val streams = new KafkaStreams(builder.build(),config)
    streams.start()
    println(streams.toString)

  }
}

Here's a screenshot of the issue:

[screenshot not available]

Shouldn't the inferred type of `value` be `String` instead of `Nothing`?

[screenshot not available]


Solution

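  • I switched from the Java API to the Kafka Streams DSL for Scala, and that solved the problem. The Java `KStream.mapValues` is overloaded and takes Java functional interfaces (`ValueMapper`, `ValueMapperWithKey`) rather than Scala functions. Scala 2.11 does not convert Scala lambdas to Java single-abstract-method interfaces by default, so the lambda's parameter type cannot be inferred and shows up as `Nothing`. The Scala DSL accepts Scala function types directly. I am also using the following modules: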

    org.apache.kafka.streams.scala.ImplicitConversions: Module that brings into scope the implicit conversions between the Scala and Java classes.

    org.apache.kafka.streams.scala.Serdes: Module that contains all primitive SerDes that can be imported as implicits and a helper to create custom SerDes.

    For more details, see the "Kafka Streams DSL for Scala" section of the documentation: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#scala-dsl

    import java.time.Duration
    import java.util._
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
    import org.apache.kafka.streams.scala.StreamsBuilder
    
    // Import for Scala DSL
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._
    
    /**
     * Word-count job written with the Kafka Streams Scala DSL.
     *
     * Reads text lines from "streams-plaintext-input", lower-cases them, splits them into
     * whitespace-separated words and keeps a running count per word. Each update
     * (String word -> Long count) is written to "streams-plaintext-output".
     *
     * The serdes for every step are supplied implicitly by the imports of
     * `org.apache.kafka.streams.scala.ImplicitConversions._` and
     * `org.apache.kafka.streams.scala.Serdes._`.
     */
    object WordCount {

      def main(args: Array[String]): Unit = {

        val config = new Properties()

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-example")
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        // Defaults are only a fallback: the Scala DSL passes serdes implicitly at every step
        // below. They describe the input topic (String keys and values), not the Long counts.
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[Serdes.StringSerde])
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[Serdes.StringSerde])

        val builder = new StreamsBuilder
        val wordCountInput = builder.stream[String, String]("streams-plaintext-input")

        val wordCounts = wordCountInput
          // Skip null values (e.g. tombstones) instead of failing with a NullPointerException.
          .filter((_, line) => line != null)
          // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
          .mapValues(line => line.toLowerCase(Locale.ROOT))
          // Split on runs of whitespace and drop empty tokens (leading blanks, empty lines).
          .flatMapValues(line => line.split("\\s+").filter(_.nonEmpty))
          // Re-key by word so all occurrences of the same word are counted together.
          .groupBy((_, word) => word)
          .count()

        // Output values are Long counts, serialized with the implicit Long serde.
        wordCounts.toStream.to("streams-plaintext-output")

        val streams = new KafkaStreams(builder.build(), config)

        // Register the hook before starting so the instance is closed on JVM shutdown.
        sys.ShutdownHookThread {
          streams.close(Duration.ofSeconds(10))
        }

        streams.start()
        println(streams.toString)
      }
    }