apache-spark elasticsearch spark-structured-streaming

Elasticsearch support for Spark 2.4.2 with Scala 2.12


I can't find an ES 6.7.1 connector jar for Spark 2.4.2 built against Scala 2.12. In the Maven repository, the jar is published only for Scala 2.11 and 2.10:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.7.1</version>
</dependency>

My application uses Spark 2.4.2, whose pre-built distribution is compiled with Scala 2.12. The following error appears when I run it with the "elasticsearch-spark-20_2.11" jar:

StreamingExecutionRelation KafkaV2[Subscribe[test_topic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:302)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
        at org.elasticsearch.spark.sql.DataFrameValueWriter.writeStruct(DataFrameValueWriter.scala:78)
        at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:70)
        at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:53)
        at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:53)
        at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71)
        at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58)
        at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68)
        at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
        at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
        at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
        at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
        at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Solution

  • Sorry for the delay,

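    The Elasticsearch Spark / Elasticsearch Hadoop libraries can't be used with Scala 2.12 yet. The NoSuchMethodError on scala.Predef$.refArrayOps in your trace is the usual symptom of running a jar compiled for Scala 2.11 on a Scala 2.12 runtime, since the two versions are not binary compatible. There is an open pull request (https://github.com/elastic/elasticsearch-hadoop/pull/1308) that is still pending, waiting for its tests to pass.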

    You either have to downgrade your project to Scala 2.11 or wait for the libraries to be released for Scala 2.12.
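
    If you go the downgrade route, a minimal sketch of the pom.xml could look like the fragment below. It assumes that Scala 2.11 builds of the Spark artifacts exist for your Spark version, and that the Kafka source seen in your trace comes from spark-sql-kafka-0-10. The property names scala.binary.version and spark.version are only illustrative. The point is that every Scala-dependent artifact carries the same _2.11 suffix.

    ```xml
    <!-- Sketch only: keep all Scala-dependent artifacts on one binary version. -->
    <properties>
        <!-- Illustrative property names, not required by Maven or Spark. -->
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Assumed from the KafkaV2 source in the stack trace. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_${scala.binary.version}</artifactId>
            <version>6.7.1</version>
        </dependency>
    </dependencies>
    ```

    Changing the pom alone is not enough if the Spark installation you submit to is itself a Scala 2.12 build. The runtime has to be on Scala 2.11 as well, otherwise the same NoSuchMethodError will come back.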