apache-spark, apache-kafka, avro

Kafka Avro to Elasticsearch with Spark


I want to put Avro messages from Kafka topics into Elasticsearch using a Spark job (and a Schema Registry with many defined schemas). I was able to read and deserialize records into JSON strings successfully with these two methods:

  import java.io.{ByteArrayOutputStream, IOException}
  import org.apache.avro.generic.GenericRecord
  import org.apache.avro.io.EncoderFactory
  import org.apache.avro.specific.SpecificDatumWriter

  // Deserialize an Avro GenericRecord to a JSON string
  def avroToJsonString(record: GenericRecord): String = try {
    val baos = new ByteArrayOutputStream
    try {
      val schema = record.getSchema
      val jsonEncoder = EncoderFactory.get.jsonEncoder(schema, baos, false)
      val avroWriter = new SpecificDatumWriter[GenericRecord](schema)
      avroWriter.write(record, jsonEncoder)
      jsonEncoder.flush()
      baos.flush()
      new String(baos.toByteArray)
    } catch {
      case ex: IOException =>
        throw new IllegalStateException(ex)
    } finally if (baos != null) baos.close()
  }

  import play.api.libs.json.Json  // assumption: Json.parse here comes from play-json

  // Parse a JSON string; returns Some(parsed value) or None on failure
  val parseJsonStream = (inStream: String) => {
      try {
        val parsed = Json.parse(inStream)
        Option(parsed)
      } catch {
        case e: Exception => System.err.println("Exception while parsing JSON: " + inStream)
          e.printStackTrace()
          None
      }
    }
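
Chained on a single record, the two helpers look roughly like this (just a sketch; `record` stands for a GenericRecord coming off the consumer, and the parse result assumes play-json as above):

    // Hypothetical single-record usage of the helpers above
    val jsonString: String = avroToJsonString(record)   // GenericRecord -> JSON text
    val parsedJson = parseJsonStream(jsonString)         // Some(parsed value) or None on failure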

I'm reading record by record and I can see the deserialized JSON strings in the debugger; everything looks fine, but for some reason I couldn't save them to Elasticsearch. I guess an RDD is needed to call the saveToEs method. This is how I read the Avro records from Kafka:

val kafkaStream : InputDStream[ConsumerRecord[String, GenericRecord]] = KafkaUtils.createDirectStream[String, GenericRecord](ssc, PreferBrokers, Subscribe[String, GenericRecord](KAFKA_AVRO_TOPICS, kafkaParams))

      val kafkaStreamParsed = kafkaStream.foreachRDD(rdd => {
        rdd.foreach( x => {
          val jsonString: String = avroToJsonString(x.value()) 
          parseJsonStream(jsonString) 
          })
        })
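
For context, getting GenericRecord values out of the consumer like this presupposes a configuration along these lines (only a sketch; the broker list, group id and registry URL are placeholders, and it assumes Confluent's KafkaAvroDeserializer is on the classpath):

    import org.apache.kafka.common.serialization.StringDeserializer
    import io.confluent.kafka.serializers.KafkaAvroDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"   -> "broker1:9092",                  // placeholder broker list
      "key.deserializer"    -> classOf[StringDeserializer],
      "value.deserializer"  -> classOf[KafkaAvroDeserializer],  // yields GenericRecord values
      "schema.registry.url" -> "http://schema-registry:8081",   // placeholder Schema Registry URL
      "group.id"            -> "es-indexer",                    // placeholder consumer group
      "auto.offset.reset"   -> "latest"
    )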

When I was reading JSON (not Avro) records, I was able to do it with:

EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX +  "/" + ELASTICSEARCH_TYPE)
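
In that case kafkaStreamParsed was presumably a real DStream, which is the shape the saveToEs overloads expect; roughly like this (a sketch, with jsonKafkaStream as a hypothetical DStream of ConsumerRecord[String, String], and assuming saveJsonToEs exists in the elasticsearch-spark version used):

    // Sketch: with plain JSON values a simple map already gives a DStream[String],
    // which can be indexed directly
    val jsonStream: DStream[String] = jsonKafkaStream.map(_.value())
    EsSparkStreaming.saveJsonToEs(jsonStream, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)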

I get an error on the saveToEs method saying:

Cannot resolve overloaded method 'saveToEs'

I tried to make an RDD with sc.makeRDD(), but had no luck either. How should I get all these records from the batch job into an RDD and then into Elasticsearch, or am I doing it all wrong?
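
One per-batch variant would be to write each micro-batch's RDD from inside foreachRDD instead of building a named stream first (a sketch, assuming EsSpark from elasticsearch-spark is available):

    import org.elasticsearch.spark.rdd.EsSpark

    kafkaStream.foreachRDD { rdd =>
      // RDD[String] of JSON documents for this micro-batch
      val jsonDocs = rdd.map(record => avroToJsonString(record.value()))
      EsSpark.saveJsonToEs(jsonDocs, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)
    }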

UPDATE

I tried this solution:

val messages: DStream[Unit] = kafkaStream
        .map(record => record.value)
        .flatMap(record => {
          val record1 = avroToJsonString(record)
          JSON.parseFull(record1).map(rawMap => {
            val map = rawMap.asInstanceOf[Map[String,String]]
          })
        })

again with the same error (cannot resolve overloaded method).
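
The flatMap above ends up as a DStream[Unit] because the inner map's body is only a val definition; returning the parsed map instead would give the stream a concrete element type. A sketch that keeps the original cast:

    val messages: DStream[Map[String, String]] = kafkaStream
      .map(record => record.value)
      .flatMap { record =>
        val json = avroToJsonString(record)
        // Option flattens to zero or one elements, so failed parses are simply dropped
        JSON.parseFull(json).map(_.asInstanceOf[Map[String, String]])
      }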

UPDATE2

val kafkaStreamParsed: DStream[Any] = kafkaStream.map(rdd => {
        val eventJSON = avroToJsonString(rdd.value())
        parseJsonStream(eventJSON)
      })

      try {
        EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX +  "/" + ELASTICSEARCH_TYPE)
      } catch {
        case e: Exception =>
          EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
          e.printStackTrace()
      }

Now I get the records in ES.

Using Spark 2.3.0 and Scala 2.11.8


Solution

  • I've managed to do it:

    val kafkaStream: InputDStream[ConsumerRecord[String, GenericRecord]] =
      KafkaUtils.createDirectStream[String, GenericRecord](
        ssc, PreferBrokers, Subscribe[String, GenericRecord](KAFKA_AVRO_EVENT_TOPICS, kafkaParams))

    // Map each ConsumerRecord to its parsed JSON representation
    val kafkaStreamParsed: DStream[Any] = kafkaStream.map(record => {
      val eventJSON = avroToJsonString(record.value())
      parseJsonStream(eventJSON)
    })

    try {
      EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)
    } catch {
      case e: Exception =>
        EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
        e.printStackTrace()
    }
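
    An alternative that avoids the Any element type altogether is to index the JSON text directly (a sketch; it assumes saveJsonToEs is available in the elasticsearch-spark version in use):

        // Sketch: skip the parse-to-Option step and hand the JSON strings straight to ES
        val jsonEvents: DStream[String] =
          kafkaStream.map(record => avroToJsonString(record.value()))

        EsSparkStreaming.saveJsonToEs(jsonEvents, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)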