Why do I have a large gap between Elasticsearch and Snowflake?

Problem description

My task is to build a process in Python that extracts data from Elasticsearch, lands it in Azure Blob storage, and lets Snowflake ingest it from there. The process runs on Azure Functions: it pulls an index group (such as game_name.*) and, for each index in the group, spawns a thread that scrolls through the data. I save the last date seen in each result set and feed it into the range query on the next run. The process runs every five minutes, and I offset the end of the range by 5 minutes (we run a refresh every 2 minutes). I let the process run for a while and then did a gap analysis by comparing count(*) per hour (or per day) between Elasticsearch and Snowflake, expecting a maximum gap of 1%. For one index pattern containing roughly 127 indices, a catch-up job (covering a day or more) produces a gap within expectations; however, as soon as I let it run on the cron schedule (every 5 minutes), after a while the gap grows to 6-10%, and only for this index group.

It looks as if the scroll picks up N documents within the query range, but documents added (PUT) later somehow carry earlier dates. Or maybe I am wrong and my code is doing something odd. I have spoken with our team: they do not cache any documents on the client side, the data is synced to a network clock (not the client's), and timestamps are sent in UTC.
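
For reference, here is a minimal sketch of the Elasticsearch side of that hourly count comparison, using a date_histogram aggregation (the cluster URL, index pattern and date window below are placeholders, not the exact values I used):

from elasticsearch import Elasticsearch

esGap = Elasticsearch("https://my-cluster:9200")  # placeholder cluster URL

resp = esGap.search(
    index="game_name.*",  # same index pattern the extraction job targets
    body={
        "size": 0,
        "query": {
            "range": {"baseCtx.date": {"gte": "2021-06-01T00:00:00Z", "lt": "2021-06-02T00:00:00Z"}}
        },
        "aggs": {
            "per_hour": {
                "date_histogram": {"field": "baseCtx.date", "calendar_interval": "hour"}
            }
        },
    },
)

# One document count per hour, to be compared against the equivalent GROUP BY in Snowflake
for bucket in resp["aggregations"]["per_hour"]["buckets"]:
    print(bucket["key_as_string"], bucket["doc_count"])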

Below is the query I use to page through Elasticsearch:

import concurrent.futures
import datetime
import gzip
import json
import os
import re
import time

from elasticsearch import Elasticsearch

# esUrl, keep_alive, searchSize, rowsPerMB, maxSizeMB, maxThreads, maxProcessTimeSeconds
# and the helpers getCursors, uploadJsonGzipBlobBytes and postCursors are defined
# elsewhere in the codebase and are not shown here.

def query(searchSize, lastRowDateOffset, endDate, pit, keep_alive):

    body = {
        "size": searchSize,
        "query": {
            "bool": {
                "must": [
                    {
                        "exists": {
                            "field": "baseCtx.date"
                        }
                    },
                    {
                        "range": {
                            "baseCtx.date": {
                                "gt": lastRowDateOffset,
                                "lte": endDate
                            }
                        }
                    }
                ]
            }
        },
        "pit": {
            "id": pit,
            "keep_alive": keep_alive
        },
        "sort": [
            {
                "baseCtx.date": {"order": "asc", "unmapped_type": "long"}
            },
            {
                "_shard_doc": "asc"
            }
        ],
        "track_total_hits": False
    }
    
    return body

def scroller(pit,
             threadNr,
             index,
             lastRowDateOffset,
             endDate,
             maxThreads,
             es,
             lastRowCount,
             keep_alive="1m",
             searchSize=10000):
    
    cumulativeResultCount = 0
    iterationResultCount = 0
    data = []
    dataBytes = b''
    lastIndexDate = ''
    startScroll = time.perf_counter()
    while 1:
        if lastRowCount == 0: break
        #if lastRowDateOffset == endDate: lastRowCount = 0; break
        try:
            page = es.search(body=body)
        except: # On the first iteration body is not yet defined, and it is believed the point in time gets closed, hence the below opens a new one and rebuilds the query body
            pit = es.open_point_in_time(index=index, keep_alive=keep_alive)['id']
            body = query(searchSize, lastRowDateOffset, endDate, pit, keep_alive)
            page = es.search(body=body)
        pit = page['pit_id']
        data += page['hits']['hits']
        body['pit']['id'] = pit
        if len(page['hits']['hits']) > 0: body['search_after'] = page['hits']['hits'][-1]['sort']
        cumulativeResultCount += len(page['hits']['hits'])
        iterationResultCount = len(page['hits']['hits'])

        #print(f"This Iteration Result Count: {iterationResultCount} -- Cumulative Results Count: {cumulativeResultCount} -- {time.perf_counter() - startScroll} seconds")

        if iterationResultCount < searchSize: break
        if len(data) > rowsPerMB * maxSizeMB / maxThreads: break
        if time.perf_counter() - startScroll > maxProcessTimeSeconds: break

    if len(data) != 0:
        dataBytes = gzip.compress(bytes(json.dumps(data)[1:-1], encoding='utf-8'))
        lastIndexDate = max([x['_source']['baseCtx']['date'] for x in data])

    response = {
        "pit": pit,
        "index": index,
        "threadNr": threadNr,
        "dataBytes": dataBytes,
        "lastIndexDate": lastIndexDate,
        "cumulativeResultCount": cumulativeResultCount
    }
    
    return response

def batch(game_name, env='prod', startDate='auto', endDate='auto', writeDate=True, minutesOffset=5):
    
    es = Elasticsearch(
        esUrl,
        port=9200,
        timeout=300)
    
    lowerFormat = game_name.lower().replace(" ","_")
    indexGroup = lowerFormat + "*"
    if env == 'dev': lowerFormat, indexGroup = 'dev_' + lowerFormat, 'dev.' + indexGroup
    azFormat = re.sub(r'[^0-9a-zA-Z]+', '-', game_name).lower()
    storageContainerName = azFormat
    curFileName = f"{lowerFormat}_cursors.json"
    curBlobFilePath = f"cursors/{curFileName}"    

    compressedTools = [gzip.compress(bytes('[', encoding='utf-8')), gzip.compress(bytes(',', encoding='utf-8')), gzip.compress(bytes(']', encoding='utf-8'))]
    pits = []
    lastRowCounts = []

    # Parameter and state settings
    if os.getenv(f"{lowerFormat}_maxSizeMB") is not None: maxSizeMB = int(os.getenv(f"{lowerFormat}_maxSizeMB"))
    if os.getenv(f"{lowerFormat}_maxThreads") is not None: maxThreads = int(os.getenv(f"{lowerFormat}_maxThreads"))
    if os.getenv(f"{lowerFormat}_maxProcessTimeSeconds") is not None: maxProcessTimeSeconds = int(os.getenv(f"{lowerFormat}_maxProcessTimeSeconds"))

    # Get all indices for the indexGroup
    indicesEs = list(set([(re.findall(r"^.*-", x)[0][:-1] if '-' in x else x) + '*' for x in list(es.indices.get(indexGroup).keys())]))
    indices = [{"indexName": x, "lastOffsetDate": (datetime.datetime.utcnow()-datetime.timedelta(days=5)).strftime("%Y/%m/%d 00:00:00")} for x in indicesEs]

    # Load Cursors
    cursors = getCursors(curBlobFilePath, indices)


    # Offset the current time by -5 minutes to account for the 2-3 min delay in Elasticsearch
    initTime = datetime.datetime.utcnow()
    if endDate == 'auto': endDate = f"{initTime-datetime.timedelta(minutes=minutesOffset):%Y/%m/%d %H:%M:%S}"
    print(f"Less than or Equal to: {endDate}, {keep_alive}")

    # Start Multi-Threading
    while 1:
        dataBytes = []
        dataSize = 0
        start = time.perf_counter()

        if len(pits) == 0: pits = ['' for x in range(len(cursors))]
        if len(lastRowCounts) == 0: lastRowCounts = ['' for x in range(len(cursors))]

        with concurrent.futures.ThreadPoolExecutor(max_workers=len(cursors)) as executor:
            results = [
                executor.submit(
                    scroller,
                    pit,
                    threadNr,
                    x['indexName'],
                    x['lastOffsetDate'] if startDate == 'auto' else startDate,
                    endDate,
                    len(cursors),
                    es,
                    lastRowCount,
                    keep_alive,
                    searchSize) for x, pit, threadNr, lastRowCount in (zip(cursors, pits, list(range(len(cursors))), lastRowCounts))
            ]

            for f in concurrent.futures.as_completed(results):
                if f.result()['lastIndexDate'] != '': cursors[f.result()['threadNr']]['lastOffsetDate'] = f.result()['lastIndexDate']
                pits[f.result()['threadNr']] = f.result()['pit']
                lastRowCounts[f.result()['threadNr']] = f.result()['cumulativeResultCount']

                dataSize += f.result()['cumulativeResultCount']
                if len(f.result()['dataBytes']) > 0: dataBytes.append(f.result()['dataBytes'])

                print(f"Thread {f.result()['threadNr']+1}/{len(cursors)} -- Index {f.result()['index']} -- Results pulled {f.result()['cumulativeResultCount']} -- Cumulative Results: {dataSize} -- Process Time: {round(time.perf_counter()-start, 2)} sec")

        if dataSize == 0: break
        lastRowDateOffsetDT = datetime.datetime.strptime(max([x['lastOffsetDate'] for x in cursors]), '%Y/%m/%d %H:%M:%S')
        outFile = f"elasticsearch/live/{lastRowDateOffsetDT:%Y/%m/%d/%H}/{lowerFormat}_live_{lastRowDateOffsetDT:%Y%m%d%H%M%S}_{datetime.datetime.utcnow():%Y%m%d%H%M%S}.json.gz"

        print(f"Starting compression of {dataSize} rows -- {round(time.perf_counter()-start, 2)} sec")
        dataBytes = compressedTools[0] + compressedTools[1].join(dataBytes) + compressedTools[2]

        # Upload to Blob
        print(f"Comencing to upload data to blob -- {round(time.perf_counter()-start, 2)} sec")
        uploadJsonGzipBlobBytes(outFile, dataBytes, storageContainerName, len(dataBytes))
        print(f"File compiled: {outFile} -- {dataSize} rows -- Process Time: {round(time.perf_counter()-start, 2)} sec\n")

        # Update cursors
        if writeDate: postCursors(curBlobFilePath, cursors)

    # Clean Up
    print("Closing PITs")
    for pit in pits:
        try: es.close_point_in_time({"id": pit})
        except: pass
    print(f"Closing Connection to {esUrl}")
    es.close()
    return

# Start the process
while 1:
    batch("My App")

I think I just need a second pair of eyes to point out where in the code the problem might be. I tried increasing the minutesOffset argument to 60 (so every 5 minutes it pulls data from the last run up to Now() - 60 minutes), but ran into the same problem. Please help.

Tags: python, elasticsearch

Solution


So "baseCtx.date" is set by the client when the event fires, and in some cases there appears to be a delay between the event being fired and the document becoming searchable. We solved this by using an ingest pipeline, as follows:

PUT _ingest/pipeline/indexDate
{
  "description": "Creates a timestamp when a document is initially indexed",
  "version": 1,
  "processors": [
    {
      "set": {
        "field": "indexDate",
        "value": "{{{_ingest.timestamp}}}",
        "tag": "indexDate"
      }
    }
  ]
}

and set index.default_pipeline to "indexDate" in the template settings. The index names change every month (we append the year and month), and this approach creates a server-side date that we use for scrolling instead.
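
For completeness, a minimal sketch of what attaching the pipeline through a composable index template could look like via the Python client (the template name and index pattern below are placeholders, not our actual values):

from elasticsearch import Elasticsearch

es = Elasticsearch("https://my-cluster:9200")  # placeholder cluster URL

es.indices.put_index_template(
    name="game_name_template",              # hypothetical template name
    body={
        "index_patterns": ["game_name.*"],  # matches the monthly indices
        "template": {
            "settings": {
                # every document indexed into a matching index runs through the
                # indexDate pipeline and receives the server-side timestamp
                "index.default_pipeline": "indexDate"
            }
        },
    },
)

The extraction query then ranges and sorts on indexDate instead of baseCtx.date, so documents whose client-side timestamp arrives late can no longer fall behind the cursor.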

