While processing a stream of Avro messages through Kafka and Spark, I am saving the processed data as documents in an ElasticSearch index. Here's the code (simplified):
    directKafkaStream.foreachRDD(rdd -> {
        rdd.foreach(avroRecord -> {
            byte[] encodedAvroData = avroRecord._2;
            MyType t = deserialize(encodedAvroData);

            // Creating the ElasticSearch Transport client
            Settings settings = Settings.builder()
                    .put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
            TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

            // Document to insert if no document with this id exists yet
            IndexRequest indexRequest = new IndexRequest("index", "item", id)
                    .source(jsonBuilder()
                            .startObject()
                            .field("name", name)
                            .field("timestamp", new Timestamp(System.currentTimeMillis()))
                            .endObject());

            // Partial update applied if the document already exists
            UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
                    .doc(jsonBuilder()
                            .startObject()
                            .field("name", name)
                            .field("timestamp", new Timestamp(System.currentTimeMillis()))
                            .endObject())
                    .upsert(indexRequest);

            client.update(updateRequest).get();
            client.close();
        });
    });
Everything works as expected; the only problem is performance: saving to ES takes some time, and I suppose this is because I open and close an ES Transport client for each RDD. The Spark documentation suggests that this approach is essentially correct: as far as I understand, the only possible optimisation is using rdd.foreachPartition, but I only have one partition, so I am not sure that this would be beneficial. Is there any other way to achieve better performance?
The slowdown comes from creating a new connection for every record of the RDD, not just once per RDD.
So I think using foreachPartition will improve performance even with only one partition, because it lets you create the ES connection instance once, outside the loop over the records, and reuse it for every record in that partition.
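
To make the difference concrete, here is a minimal sketch that uses only the Java standard library. FakeEsClient is a hypothetical stand-in for the TransportClient (it only counts how often it is constructed), and the two methods mirror the body you would put inside rdd.foreach and rdd.foreachPartition respectively. It is an illustration of the pattern under those assumptions, not a drop-in replacement for your job.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ConnectionReuseSketch {

    // Counts how many clients were opened, so the two strategies can be compared.
    static final AtomicInteger OPENED = new AtomicInteger();

    // Hypothetical stand-in for the ES TransportClient: constructing it is the expensive step.
    static final class FakeEsClient implements AutoCloseable {
        FakeEsClient() { OPENED.incrementAndGet(); }
        void upsert(String record) { /* a real client would send the UpdateRequest here */ }
        @Override public void close() { /* a real client would release its connections here */ }
    }

    // Mirrors rdd.foreach(record -> ...): a client is opened and closed for every record.
    static int clientsOpenedPerRecord(Iterator<String> records) {
        OPENED.set(0);
        while (records.hasNext()) {
            String record = records.next();
            try (FakeEsClient client = new FakeEsClient()) {
                client.upsert(record);
            }
        }
        return OPENED.get();
    }

    // Mirrors rdd.foreachPartition(records -> ...): one client serves the whole iterator.
    static int clientsOpenedPerPartition(Iterator<String> records) {
        OPENED.set(0);
        try (FakeEsClient client = new FakeEsClient()) {
            while (records.hasNext()) {
                client.upsert(records.next());
            }
        }
        return OPENED.get();
    }

    public static void main(String[] args) {
        List<String> partition = List.of("a", "b", "c");
        System.out.println("per record:    " + clientsOpenedPerRecord(partition.iterator()));
        System.out.println("per partition: " + clientsOpenedPerPartition(partition.iterator()));
    }
}
```

With three records in a single partition, the per-record version opens three clients and the per-partition version opens one. In the real job, the settings and client construction would move to the top of the foreachPartition lambda, the update calls would stay inside the loop over the iterator, and client.close() would run once after the loop.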