apache-spark - Spark-Elasticsearch connection problem
Problem description
I have a code snippet that is supposed to index data into Elasticsearch:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in range(10)])  # range works on Python 2 and 3
df = df.drop('_id')  # no-op here: this DataFrame has no '_id' column
(df.write.format('es')
    .option('es.nodes', '3.45.67.131')
    .option('es.nodes.wan.only', 'true')
    .option('es.port', 9200)
    .option('es.resource', '%s/%s' % ('index_name', 'doc_type_name'))
    .save())
However, when I submit it as a Spark job with
spark-submit --packages org.elasticsearch:elasticsearch-hadoop:7.2.0 test-chetan.py
I get an error:
Traceback (most recent call last):
File "/mnt/tmp/test-chetan.py", line 5, in <module>
df.write.format('es').option('es.nodes', '3.45.67.131').option('es.nodes.wan.only','true').option('es.resource', '%s/%s' % ('index_name', 'doc_type_name')).save()
File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 732, in save
File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o49.save.
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [HEAD] on [index_name] failed; server[3.15.27.191:9200] returned [503|Service Unavailable:]
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:469)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:439)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:529)
at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:524)
at org.elasticsearch.hadoop.rest.RestRepository.isEmpty(RestRepository.java:466)
at org.elasticsearch.spark.sql.ElasticsearchRelation.isEmpty(DefaultSource.scala:625)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:110)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
19/07/19 21:27:51 INFO SparkContext: Invoking stop() from shutdown hook
19/07/19 21:27:51 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-40-1.us-east-2.compute.internal:4041
19/07/19 21:27:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/07/19 21:27:51 INFO MemoryStore: MemoryStore cleared
19/07/19 21:27:51 INFO BlockManager: BlockManager stopped
19/07/19 21:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
19/07/19 21:27:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/07/19 21:27:51 INFO SparkContext: Successfully stopped SparkContext
19/07/19 21:27:51 INFO ShutdownHookManager: Shutdown hook called
19/07/19 21:27:51 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-d23384c9-63c3-4875-a254-d403226cccdd/pyspark-5bc00e36-b585-4c18-96c2-59aa20848db2
19/07/19 21:27:51 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-d23384c9-63c3-4875-a254-d403226cccdd
19/07/19 21:27:51 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-605e10a6-0232-4f55-855d-a04ef83fa886
I can't work out what is causing this line:
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [HEAD] on [index_name] failed; server[3.15.27.191:9200] returned [503|Service Unavailable:]
My AWS Elasticsearch domain is publicly accessible, and the EMR cluster running Spark allows all outbound traffic, so I don't think this is a security issue.
Any suggestions?
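The failing call is the connector's `[HEAD] on [index_name]` request, which got a 503 back. One way to see whether the endpoint itself answers 503, independently of Spark, is to send the same HEAD request directly. This is a minimal Python 3 sketch using only the standard library; the helper name `head_status` is hypothetical, and the scheme, host and port in the usage comment are assumptions that should be adjusted to match your `es.nodes`/`es.port` settings.

```python
import urllib.error
import urllib.request


def head_status(base_url, index, timeout=10):
    """Send an HTTP HEAD request for an index and return the status code.

    Mirrors the '[HEAD] on [index_name]' request named in the error,
    but runs outside Spark so the endpoint can be checked on its own.
    """
    url = '%s/%s' % (base_url.rstrip('/'), index)
    req = urllib.request.Request(url, method='HEAD')
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status  # e.g. 200 when the index exists
    except urllib.error.HTTPError as err:
        # Non-2xx answers (404 for a missing index, 503, ...) raise HTTPError
        return err.code
    # Network-level failures (refused connection, timeout) are not caught
    # here and propagate as exceptions.


# Usage (hypothetical URL: use the scheme/host/port from your es.nodes/es.port):
# print(head_status('http://3.45.67.131:9200', 'index_name'))
```

If this also returns 503 from the EMR host, the problem lies with the Elasticsearch endpoint rather than with the Spark job.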
Solution
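Try listing all nodes of the ES cluster in the 'es.nodes' setting.
When you set 'es.nodes.wan.only' to 'true', you prevent the connector from reaching any node that is not listed in 'es.nodes'.
Alternatively, you can set 'es.nodes.wan.only' to false.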
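Both suggestions can be expressed as option sets for the same writer. This is a sketch that reuses the option names from the question; the helper `es_write_options` is hypothetical, and the 192.0.2.x addresses are placeholders, not real cluster nodes. `es.nodes` takes a comma-separated list of nodes.

```python
def es_write_options(nodes, port=9200, wan_only=True,
                     resource='index_name/doc_type_name'):
    """Build elasticsearch-hadoop write options as a plain dict (hypothetical helper)."""
    return {
        # Comma-separated list; with wan.only=true these are the only nodes used
        'es.nodes': ','.join(nodes),
        'es.port': str(port),
        'es.nodes.wan.only': 'true' if wan_only else 'false',
        'es.resource': resource,
    }


# Suggestion 1: keep wan.only, but list every node (placeholder addresses)
opts_all_nodes = es_write_options(['192.0.2.10', '192.0.2.11', '192.0.2.12'])
# Suggestion 2: turn wan.only off so the connector may discover nodes itself
opts_discovery = es_write_options(['3.45.67.131'], wan_only=False)

# With a DataFrame `df` as in the question:
# df.write.format('es').options(**opts_all_nodes).save()
```

Keeping the options in one dict makes it easy to switch between the two variants without editing a long chain of `.option(...)` calls.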
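es.nodes.wan.only (default false)
"Whether the connector is used against an Elasticsearch instance in a cloud/restricted environment over the WAN, such as Amazon Web Services. In this mode, the connector disables discovery and only connects through the declared es.nodes during all operations, including reads and writes. Note that in this mode, performance is highly affected."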
https://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html
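The documented difference between the two modes can be illustrated with a deliberately simplified model. This is not the connector's actual code: the function `nodes_used` and the node names are hypothetical, and the real discovery logic involves more settings than shown. It only captures the quoted rule that WAN-only mode disables discovery and uses the declared `es.nodes` exclusively.

```python
def nodes_used(declared, discovered, wan_only):
    """Simplified model of which nodes the connector may contact."""
    if wan_only:
        # Discovery disabled: only the declared es.nodes are used
        return list(declared)
    # Discovery enabled: declared nodes plus any nodes found in the cluster
    result = list(declared)
    for node in discovered:
        if node not in result:
            result.append(node)
    return result


# nodes_used(['a'], ['a', 'b', 'c'], wan_only=True)   -> ['a']
# nodes_used(['a'], ['a', 'b', 'c'], wan_only=False)  -> ['a', 'b', 'c']
```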