python - TransportError(429) 'Data too large' when loading JSON documents into Elasticsearch
Problem description
I have a process running under Python 3.7 that loads JSON files, gathers their rows into chunks on an async queue, and incrementally posts the chunks to Elasticsearch for indexing.
The chunking is there to avoid overloading the Elasticsearch connection.
def load_files_to_queue(file_queue, incomplete_files, doc_queue, index):
    logger.info("Initializing load files to queue")
    while True:
        try:
            current_file = file_queue.get(False)
            logger.info("Loading {} into queue.".format(current_file))
            iteration_counter = 0
            with open(current_file) as loaded_file:
                iterator = json_iterator(loaded_file)
                current_type = "doctype"
                chunk = []
                for row in iterator:
                    # Every so often check the queue size
                    iteration_counter += 1
                    if iteration_counter > 5000:
                        # If it gets too big, pause until it has gone
                        # down a bunch.
                        if doc_queue.qsize() > 30:
                            logger.info(
                                "Doc queue at {}, pausing until smaller.".format(
                                    doc_queue.qsize()
                                )
                            )
                            while doc_queue.qsize() > 10:
                                time.sleep(0.5)
                        iteration_counter = 0
                    for transformed in transform_single_doc(current_type, row, index):
                        if transformed:
                            chunk.append(transformed)
                        # NOTE: Send messages in chunks instead of single rows so that queue
                        # has less frequent locking
                        if len(chunk) >= DOC_QUEUE_CHUNK_SIZE:
                            doc_queue.put(chunk)
                            chunk = []
                if chunk:
                    doc_queue.put(chunk)
            incomplete_files.remove(current_file)
            logger.info("Finished loading {} into queue.".format(current_file))
            logger.info("There are {} files left to load.".format(file_queue.qsize()))
        except Empty:
            break
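As an aside, the manual `qsize()` polling above can be replaced by a bounded queue: `queue.Queue(maxsize=...)` blocks the producer automatically when consumers fall behind, giving the same backpressure without sleep loops. A minimal sketch of that producer/consumer pattern (the names `producer`, `consumer`, and the sentinel are illustrative, not from the original code):

```python
import queue
import threading

def producer(items, doc_queue, chunk_size=3):
    """Batch items into chunks so consumers lock the queue less often."""
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) >= chunk_size:
            doc_queue.put(chunk)  # blocks if the queue is full: free backpressure
            chunk = []
    if chunk:
        doc_queue.put(chunk)
    doc_queue.put(None)  # sentinel: no more chunks

def consumer(doc_queue, out):
    while True:
        chunk = doc_queue.get()
        if chunk is None:
            break
        out.extend(chunk)

doc_queue = queue.Queue(maxsize=30)  # bounded queue instead of qsize() polling
out = []
t = threading.Thread(target=consumer, args=(doc_queue, out))
t.start()
producer(range(10), doc_queue)
t.join()
```

With a bounded queue the producer simply blocks on `put()` once `maxsize` chunks are pending, so no `iteration_counter` or `time.sleep` bookkeeping is needed.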
def bulk_load_from_queue(file_queue, incomplete_files, doc_queue, chunk_size=500):
    """
    Represents a single worker thread loading docs into ES
    """
    logger.info("Initialize bulk doc loader {}".format(threading.current_thread()))
    conn = Elasticsearch(settings.ELASTICSEARCH, timeout=180)
    dequeue_results(
        streaming_bulk(
            conn,
            load_docs_from_queue(file_queue, incomplete_files, doc_queue),
            max_retries=2,
            initial_backoff=10,
            chunk_size=chunk_size,
            yield_ok=False,
            raise_on_exception=True,
            raise_on_error=True,
        )
    )
    logger.info("Shutting down doc loader {}".format(threading.current_thread()))
Occasionally an error like this occurs in bulk_load_from_queue, which I interpret as the chunk being too large:
TransportError(429, 'circuit_breaking_exception', '[parent] Data too large, data for [<http_request>] would be [1024404322/976.9mb], which is larger than the limit of [1011774259/964.9mb], real usage: [1013836880/966.8mb], new bytes reserved: [10567442/10mb], usages [request=32880/32.1kb, fielddata=7440/7.2kb, in_flight_requests=164031664/156.4mb, accounting=46679308/44.5mb]')
Re-running usually clears it up, but the errors have become too frequent, so I want to enforce a chunk size limit in load_files_to_queue, like this:
for transformed in transform_single_doc(current_type, row, index):
    if transformed:
        chunk_size = chunk_size + sys.getsizeof(transformed)
        chunk.append(transformed)
    # NOTE: Send messages in chunks instead of single rows so that queue
    # has less frequent locking
    if (
        chunk_size >= DOC_QUEUE_CHUNK_SIZE
        or len(chunk) >= DOC_QUEUE_CHUNK_LEN
    ):
        doc_queue.put(chunk)
        chunk = []
        chunk_size = 0
if len(chunk) > 0:
    doc_queue.put(chunk)
This causes some errors at the end of processing:
ConnectionResetError
[Errno 104] Connection reset by peer
followed by:
EOFError in multiprocessing.connection, in _recv
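Separately from those connection errors, one likely reason the byte limit above would not prevent the 429s: `sys.getsizeof` is a shallow measure. For a dict it reports only the container object itself, not the strings and nested objects inside, so `chunk_size` can wildly underestimate the serialized payload. Measuring the JSON-encoded length is a much closer proxy for what the bulk request body actually carries:

```python
import json
import sys

doc = {"field": "x" * 10_000}

# Shallow size: the dict container only -- the 10 kB string inside is not counted.
shallow = sys.getsizeof(doc)

# Serialized size: roughly what travels in the bulk request body.
wire = len(json.dumps(doc).encode("utf-8"))

print(shallow, wire)  # shallow is a few hundred bytes; wire is ~10 kB
```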
Solution
Basically this means the request you are sending to Elasticsearch is too large for it to handle, so try reducing the chunk size.
Alternatively, look at using the _bulk API; there are helpers in the Python client that should take away most of the pain.
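On the helper route: `streaming_bulk` (already used above) accepts a `max_chunk_bytes` parameter, 100 MB by default, alongside `chunk_size`, so lowering it is usually enough. The idea it implements, capping each chunk by serialized size rather than by document count, can be sketched in plain Python (the function name and limits below are illustrative):

```python
import json

def chunk_actions(actions, max_chunk_bytes=10 * 1024 * 1024, max_chunk_len=500):
    """Yield lists of actions whose total serialized size stays under max_chunk_bytes.

    Illustrative sketch of size-capped chunking; not the client's actual code.
    """
    chunk, size = [], 0
    for action in actions:
        # +1 accounts for the newline separating actions in a bulk body.
        action_bytes = len(json.dumps(action).encode("utf-8")) + 1
        if chunk and (size + action_bytes > max_chunk_bytes
                      or len(chunk) >= max_chunk_len):
            yield chunk
            chunk, size = [], 0
        chunk.append(action)  # an oversized single action still ships alone
        size += action_bytes
    if chunk:
        yield chunk

docs = [{"v": "x" * 100} for _ in range(10)]
chunks = list(chunk_actions(docs, max_chunk_bytes=300))
```

Because the cap is checked before appending, a chunk is flushed as soon as the next action would push it over the limit, which is exactly the behavior the circuit breaker error calls for.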