python - Scan and slice with elasticsearch_dsl takes a long time
Problem description
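I have a query that returns roughly 50-60 million hits in total from Elasticsearch. I tried using multiprocessing to split the workload into multiple slices. That does speed things up at first, but the scan eventually slows down and then stalls after collecting around 15-20 million hits. The code below works for smaller result sets of about 1 million hits, but for larger result sets I can't get it to finish in a reasonable time.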
from elasticsearch import Elasticsearch as ES
from elasticsearch_dsl import Search
import logging
import multiprocessing as mp
from multiprocessing import Pool
from functools import partial

SLICES = 5

# user, pwd, query and index are assumed to be defined elsewhere.
es = ES(['https://example.com/'], http_auth=(user, pwd), timeout=120)

def dump_slice(query, index, slice_no):
    # Each worker scans one slice of the sliced scroll.
    s = Search(using=es, index=index).update_from_dict(query)
    s = s.extra(slice={"id": slice_no, "max": SLICES})
    s = s.params(scroll='30m', preserve_order=True, size=5000)
    hits = []
    for d in s.scan():
        # Merge the document id and index name with the source fields.
        hits.append({'id': d.meta.id, 'index': d.meta.index, **d.to_dict()})
        if len(hits) % 250000 == 0:
            logging.info('%d records processed by PID %d',
                         len(hits), mp.current_process().pid)
    return hits

# Run one worker process per slice and collect every slice's hits.
q = partial(dump_slice, query, index)
with Pool(SLICES) as pool:
    hits = pool.map(q, range(SLICES))
Solution
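One direction that may be worth trying, assuming the stall comes from memory pressure: each worker in the question keeps every hit in a Python list and then returns that list through `pool.map`, which pickles it back to the parent process. With tens of millions of documents that alone can exhaust RAM. The sketch below streams documents to a newline-delimited JSON file in batches instead, so that only one batch is held in memory at a time. The names `batched` and `dump_to_ndjson` are hypothetical helpers, not part of `elasticsearch` or `elasticsearch_dsl`, and the memory diagnosis itself is an assumption, not something confirmed in the question.

```python
import json
from itertools import islice
from pathlib import Path
from typing import Dict, Iterable, Iterator, List


def batched(items: Iterable, batch_size: int) -> Iterator[List]:
    """Yield lists of at most batch_size items from any iterable."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def dump_to_ndjson(docs: Iterable[Dict], path: Path, batch_size: int = 5000) -> int:
    """Write docs to path as newline-delimited JSON and return the count written.

    Hypothetical helper: docs can be any iterable of JSON-serializable
    dicts, for example a generator built around s.scan().
    """
    written = 0
    with path.open("w", encoding="utf-8") as fh:
        for batch in batched(docs, batch_size):
            # Only the current batch is held in memory.
            fh.write("".join(json.dumps(doc) + "\n" for doc in batch))
            written += len(batch)
    return written
```

Inside `dump_slice`, the `hits` list could then be replaced by a generator such as `({'id': d.meta.id, 'index': d.meta.index, **d.to_dict()} for d in s.scan())` passed to `dump_to_ndjson`, with the worker returning only the count or the file path so that `pool.map` has almost nothing to pickle. Separately, the `elasticsearch` helper documentation describes `preserve_order=True` as a potentially very expensive option, so dropping it may be worth testing if result order does not matter.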