scala apache-kafka kafka-consumer-api producer-consumer

How to read data from an external API in a Kafka producer and send it to a Kafka consumer in Scala


I am new to Apache Kafka. I want my producer to read data from the https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&outputsize=full&apikey=demo API and send it to a topic, and then have my consumer read the data from that topic and save it to a database.

I cannot figure out how to send this data, because it is in JSON format.

I have tried an example of Kafka consumer-producer with String values:

In my example, Producer.scala is:

import java.util.Properties

import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import play.api.libs.json.{JsObject, JsValue, Json}
object Producer extends App {

  val url = "https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&outputsize=full&apikey=demo"
  val httpClient = HttpClientBuilder.create().build()
  val httpResponse = httpClient.execute(new HttpGet(url))
  val entity = httpResponse.getEntity
  val str = EntityUtils.toString(entity, "UTF-8")
  val content = Json.parse(str)
  val props:Properties = new Properties()
  props.put("bootstrap.servers","localhost:9092")
  props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks","all")
  val producer = new KafkaProducer[Nothing, (String,JsValue)](props)
  val topic = "quick-start"
  try {
    val record = new ProducerRecord(topic, content.as[JsObject].fields(1))
    producer.send(record)
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    producer.close()
  }
}

and my Consumer.scala is:

import java.util.{Collections, Properties}
import java.util.regex.Pattern
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
object Consumer extends App {

  val props:Properties = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers","localhost:9092")
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")
  val consumer = new KafkaConsumer(props)
  val topics = List("quick-start")
  try {
    consumer.subscribe(topics.asJava)
    while (true) {
      val records = consumer.poll(10)
      for (record <- records.asScala) {
        println("Topic: " + record.topic() +
          ",Key: " + record.key() +
          ",Value: " + record.value() +
          ", Offset: " + record.offset() +
          ", Partition: " + record.partition())
      }
    }
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    consumer.close()
  }
}

My build.sbt is:

name := "Kafka-AkkaPractice"

version := "0.1"

scalaVersion := "2.12.2"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "2.1.0",
  "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime,
  "org.apache.httpcomponents" % "httpclient" % "4.5.2",
  "com.typesafe.play" %% "play-json" % "2.8.0"
)

My understanding is that

props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer")

is for keys and values of type String.

Could anyone suggest how to send this kind of data to my Kafka topic so that I can read it in the consumer and save it to a database?


Solution

  • You don't need to parse the JSON in the producer other than to validate that the API response can be parsed.

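    If you want to send the data as-is, send the raw response string: val record = new ProducerRecord(topic, str). The configured StringSerializer expects String values, so the producer should be declared as KafkaProducer[String, String].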
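
As a rough sketch of the as-is approach, the producer might look like the following. It reuses the URL, topic, and dependencies from the question. The object names ProducerSupport and JsonStringProducer and the helpers stringProducerProps and isValidJson are made up for illustration, and blocking on send(...).get() is only one simple way to surface send errors.

```scala
import java.util.Properties

import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import play.api.libs.json.Json

import scala.util.Try

// Hypothetical helpers for this sketch; they are not part of Kafka or play-json.
object ProducerSupport {

  // Producer settings where both key and value are serialized as plain strings.
  def stringProducerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all")
    props
  }

  // True if the text parses as JSON; the parsed value itself is discarded.
  def isValidJson(text: String): Boolean = Try(Json.parse(text)).isSuccess
}

object JsonStringProducer extends App {

  val url = "https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&outputsize=full&apikey=demo"
  val topic = "quick-start"

  // Fetch the API response body as a string, closing HTTP resources afterwards.
  val httpClient = HttpClientBuilder.create().build()
  val body: String =
    try {
      val response = httpClient.execute(new HttpGet(url))
      try EntityUtils.toString(response.getEntity, "UTF-8")
      finally response.close()
    } finally httpClient.close()

  if (ProducerSupport.isValidJson(body)) {
    val producer = new KafkaProducer[String, String](
      ProducerSupport.stringProducerProps("localhost:9092"))
    try {
      // The raw JSON text is the record value; get() waits for the broker's ack.
      producer.send(new ProducerRecord[String, String](topic, body)).get()
    } finally producer.close()
  } else {
    println("API response is not valid JSON; nothing was sent")
  }
}
```

The full intraday response can be fairly large. If it exceeds the producer's max.request.size setting (1 MB by default), the send may be rejected, in which case raising that setting or sending smaller pieces would be worth considering.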

    And in the consumer, parse the string value of each record:

     for (record <- records.asScala) {
        val content = Json.parse(record.value())