scala - EsHadoopIllegalArgumentException: Cannot detect ES version - Spark-Elasticsearch example
Problem Description
I am trying to run a simple example that writes data to Elasticsearch. However, I keep getting this error:
EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
My dependencies for Spark and ElasticSearch:
scalaVersion := "2.11.5"
val sparkVersion = "2.3.0"
resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "com.typesafe" % "config" % "1.3.0",
  "org.elasticsearch" %% "elasticsearch-spark-20" % "6.2.4"
)
Here is my code for an example:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark._

object App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster(args(0))
      .setAppName("KafkaSparkStreaming")
    sparkConf.set("es.index.auto.create", "true")
    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
    val sparkContext = streamingContext.sparkContext
    sparkContext.setLogLevel("ERROR")
    val sqlContext = new SQLContext(sparkContext)
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    sparkContext.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
I run Elasticsearch from a Docker image. This is my docker-compose.yml file:
version: '3.3'
services:
  kafka:
    image: spotify/kafka
    ports:
      - "9092:9092"
    environment:
      - ADVERTISED_HOST=localhost
  elasticsearch:
    image: elasticsearch
  kibana:
    image: kibana
    ports:
      - "5601:5601"
What might cause this exception? I would really appreciate some help.
Solution
I ran into a similar situation while experimenting with Spark and Elasticsearch. Replacing the "elasticsearch-spark" dependency with "elasticsearch-hadoop", matched to my Elasticsearch version, solved the problem.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming._
import scala.collection.mutable

val conf = new SparkConf().setAppName("Sample").setMaster("local[*]")
conf.set("es.index.auto.create", "true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val rdd = sc.makeRDD(Seq(numbers, airports))
val microbatches = mutable.Queue(rdd)
ssc.queueStream(microbatches).saveToEs("spark/docs")
ssc.start()
ssc.awaitTermination()
Dependency list:
"org.apache.spark" %% "spark-core" % "2.2.0",
"org.apache.spark" %% "spark-sql" % "2.2.0",
"org.apache.spark" %% "spark-streaming" % "2.2.0",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1",
"org.elasticsearch" %% "elasticsearch-hadoop" % "6.3.0",