
Saving data to ElasticSearch in Spark task


While processing a stream of Avro messages through Kafka and Spark, I save the processed data as documents in an ElasticSearch index. Here's the (simplified) code:

    directKafkaStream.foreachRDD(rdd -> {
        rdd.foreach(avroRecord -> {
            // Each Kafka record is a (key, value) pair; the value holds the Avro payload
            byte[] encodedAvroData = avroRecord._2;
            MyType t = deserialize(encodedAvroData);
            // id and name are taken from t (omitted in this simplified example)

            // Creating the ElasticSearch Transport client (inside rdd.foreach, so once per record)
            Settings settings = Settings.builder()
                    .put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
            TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

            // Document to insert if no document with this id exists yet
            IndexRequest indexRequest = new IndexRequest("index", "item", id)
                    .source(jsonBuilder()
                            .startObject()
                            .field("name", name)
                            .field("timestamp", new Timestamp(System.currentTimeMillis()))
                            .endObject());

            // Partial update for an existing document; falls back to indexRequest (upsert)
            UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
                    .doc(jsonBuilder()
                            .startObject()
                            .field("name", name)
                            .field("timestamp", new Timestamp(System.currentTimeMillis()))
                            .endObject())
                    .upsert(indexRequest);

            try {
                client.update(updateRequest).get();
            } finally {
                // Release the connection even if the update fails
                client.close();
            }
        });
    });

Everything works as expected; the only problem is performance: saving to ES takes a noticeable amount of time, and I suppose this is because I open and close an ES Transport client for each RDD. The Spark documentation seems to suggest that this approach is essentially correct: as far as I understand, the only possible optimisation is to use rdd.foreachPartition, but I only have one partition, so I am not sure it would help. Is there any other way to get better performance?
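For reference, the Spark Streaming programming guide (section "Design Patterns for using foreachRDD") goes one step beyond foreachPartition: it suggests keeping a static, lazily created pool of connections that is reused across partitions and batches. The sketch below is a minimal, hypothetical illustration of that idea for a single shared client, using the initialization-on-demand holder idiom so that each JVM builds the client only once. SharedClient, ClientHolder and SharedClientDemo are made-up names; SharedClient merely stands in for the TransportClient and counts how often it is constructed. It assumes the real client can safely be shared by concurrent tasks in the same executor, which should be checked against the Elasticsearch client documentation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the ES TransportClient: it only counts constructions.
class SharedClient {
    static final AtomicInteger created = new AtomicInteger();

    SharedClient() {
        // Real code (assumption): build Settings and a PreBuiltTransportClient here
        created.incrementAndGet();
    }

    void index(String doc) {
        // Real code (assumption): client.update(updateRequest).get()
    }
}

// Initialization-on-demand holder: the JVM initializes Lazy (and thus INSTANCE)
// the first time get() is called, exactly once, and class initialization is thread-safe.
final class ClientHolder {
    private ClientHolder() { }

    private static class Lazy {
        static final SharedClient INSTANCE = new SharedClient();
    }

    static SharedClient get() {
        return Lazy.INSTANCE;
    }
}

class SharedClientDemo {
    public static void main(String[] args) {
        // Simulate three micro-batches, each with one partition of four records.
        for (int batch = 0; batch < 3; batch++) {
            for (int record = 0; record < 4; record++) {
                ClientHolder.get().index("batch" + batch + "-record" + record);
            }
        }
        System.out.println("clients created: " + SharedClient.created.get());
    }
}
```

Because the client lives in a static field, it is not serialized into Spark's task closure; each executor JVM would initialize its own copy on first use. Closing the shared client (for example from a JVM shutdown hook) is left out of this sketch.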


Solution

  • You create a new connection every time you process a record of the RDD (the client is built inside rdd.foreach, so it is created per record, not per RDD). Using foreachPartition should therefore improve performance even with only one partition, because it lets you create the ES client once, outside the loop, and reuse it for every record in that partition.
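A minimal sketch of the difference, assuming a hypothetical CountingClient that stands in for the ES TransportClient and only counts how many times it is opened, so the example runs without Spark or a cluster. savePartition mirrors the body you would pass to rdd.foreachPartition in the Java API, which receives an Iterator over the partition's records; in the real job the TransportClient would be built before the while loop and closed after it, with client.update(...).get() in place of index(...).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the ES TransportClient: counts how many instances are opened.
class CountingClient implements AutoCloseable {
    static int opened = 0;
    final List<String> indexed = new ArrayList<>();

    CountingClient() {
        opened++; // real code: expensive connection setup
    }

    void index(String doc) {
        indexed.add(doc); // real code: client.update(updateRequest).get()
    }

    @Override
    public void close() {
        // real code: client.close() tears down the connection
    }
}

class PartitionDemo {
    // Pattern from the question: rdd.foreach opens one client per record.
    static int perRecord(List<List<String>> partitions) {
        CountingClient.opened = 0;
        for (List<String> partition : partitions) {
            for (String record : partition) {
                try (CountingClient client = new CountingClient()) {
                    client.index(record);
                }
            }
        }
        return CountingClient.opened;
    }

    // Body of a foreachPartition callback: one client, reused for every record.
    static void savePartition(Iterator<String> records) {
        try (CountingClient client = new CountingClient()) {
            while (records.hasNext()) {
                client.index(records.next());
            }
        }
    }

    static int perPartition(List<List<String>> partitions) {
        CountingClient.opened = 0;
        for (List<String> partition : partitions) {
            savePartition(partition.iterator()); // Spark calls this once per partition
        }
        return CountingClient.opened;
    }

    public static void main(String[] args) {
        // A single partition holding five records, as in the question.
        List<List<String>> oneBatch = Arrays.asList(
                Arrays.asList("r1", "r2", "r3", "r4", "r5"));
        System.out.println("clients opened per record: " + perRecord(oneBatch));
        System.out.println("clients opened per partition: " + perPartition(oneBatch));
    }
}
```

With one partition of five records, the per-record version opens five clients and the per-partition version opens one. In general the per-partition version opens one client per partition instead of one per record, which is why even a single partition benefits.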