I am new to Apache Kafka and I want to read data inside Producer from https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&outputsize=full&apikey=demo API and then send it to topic and read this data from topic inside my consumer to save it to database.
I am not able to figure out how to send this data as it is in JSON format.
I have tried an example of Kafka consumer-producer with String values:
In my example, my Producer.scala is :
import java.util.Properties
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import play.api.libs.json.{JsObject, JsValue, Json}
object Producer extends App {
val url = "https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&outputsize=full&apikey=demo"
val httpClient = HttpClientBuilder.create().build()
val httpResponse = httpClient.execute(new HttpGet(url))
val entity = httpResponse.getEntity
val str = EntityUtils.toString(entity, "UTF-8")
val content = Json.parse(str)
val props:Properties = new Properties()
props.put("bootstrap.servers","localhost:9092")
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
props.put("acks","all")
val producer = new KafkaProducer[Nothing, (String,JsValue)](props)
val topic = "quick-start"
try {
val record = new ProducerRecord(topic, content.as[JsObject].fields(1))
producer.send(record)
}catch{
case e:Exception => e.printStackTrace()
}finally {
producer.close()
}
}
and my Consumer.scala is :
import java.util.{Collections, Properties}
import java.util.regex.Pattern
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
object Consumer extends App {
val props:Properties = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers","localhost:9092")
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
val consumer = new KafkaConsumer(props)
val topics = List("quick-start")
try {
consumer.subscribe(topics.asJava)
while (true) {
val records = consumer.poll(10)
for (record <- records.asScala) {
println("Topic: " + record.topic() +
",Key: " + record.key() +
",Value: " + record.value() +
", Offset: " + record.offset() +
", Partition: " + record.partition())
}
}
}catch{
case e:Exception => e.printStackTrace()
}finally {
consumer.close()
}
}
My built.sbt is :
name := "Kafka-AkkaPractice"
version := "0.1"
scalaVersion := "2.12.2"
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka" % "2.1.0",
"ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime,
"org.apache.httpcomponents" % "httpclient" % "4.5.2",
"com.typesafe.play" %% "play-json" % "2.8.0"
)
What I understand is
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
is for String type of Key and Value.
Could anyone suggest how to send this type of data to my Kafka topic so that I can read it in from Consumer and save it to database?
You don't need to parse the JSON in the producer other than to validate that the API response can be parsed.
If you want to send the data as-is, then you want val record = new ProducerRecord(topic, str)
And in the consumer
for (record <- records.asScala) {
val content = Json.parse(record.value())