RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]]')

Problem description

I am using pyspark together with the elasticsearch Python library, and when updating a document in ES I get the following error.

2021-09-08 06:31:49 ERROR JobScheduler:91 - Error running job streaming job 1631082700000 ms.1
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 124, in RDDfromKafkaStream
    posttoES(row)
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 100, in posttoES
    es.update(index="anonprofile", id = jsonid, body=query)
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/utils.py", line 168, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/__init__.py", line 1903, in update
    "POST", path, params=params, headers=headers, body=body
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 458, in perform_request
    raise e
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 426, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/base.py", line 331, in _raise_error
    status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]]')

        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 172, in <module>
    ssc.awaitTermination() 
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
        ... (the same Python traceback and Spark stack trace as above, repeated verbatim) ...

The code snippet is:

row = json.loads(row)
count = row["Count"]
row.pop("Count")
jsonid = hashlib.sha224(json.dumps(row).encode('ascii', 'ignore')).hexdigest()

if es.exists(index="anonprofile", id=jsonid):
    q = {
        "script": {
            "source": "ctx._source.Count+={}".format(count),
            "lang": "painless"
        }
    }
    # This is the update call that fails with the 400 error
    es.update(index="anonprofile", id=jsonid, body=q)
else:
    row["Count"] = count
    es.index(index="anonprofile", id=jsonid, body={'doc': row})

The first document goes into the else block and is indexed as expected, but as soon as a document hits the if block, the update call raises the error above.

Searching around the internet, I tried changing the query, but nothing seems to work.

PS: For learning purposes I tried doing the same task without pyspark in the loop, and that version seems to work fine. The code is here.

Tags: elasticsearch, pyspark

Solution


The problem is that you are indexing your data inside a doc field, which becomes a property of the stored document: since the row variable is a dictionary of values, the document actually contains _source.doc.Count, while your script tries to update _source.Count instead of _source.doc.Count.
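
To make the mismatch concrete, here is a minimal sketch using only the index, id, and variables already present in the question:

# The else branch wraps the row in a "doc" key, so the stored document looks like
#   {"doc": {"Count": 1, ...remaining row fields...}}
# i.e. the counter lives at _source.doc.Count, not _source.Count.
es.index(index="anonprofile", id=jsonid, body={"doc": row})

# The if branch then runs a script against _source.Count, which does not exist
# at the top level of that document, so the update is rejected -- consistent
# with the 400 illegal_argument_exception in the traceback.
es.update(
    index="anonprofile",
    id=jsonid,
    body={"script": {"source": "ctx._source.Count+={}".format(count), "lang": "painless"}},
)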

A doc field in the body argument is only useful for update, for example together with an upsert or a script when the document does not exist; for index, the body is the document itself and should not be wrapped in doc.

For example:

row["Count"] = count
body = {
    "script": {
        "source": "ctx._source.Count+={}".format(count),
        "lang": "painless"
    }
    "upsert": row
}
es.update(index="anonprofile", id=jsonid, body=body)

in place of your whole if exists... / else block.
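
Put together, a hedged sketch of how the question's posttoES helper could look with this upsert form (the index name, id hashing, and Count field follow the question; the Elasticsearch client setup is assumed, and count is passed through script params instead of string formatting as a small variation):

import hashlib
import json

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumed connection settings

def posttoES(row):
    row = json.loads(row)
    count = row.pop("Count")
    # Stable document id from the remaining fields, as in the question.
    jsonid = hashlib.sha224(json.dumps(row).encode("ascii", "ignore")).hexdigest()
    row["Count"] = count
    body = {
        "script": {
            "source": "ctx._source.Count += params.count",
            "lang": "painless",
            "params": {"count": count},
        },
        "upsert": row,
    }
    # Creates the document from "upsert" when it is missing,
    # otherwise runs the script to increment the existing Count.
    es.update(index="anonprofile", id=jsonid, body=body)

Because update with an upsert handles both the create case and the increment case in one call, the separate es.exists check is no longer needed.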

