TransportError(429) 'Data too large' when loading JSON documents into Elasticsearch

Problem description

I have a process running on Python 3.7 that loads JSON files, gathers their lines into chunks in an async queue, and incrementally posts the chunks to Elasticsearch for indexing.

The chunking is there to avoid overloading the Elasticsearch connection.

import logging
import time
from queue import Empty

logger = logging.getLogger(__name__)

# json_iterator, transform_single_doc, and DOC_QUEUE_CHUNK_SIZE are
# defined elsewhere in the project.

def load_files_to_queue(file_queue, incomplete_files, doc_queue, index):
    logger.info("Initializing load files to queue")
    while True:
        try:
            current_file = file_queue.get(False)
            logger.info("Loading {} into queue.".format(current_file))
            iteration_counter = 0
            with open(current_file) as loaded_file:
                iterator = json_iterator(loaded_file)
                current_type = "doctype"
                chunk = []
                for row in iterator:
                    # Every so often check the queue size
                    iteration_counter += 1
                    if iteration_counter > 5000:
                        # If it gets too big, pause until it has gone
                        # down a bunch.
                        if doc_queue.qsize() > 30:
                            logger.info(
                                "Doc queue at {}, pausing until smaller.".format(
                                    doc_queue.qsize()
                                )
                            )
                            while doc_queue.qsize() > 10:
                                time.sleep(0.5)
                        iteration_counter = 0

                    for transformed in transform_single_doc(current_type, row, index):
                        if transformed:
                            chunk.append(transformed)
                    # NOTE: Send messages in chunks instead of single rows so that
                    # the queue locks less frequently
                    if len(chunk) >= DOC_QUEUE_CHUNK_SIZE:
                        doc_queue.put(chunk)
                        chunk = []
                if chunk:
                    doc_queue.put(chunk)
            incomplete_files.remove(current_file)
            logger.info("Finished loading {} into queue.".format(current_file))
            logger.info("There are {} files left to load.".format(file_queue.qsize()))
        except Empty:
            break

import threading
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

def bulk_load_from_queue(file_queue, incomplete_files, doc_queue, chunk_size=500):
    """
    Represents a single worker thread loading docs into ES
    """
    logger.info("Initialize bulk doc loader {}".format(threading.current_thread()))
    conn = Elasticsearch(settings.ELASTICSEARCH, timeout=180)
    dequeue_results(
        streaming_bulk(
            conn,
            load_docs_from_queue(file_queue, incomplete_files, doc_queue),
            max_retries=2,
            initial_backoff=10,
            chunk_size=chunk_size,
            yield_ok=False,
            raise_on_exception=True,
            raise_on_error=True,
        )
    )
    logger.info("Shutting down doc loader {}".format(threading.current_thread()))

Occasionally an error like the following occurs in bulk_load_from_queue, which I interpret as a chunk being too large:

TransportError(429, 'circuit_breaking_exception', '[parent] Data too large, data for [<http_request>] would be [1024404322/976.9mb], which is larger than the limit of [1011774259/964.9mb], real usage: [1013836880/966.8mb], new bytes reserved: [10567442/10mb], usages [request=32880/32.1kb, fielddata=7440/7.2kb, in_flight_requests=164031664/156.4mb, accounting=46679308/44.5mb]')
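
For reference, how close a node is to this limit can be inspected through the nodes stats API. A minimal sketch, assuming the same conn client as above (field names follow the standard breaker stats response):

stats = conn.nodes.stats(metric="breaker")
for node_id, node in stats["nodes"].items():
    parent = node["breakers"]["parent"]
    # estimated_size is current usage; limit_size is the trip threshold
    print(node_id, parent["estimated_size"], parent["limit_size"], parent["tripped"])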

Re-running usually clears the problem, but the error has become far too frequent, so I tried to enforce a limit on chunk size in load_files_to_queue like this:

                    # DOC_QUEUE_CHUNK_SIZE is treated as a byte budget here,
                    # with DOC_QUEUE_CHUNK_LEN as a separate document-count cap
                    for transformed in transform_single_doc(current_type, row, index):
                        if transformed:
                            chunk_size = chunk_size + sys.getsizeof(transformed)
                            chunk.append(transformed)
                    # NOTE: Send messages in chunks instead of single rows so that
                    # the queue locks less frequently
                    if (
                        chunk_size >= DOC_QUEUE_CHUNK_SIZE
                        or len(chunk) >= DOC_QUEUE_CHUNK_LEN
                    ):
                        doc_queue.put(chunk)
                        chunk = []
                        chunk_size = 0
                if len(chunk) > 0:
                    doc_queue.put(chunk)

This causes some errors at the end of processing:

ConnectionResetError
[Errno 104] Connection reset by peer

Followed by:

EOFError multiprocessing.connection in _recv
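
One caveat with the size accounting above: sys.getsizeof reports only the in-memory size of the outer object, not its nested contents, so it can badly undercount what actually goes over the wire. A closer estimate of the bytes Elasticsearch will receive is the length of the JSON-encoded document; a minimal sketch, assuming each transformed doc is JSON-serializable (serialized_size is a hypothetical helper):

import json

def serialized_size(doc):
    # Approximate the bytes this doc will occupy in the _bulk request body
    return len(json.dumps(doc).encode("utf-8"))

# inside the transform loop, instead of sys.getsizeof(transformed):
#     chunk_size += serialized_size(transformed)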

Tags: python, elasticsearch, asynchronous

Solution


Basically this means the request you sent to Elasticsearch was too large for it to handle (the parent circuit breaker tripped), so you can try reducing the chunk size.
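
Since streaming_bulk is already in use, both limits can be set on the existing call: chunk_size caps documents per request and max_chunk_bytes caps the request body (it defaults to 100MB in the Python client). A minimal sketch of the adjusted call from the question; the specific numbers are illustrative, not recommendations:

dequeue_results(
    streaming_bulk(
        conn,
        load_docs_from_queue(file_queue, incomplete_files, doc_queue),
        max_retries=2,
        initial_backoff=10,
        chunk_size=100,                    # fewer docs per _bulk request
        max_chunk_bytes=10 * 1024 * 1024,  # cap each request body at ~10MB
        yield_ok=False,
        raise_on_exception=True,
        raise_on_error=True,
    )
)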

Alternatively, look at using the _bulk API; there are helpers for it in the Python client that should take away most of the pain.
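
(The code in the question already uses one of those helpers, streaming_bulk; the plain bulk wrapper is the simpler entry point and accepts the same size limits. A minimal sketch, where the address, index name, and docs iterable are assumptions for illustration:)

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch("http://localhost:9200")  # assumed address

actions = (
    {"_index": "my-index", "_source": doc}  # "my-index" is illustrative
    for doc in docs                         # docs: any iterable of dicts
)

# bulk() batches actions into _bulk requests, honoring both limits
success_count, errors = bulk(
    es,
    actions,
    chunk_size=100,
    max_chunk_bytes=10 * 1024 * 1024,
)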

