首页 > 解决方案 > Impala 查询不以异步方式执行

问题描述

基本上,有一个小的 aiohttp 应用程序,它接收 Impala 查询列表,然后向它们发送 Impala。然而,有些查询可能需要很长时间才能完成,因此决定以异步/并行方式进行。得到了一个线程工作的解决方案,但很想看看是否有可能只使用 asyncio/tornado 来达到相同的速度。

我的代码如下:

async def run(self, queries):
    # Here I validate queries
    query_list    = await self.build_query_list(split_queries)        # Format: [[queries for connection_1], [queries for connection_2], ...]

    start         = time.time()
    # Assing group of queries to each connection and wait results
    result_queue = deque()
    await multi([self.execute_impala_query(connection.connection, query_list[index], result_queue) for index, connection in enumerate(connection_list)])

    # Close all connections
    [await self.impala_connect_advance_pool.release_connection(connection) for connection in connection_list]

    # Wait for Impala responses
    while len(result_queue) < connect_limit: 
        continue

    # Send results back


async def execute_impala_query(self, impala_connect, queries, queue):
    return await multi([self.impala_response_to_json_response(impala_connect.cursor(), query, queue) for query in queries])

async def impala_response_to_json_response(self, impala_cursor, query, queue):
    self.logger.info('execute query: {}'.format(query))
    print ('execute query: {}'.format(query))

    def get_results():
        impala_cursor.execute(query)
        results = as_pandas(impala_cursor)
        impala_cursor.close()
        self.logger.info('{} completed'.format(query))
        print ('{} completed'.format(query))
        queue.append(results.to_json(orient='records'))

    IOLoop.current().spawn_callback(get_results)

发生的情况是,一旦它运行,我可以在标准输出中看到“执行查询:查询”消息,我假设它们都被触发并正在执行,但是,它需要 2(或更多)时间作为带有线程的版本。我是把整个异步概念弄错了,还是在方法的某个地方犯了一些愚蠢的错误?

标签: python-3.6tornadopython-asyncioimpyla

解决方案


整个异步概念错误是的,只是通过调用一个函数spawn_callback不会使其异步:您的数据库连接器应该支持异步 IO。正如我所看到的:我建议您看一下execute_async方法。然后您需要编写自己的等待函数,例如Impyla 的 _wait_to_finish,但使用tornado.gen.sleep而不是time.sleep().


推荐阅读