Elasticsearch support for Spark 2.4.2 with Scala 2.12

Problem description

I cannot find any ES 6.7.1 support jar for Spark 2.4.2 and Scala 2.12 in the Maven repo; the jars are only published for Scala 2.11 and 2.10.

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.7.1</version>
</dependency>

For my application we are using Spark 2.4.2, which only supports Scala 2.12. Below is the error shown when I try to run with the "elasticsearch-spark-20_2.11" jar:

StreamingExecutionRelation KafkaV2[Subscribe[test_topic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:302)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
        at org.elasticsearch.spark.sql.DataFrameValueWriter.writeStruct(DataFrameValueWriter.scala:78)
        at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:70)
        at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:53)
        at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:53)
        at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71)
        at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58)
        at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68)
        at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
        at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
        at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
        at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
        at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Tags: apache-spark, elasticsearch, spark-structured-streaming

Solution


Sorry for the delay.

The Elasticsearch Spark / Elasticsearch Hadoop libraries cannot be used with Scala 2.12 yet. There is an open pull request (https://github.com/elastic/elasticsearch-hadoop/pull/1308) that is waiting on the tests to pass.

You will either have to downgrade your project to Scala 2.11, or wait for the library to be released for Scala 2.12.
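If you take the downgrade route, a minimal sketch of what the aligned dependencies could look like in the pom is below. It assumes Scala 2.11 artifacts are published for your Spark version; the spark-sql and spark-sql-kafka coordinates are illustrative and not taken from the original project:

<properties>
    <!-- Illustrative: pin one Scala binary version and use it consistently -->
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <!-- Spark artifacts built for Scala 2.11 (assumes _2.11 builds exist for your Spark version) -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.2</version>
    </dependency>
    <!-- ES-Hadoop Spark connector, also the _2.11 build -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.7.1</version>
    </dependency>
</dependencies>

The key point is that every artifact with a Scala suffix carries the same _2.11 suffix: mixing _2.11 and _2.12 artifacts on one classpath is what produces a NoSuchMethodError on scala.Predef methods like the one in the stack trace above.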

