python - 使用 Python 进行 Elasticsearch 批量插入 - 套接字超时错误
问题描述
弹性搜索 7.10.2
蟒蛇 3.8.5
弹性搜索-py 7.12.1
我正在尝试使用 elasticsearch-py 批量助手向 ElasticSearch 批量插入 100,000 条记录。
这是Python代码:
import sys
import datetime
import json
import os
import logging
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
# ES Configuration start
es_hosts = [
"http://localhost:9200",]
es_api_user = 'user'
es_api_password = 'pw'
index_name = 'index1'
chunk_size = 10000
errors_before_interrupt = 5
refresh_index_after_insert = False
max_insert_retries = 3
yield_ok = False # if set to False will skip successful documents in the output
# ES Configuration end
# =======================
filename = file.json
logging.info('Importing data from {}'.format(filename))
es = Elasticsearch(
es_hosts,
#http_auth=(es_api_user, es_api_password),
sniff_on_start=True, # sniff before doing anything
sniff_on_connection_fail=True, # refresh nodes after a node fails to respond
sniffer_timeout=60, # and also every 60 seconds
retry_on_timeout=True, # should timeout trigger a retry on different node?
)
def data_generator():
f = open(filename)
for line in f:
yield {**json.loads(line), **{
"_index": index_name,
}}
errors_count = 0
for ok, result in streaming_bulk(es, data_generator(), chunk_size=chunk_size, refresh=refresh_index_after_insert,
max_retries=max_insert_retries, yield_ok=yield_ok):
if ok is not True:
logging.error('Failed to import data')
logging.error(str(result))
errors_count += 1
if errors_count == errors_before_interrupt:
logging.fatal('Too many import errors, exiting with error code')
exit(1)
print("Documents loaded to Elasticsearch")
当 json 文件包含少量文档 (~100) 时,此代码运行没有问题。但我只是用一个 100k 文档的文件对其进行了测试,我得到了这个错误:
WARNING:elasticsearch:POST http://127.0.0.1:9200/_bulk?refresh=false [status:N/A request:10.010s]
Traceback (most recent call last):
File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 426, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 421, in _make_request
httplib_response = conn.getresponse()
File "/Users/me/opt/anaconda3/lib/python3.8/http/client.py", line 1347, in getresponse
response.begin()
File "/Users/me/opt/anaconda3/lib/python3.8/http/client.py", line 307, in begin
version, status, reason = self._read_status()
File "/Users/me/opt/anaconda3/lib/python3.8/http/client.py", line 268, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/Users/me/opt/anaconda3/lib/python3.8/socket.py", line 669, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py", line 251, in perform_request
response = self.pool.urlopen(
File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 726, in urlopen
retries = retries.increment(
File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/util/retry.py", line 386, in increment
raise six.reraise(type(error), error, _stacktrace)
File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/packages/six.py", line 735, in reraise
raise value
File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 670, in urlopen
httplib_response = self._make_request(
File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 428, in _make_request
self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 335, in _raise_timeout
raise ReadTimeoutError(
urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='127.0.0.1', port=9200): Read timed out. (read timeout=10)
我不得不承认这个有点过头了。我通常不喜欢在此处粘贴大的错误消息,但我不确定此消息的相关性。
我不禁想到我可能需要调整es
对象中的一些参数?还是配置变量?我对参数的了解不够,无法自己做出有根据的决定。
最后但并非最不重要的一点 - 看起来一些文档仍然被加载到 ES 索引中。但更奇怪的是,当 json 文件只有 100k 时,计数显示为 110k。
解决方案
TL;博士:
将 10000减少chunk_size
到默认值 500,我希望它可以工作。如果可以给您重复,您可能希望禁用自动重试。
发生了什么?
创建Elasticsearch
对象时,您指定了chunk_size=10000
. 这意味着streaming_bulk
调用将尝试插入 10000 个元素的块。与 elasticsearch 的连接具有可配置的超时时间,默认为 10 秒。因此,如果您的 elasticsearch 服务器处理您要插入的 10000 个元素的时间超过 10 秒,则会发生超时,这将作为错误处理。
创建Elasticsearch
对象时,您还指定retry_on_timeout
为 True 并在streaming_bulk_call
您设置max_retries=max_insert_retries
的 3 中。
这意味着当发生这种超时时,库将尝试重新连接 3 次,但是,当插入仍然有超时之后,它会给你你注意到的错误。(文档)
此外,当超时发生时,库无法知道文档是否插入成功,因此必须假设它们没有插入。因此,它将尝试再次插入相同的文档。我不知道您的输入行是什么样子,但如果它们不包含_id
field,这会在您的索引中创建重复项。您可能希望通过添加某种_id
,或禁用自动重试并手动处理来防止这种情况发生。
该怎么办?
有两种方法可以解决这个问题:
- 增加
timeout
- 减少
chunk_size
streaming_bulk
默认情况下已chunk_size
设置为 500。您的 10000 要高得多。当将此值增加到 500 以上时,我不希望您能获得高性能,所以我建议您在这里只使用默认值 500。如果 500 仍然因超时而失败,您甚至可能希望进一步减少它。如果您要索引的文档非常复杂,则可能会发生这种情况。
您还可以增加streaming_bulk
调用的超时时间,或者为您的es
对象增加超时时间。要仅为streaming_bulk
调用更改它,您可以提供request_timeout
关键字参数:
for ok, result in streaming_bulk(
es,
data_generator(),
chunk_size=chunk_size,
refresh=refresh_index_after_insert,
request_timeout=60*3, # 3 minutes
yield_ok=yield_ok):
# handle like you did
pass
但是,这也意味着只有在这个更高的超时时间之后才会检测到 elasticsearch 节点故障。有关更多详细信息,请参阅文档
推荐阅读
- python - 如何向状态/用户时间线发出 Twitter API 请求
- winapi - 是否有任何 Win32 API 可以在运行时隐藏和显示菜单项?
- windows - 属性 OSK.EXE(屏幕键盘)+POWERSHELL +REGEDIT 的问题
- vbscript - 带有 SeleniumBasic 的 VBScript 自动关闭浏览器
- typescript - 如何使用 cypress 进行 if..then 测试流程
- javascript - 如何在 React 中创建可搜索的组件
- pine-script - 比较 Pinescript 中的负数
- android-studio - 底部导航视图,活动在 Kotlin 中不起作用
- r - 如何在R中生成连接的表格状图
- matlab - 在matlab中将扇区图像转换为矩形图像