multithreading - 具有多线程的 ElasticSearch Scroll API
问题描述
首先,我想让你们知道,我知道 ElasticSearch Scroll API 是如何工作的基本工作逻辑。要使用Scroll API,首先,我们需要使用一些滚动值(如1m )调用search方法,然后它将返回一个_scroll_id ,该 _scroll_id将用于 Scroll 的下一次连续调用,直到所有 doc 在循环内返回。但问题是我只想在多线程的基础上使用相同的进程,而不是串行的。例如:
如果我有 300000 个文档,那么我想以这种方式处理/获取文档
- 第一个线程将处理最初的100000 个文档
- 第二个线程将处理接下来的100000 个文档
- 第三个线程将处理剩余的100000 个文档
所以我的问题是,因为我没有找到任何方法来设置滚动 API 上的from值,我如何才能通过线程加快滚动过程。不以序列化方式处理文件。
我的示例 python 代码
if index_name is not None and doc_type is not None and body is not None:
es = init_es()
page = es.search(index_name,doc_type, scroll = '30s',size = 10, body = body)
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while (scroll_size > 0):
print("Scrolling...")
page = es.scroll(scroll_id=sid, scroll='30s')
# Update the scroll ID
sid = page['_scroll_id']
print("scroll id: " + sid)
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print("scroll size: " + str(scroll_size))
print("scrolled data :" )
print(page['aggregations'])
解决方案
你试过切片卷轴吗?根据链接的文档:
对于返回大量文档的滚动查询,可以将滚动拆分为多个切片,这些切片可以独立使用。
和
每个滚动都是独立的,可以像任何滚动请求一样并行处理。
我自己没有使用过这个(我需要处理的最大结果集是大约 50k 个文档),但这似乎是您正在寻找的。
推荐阅读
- api - Pandora 非官方 REST API 是否损坏
- typescript - Axios:标题内容中的无效字符['适配器']
- vba - 字典与数据透视表和一些办公室问题
- javascript - 反应原生
显示错误的值 - reactjs - 如何在 React-Router 中的 Route 标签的路径数组中仅将确切关键字添加到一个路径
- office-addins - Excel 加载项在 PC 上加载最新版本,但在 Excel Online 中不加载
- objective-c - 使用 Kotlin 本机和 ObjectiveC/Swift 的 Optionals/Nullables
- swift - 委托方法,textViewDidChangeSelection 不是每次都触发
- javascript - 试图从javascript中的时间戳数组和价格数组中获取每小时分钟间隔的平均价格
- office-addins - 如何允许我们组织之外的一些用户使用我们的 Excel 加载项