Scanning and slicing with elasticsearch_dsl takes a very long time

Problem description

I have a query that returns a total of roughly 50-60 million hits from Elasticsearch. I tried to split the workload across slices with multiprocessing, and while this does speed things up at first, the scan eventually slows down and then stalls after collecting about 15-20 million hits. The code below works for smaller result sets of around 1 million hits, but with the larger result sets I cannot get it to finish in a reasonable time.

    from elasticsearch import Elasticsearch as ES
    from elasticsearch_dsl import Search
    import logging
    import multiprocessing as mp
    from multiprocessing import Pool
    from functools import partial

    SLICES = 5

    # user, pwd, query and index are defined elsewhere
    es = ES(['https://example.com/'], http_auth=(user, pwd), timeout=120)

    def dump_slice(query, index, slice_no):
        # Run one slice of a sliced scroll and collect all of its hits.
        s = Search(using=es, index=index).update_from_dict(query)
        s = s.extra(slice={"id": slice_no, "max": SLICES})
        s = s.params(scroll='30m', preserve_order=True, size=5000)
        hits = []
        for d in s.scan():
            hits.append({'id': d.meta.id, 'index': d.meta.index, **d.to_dict()})
            if len(hits) % 250000 == 0:
                logging.info('%d records processed by PID %d',
                             len(hits), mp.current_process().pid)
        return hits

    q = partial(dump_slice, query, index)

    with Pool(SLICES) as pool:
        hits = pool.map(q, range(SLICES))
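
Two things in this code get expensive at tens of millions of hits: `preserve_order=True` tells the scroll to keep the query's sort order (the elasticsearch helpers documentation warns this can be extremely expensive; without it, `scan()` sorts by `_doc`, the cheapest order), and each worker buffers every hit in `hits` and then pickles the whole list back through `Pool.map`. For comparison, here is a minimal sketch of the same sliced scan that drops `preserve_order` and streams each slice to its own NDJSON file instead of returning the hits; the output file name and the per-worker client are illustrative assumptions, not part of the original code:

    import json
    import multiprocessing as mp
    from functools import partial
    from multiprocessing import Pool

    from elasticsearch import Elasticsearch as ES
    from elasticsearch_dsl import Search

    SLICES = 5

    def stream_slice(query, index, slice_no):
        # Create the client inside the worker so forked processes do not
        # share one connection pool (user and pwd as in the original code).
        es = ES(['https://example.com/'], http_auth=(user, pwd), timeout=120)
        s = Search(using=es, index=index).update_from_dict(query)
        s = s.extra(slice={"id": slice_no, "max": SLICES})
        # No preserve_order: scan() then scrolls in _doc order, the cheapest.
        s = s.params(scroll='30m', size=5000)
        count = 0
        # Hypothetical output path; one NDJSON file per slice keeps memory flat.
        with open(f'slice_{slice_no}.ndjson', 'w') as out:
            for d in s.scan():
                out.write(json.dumps(
                    {'id': d.meta.id, 'index': d.meta.index, **d.to_dict()}) + '\n')
                count += 1
        return count

    with Pool(SLICES) as pool:
        counts = pool.map(partial(stream_slice, query, index), range(SLICES))

With this shape each worker returns only a count, and the per-slice files can be merged or post-processed afterwards.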

Tags: python, elasticsearch, elasticsearch-dsl

Solution

