首页 > 解决方案 > 使用 python 进行弹性搜索批量更新,如何在数组字段中追加新数据

问题描述

如何在 python 中使用 bulkupdate 更新弹性搜索中的字段。我尝试了很多方法,但都出错了。在某些情况下,我收到文档丢失错误,我如何同时更新和 upsert。并且附加到字段也不起作用。elasticsearch==7.9.1 是我在 python 中使用的包

for i in range(0, length, steps):
    end_index = length-1 if i+steps>length else i+steps
    temp_list = test_data[i: end_index]
    bulk_file = ''
    actions = [{
        "_index": "test-data",
        "_opt_type":"update",
        "_type": "test-test-data",
        "_id": test_row ['testId'],
        "doc":{"script": {
                          "source": "ctx._source.DataIds.add(params.DataIds)",
                          "lang": "painless",
                          "params": {
                              "DataIds":test_row ['DataIds']
                          }
                      }}
        }
        for test_row in temp_list
    ]
    helpers.bulk(es, actions)

我得到的错误是这个

    {'update': {'_index': 'test-data', '_type': 'products', '_id': '333', 'status': 400, 'error': {'type': 'illegal_argument_exception', 'reason': 'failed
 to execute script', 'caused_by': {'type': 'script_exception', 'reason': 'runtime error', 'script_stack': ['ctx._source.dataIds.add(params.dataIds)', '
    ^---- HERE'], 'script': 'if (ctx._source.dataIds == null) { ctx._source.dataIds = []; } ctx._source.dataIds.add(params.dataIds)', 'lang': 'painless', 'position': {'offse
t': 105, 'start': 71, 'end': 118}, 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'dynamic method [java.lang.String, add/1] not found'}}}, 'data': {'upsert': {}, 'scripted_up
sert': True, 'script': {'source': 'if (ctx._source.dataIds == null) { ctx._source.dataIds = []; } ctx._source.dataIds.add(params.dataIds)', 'lang': 'painless', 'params': {'c
dataIds': 'set123'}}}}}])

标签: pythonpython-3.xelasticsearchelasticsearch-5

解决方案


upsert通过脚本的正确方法是没有doc但只有该script部分。upsert如果您想在同一命令中进行更新插入和更新,您还需要该部分。它是这样的:

actions = [{
    "_op_type":"update",
    "_index": "test-data",
    "_type": "test-test-data",
    "_id": test_row ['testId'],
    "upsert": {
       "DataIds": test_row ['DataIds']
    },
    "script": {
        "source": "ctx._source.DataIds.add(params.DataIds)",
        "lang": "painless",
        "params": {
           "DataIds":test_row ['DataIds']
        }
    }
} for test_row in temp_list
]

另一种方法是使用scripted_upsert

actions = [{
    "_op_type":"update",
    "_index": "test-data",
    "_type": "test-test-data",
    "_id": test_row ['testId'],
    "upsert": {},
    "scripted_upsert": true,
    "script": {
        "source": "if (ctx._source.DataIds == null) { ctx._source.DataIds = []; } ctx._source.DataIds.add(params.DataIds)",
        "lang": "painless",
        "params": {
           "DataIds":test_row ['DataIds']
        }
    }
} for test_row in temp_list
]

推荐阅读