Spark-Elasticsearch connection problem

Problem description

I have a code snippet that is supposed to index data into Elasticsearch:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in xrange(10)])
df = df.drop('_id')
df.write.format('es').option('es.nodes', '3.45.67.131').option('es.nodes.wan.only','true').option('es.port', 9200).option('es.resource', '%s/%s' % ('index_name', 'doc_type_name')).save()

However, when I run it as a spark-submit job using

 spark-submit --packages org.elasticsearch:elasticsearch-hadoop:7.2.0 test-chetan.py

I get an error:

Traceback (most recent call last):
  File "/mnt/tmp/test-chetan.py", line 5, in <module>
    df.write.format('es').option('es.nodes', '3.45.67.131').option('es.nodes.wan.only','true').option('es.resource', '%s/%s' % ('index_name', 'doc_type_name')).save()
  File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 732, in save
  File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o49.save.
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [HEAD] on [index_name] failed; server[3.15.27.191:9200] returned [503|Service Unavailable:]
        at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:469)
        at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:439)
        at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:529)
        at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:524)
        at org.elasticsearch.hadoop.rest.RestRepository.isEmpty(RestRepository.java:466)
        at org.elasticsearch.spark.sql.ElasticsearchRelation.isEmpty(DefaultSource.scala:625)
        at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:110)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

19/07/19 21:27:51 INFO SparkContext: Invoking stop() from shutdown hook
19/07/19 21:27:51 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-40-1.us-east-2.compute.internal:4041
19/07/19 21:27:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/07/19 21:27:51 INFO MemoryStore: MemoryStore cleared
19/07/19 21:27:51 INFO BlockManager: BlockManager stopped
19/07/19 21:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
19/07/19 21:27:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/07/19 21:27:51 INFO SparkContext: Successfully stopped SparkContext
19/07/19 21:27:51 INFO ShutdownHookManager: Shutdown hook called
19/07/19 21:27:51 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-d23384c9-63c3-4875-a254-d403226cccdd/pyspark-5bc00e36-b585-4c18-96c2-59aa20848db2
19/07/19 21:27:51 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-d23384c9-63c3-4875-a254-d403226cccdd
19/07/19 21:27:51 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-605e10a6-0232-4f55-855d-a04ef83fa886

I can't work out the cause of this line:

: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [HEAD] on [index_name] failed; server[3.15.27.191:9200] returned [503|Service Unavailable:]

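My AWS Elasticsearch domain is publicly accessible, and the EMR cluster where Spark is deployed allows all outbound traffic, so I don't think this is a security issue.

Any suggestions?

Tags: apache-spark, elasticsearch, pyspark, amazon-emr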

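Solution

Try listing all nodes of the ES cluster in the 'es.nodes' setting.

When you set 'es.nodes.wan.only' to 'true', you prevent the connector from reaching any node that is not listed in 'es.nodes'.

Alternatively, you can set 'es.nodes.wan.only' to 'false'.

From the documentation, es.nodes.wan.only (default false):

"Whether the connector is used against an Elasticsearch instance in a cloud/restricted environment over the WAN, such as Amazon Web Services. In this mode, the connector disables discovery and only connects through the declared es.nodes during all operations, including reads and writes. Note that in this mode, performance is highly affected."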

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html
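
The two options from the answer can be sketched roughly as below. This is only an illustration, not a confirmed fix for the 503 in the question: the helper names build_es_options and write_to_es are hypothetical, and the 192.0.2.x addresses are placeholders for your real cluster nodes. The option keys are the same ones used in the question's code.

```python
def build_es_options(nodes, resource, port=9200, wan_only=True):
    """Build the connector options as plain strings (hypothetical helper)."""
    return {
        # Comma-separated list of every node the connector may talk to.
        'es.nodes': ','.join(nodes),
        'es.port': str(port),
        # With 'true', discovery is disabled and only es.nodes are used.
        'es.nodes.wan.only': 'true' if wan_only else 'false',
        'es.resource': resource,
    }


def write_to_es(df, options):
    """Apply the options to a DataFrame writer and save (needs Spark and a cluster)."""
    writer = df.write.format('es')
    for key, value in options.items():
        writer = writer.option(key, value)
    writer.save()


# Option 1: keep WAN-only mode, but declare all nodes (placeholder addresses).
wan_opts = build_es_options(
    ['192.0.2.10', '192.0.2.11', '192.0.2.12'],
    'index_name/doc_type_name',
)

# Option 2: declare one node and let the connector discover the rest.
lan_opts = build_es_options(
    ['192.0.2.10'],
    'index_name/doc_type_name',
    wan_only=False,
)
```

In the job itself you would call write_to_es(df, wan_opts) or write_to_es(df, lan_opts) in place of the chained option(...) calls. Note that option 2 only helps if the Spark executors can reach the addresses the cluster advertises, which may not hold for a managed or public endpoint.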

