elasticsearch - RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]]')
问题描述
我正在使用 pyspark 和 elasticsearch(py 库),并且在更新 ES 中的一个文档时出现以下错误。
2021-09-08 06:31:49 ERROR JobScheduler:91 - Error running job streaming job 1631082700000 ms.1
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
func = lambda t, rdd: old_func(rdd)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 124, in RDDfromKafkaStream
posttoES(row)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 100, in posttoES
es.update(index="anonprofile", id = jsonid, body=query)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/utils.py", line 168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/__init__.py", line 1903, in update
"POST", path, params=params, headers=headers, body=body
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 458, in perform_request
raise e
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 426, in perform_request
timeout=timeout,
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
self._raise_error(response.status, raw_data)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/base.py", line 331, in _raise_error
status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]]')
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 172, in <module>
ssc.awaitTermination()
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
func = lambda t, rdd: old_func(rdd)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 124, in RDDfromKafkaStream
posttoES(row)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 100, in posttoES
es.update(index="anonprofile", id = jsonid, body=query)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/utils.py", line 168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/__init__.py", line 1903, in update
"POST", path, params=params, headers=headers, body=body
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 458, in perform_request
raise e
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 426, in perform_request
timeout=timeout,
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
self._raise_error(response.status, raw_data)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/base.py", line 331, in _raise_error
status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]]')
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
代码片段是 -
row = json.loads(row)
count = row["Count"]
row.pop("Count")
jsonid = hashlib.sha224(json.dumps(row).encode('ascii', 'ignore')).hexdigest()
if es.exists(index="anonprofile", id=jsonid):
q = {
"script": {
"source": "ctx._source.Count+={}".format(count),
"lang": "painless"
}
}
es.update(index="anonprofile", id = jsonid, body=q)
else:
row["Count"] = count
es.index(index="anonprofile", id=jsonid, body={'doc' : row})
第一个文档走 else 分支,按预期被索引;但后续相同的文档进入 if 分支后,update 调用就返回了上面的错误。
在互联网上查找,我尝试更改查询,但似乎没有任何效果。
PS:出于学习目的,我尝试在不集成 pyspark 的情况下单独执行同样的逻辑,它可以正常工作。代码在这里。
解决方案
问题在于:你在调用 es.index 时把数据包在了 doc 字段里(body={'doc': row}),于是 row 这个字典的内容被存成了 _source.doc 下的属性;而你的更新脚本却去修改 _source.Count——实际应该是 _source.doc.Count,所以更新失败。

需要说明的是,带 doc 字段的 body 只对 update 调用有意义(例如配合 upsert 或 script 使用);直接用 es.index 写入时,应当把 row 本身作为 body,而不要再包一层 doc。

可以把 exists 判断去掉,改用一次带 upsert 的脚本更新,例如:
row["Count"] = count
body = {
"script": {
"source": "ctx._source.Count+={}".format(count),
"lang": "painless"
},
"upsert": row
}
es.update(index="anonprofile", id=jsonid, body=body)
用这一段替换你原来的 if exists... 逻辑即可。
推荐阅读
- amazon-web-services - Jenkinsfile 未检测到 aws 插件语法
- javascript - 一个索引依赖于 Java Script 中的另一个索引
- java - Raspberry PI 和 windows pc 的代码之间的差异
- android-build - 如何在 android soong 构建系统中使用 objcopy --strip-symbols
- macos - Zsh | `brew list | {any command}` 引发未捕获信号 PIPE 错误
- r - 使用接收器功能时更改和乱码的消息
- api - Rest api url 设计异常
- android - 使用 Intent ActivityResult 的 Google 登录始终带有 RESULT_CANCELED
- sql - WHERE 字符串 IN 字符串 SQL Server
- systemd - 执行su命令时如何防止systemd将进程移动到其他cgroup