首页 > 解决方案 > 具有多线程的 ElasticSearch Scroll API

问题描述

首先,我想让你们知道,我知道 ElasticSearch Scroll API 是如何工作的基本工作逻辑。要使用Scroll API,首先,我们需要使用一些滚动值(如1m )调用search方法,然后它将返回一个_scroll_id ,该 _scroll_id将用于 Scroll 的下一次连续调用,直到所有 doc 在循环内返回。但问题是我只想在多线程的基础上使用相同的进程,而不是串行的。例如:

如果我有 300000 个文档,那么我想以这种方式处理/获取文档

所以我的问题是,因为我没有找到任何方法来设置滚动 API 上的from值,我如何才能通过线程加快滚动过程。不以序列化方式处理文件。

我的示例 python 代码

if index_name is not None and doc_type is not None and body is not None:
   es = init_es()
   page = es.search(index_name,doc_type, scroll = '30s',size = 10, body = body)
   sid = page['_scroll_id']
   scroll_size = page['hits']['total']

   # Start scrolling
   while (scroll_size > 0):

       print("Scrolling...")
       page = es.scroll(scroll_id=sid, scroll='30s')
       # Update the scroll ID
       sid = page['_scroll_id']

       print("scroll id: " + sid)

       # Get the number of results that we returned in the last scroll
       scroll_size = len(page['hits']['hits'])
       print("scroll size: " + str(scroll_size))

       print("scrolled data :" )
       print(page['aggregations'])

标签: multithreadingelasticsearchelasticsearch-py

解决方案


你试过切片卷轴吗?根据链接的文档:

对于返回大量文档的滚动查询,可以将滚动拆分为多个切片,这些切片可以独立使用。

每个滚动都是独立的,可以像任何滚动请求一样并行处理。

我自己没有使用过这个(我需要处理的最大结果集是大约 50k 个文档),但这似乎是您正在寻找的。


推荐阅读