python-3.6 - Impala 查询不以异步方式执行
问题描述
基本上,有一个小的 aiohttp 应用程序,它接收 Impala 查询列表,然后向它们发送 Impala。然而,有些查询可能需要很长时间才能完成,因此决定以异步/并行方式进行。得到了一个线程工作的解决方案,但很想看看是否有可能只使用 asyncio/tornado 来达到相同的速度。
我的代码如下:
async def run(self, queries):
# Here I validate queries
query_list = await self.build_query_list(split_queries) # Format: [[queries for connection_1], [queries for connection_2], ...]
start = time.time()
# Assing group of queries to each connection and wait results
result_queue = deque()
await multi([self.execute_impala_query(connection.connection, query_list[index], result_queue) for index, connection in enumerate(connection_list)])
# Close all connections
[await self.impala_connect_advance_pool.release_connection(connection) for connection in connection_list]
# Wait for Impala responses
while len(result_queue) < connect_limit:
continue
# Send results back
async def execute_impala_query(self, impala_connect, queries, queue):
return await multi([self.impala_response_to_json_response(impala_connect.cursor(), query, queue) for query in queries])
async def impala_response_to_json_response(self, impala_cursor, query, queue):
self.logger.info('execute query: {}'.format(query))
print ('execute query: {}'.format(query))
def get_results():
impala_cursor.execute(query)
results = as_pandas(impala_cursor)
impala_cursor.close()
self.logger.info('{} completed'.format(query))
print ('{} completed'.format(query))
queue.append(results.to_json(orient='records'))
IOLoop.current().spawn_callback(get_results)
发生的情况是,一旦它运行,我可以在标准输出中看到“执行查询:查询”消息,我假设它们都被触发并正在执行,但是,它需要 2(或更多)时间作为带有线程的版本。我是把整个异步概念弄错了,还是在方法的某个地方犯了一些愚蠢的错误?
解决方案
整个异步概念错误是的,只是通过调用一个函数spawn_callback
不会使其异步:您的数据库连接器应该支持异步 IO。正如我所看到的:我建议您看一下execute_async方法。然后您需要编写自己的等待函数,例如Impyla 的 _wait_to_finish,但使用tornado.gen.sleep而不是time.sleep()
.
推荐阅读
- css - 努力在 CSS 中正确溢出
- swift - 如何从 Firestore 引用中获取数据?
- asp.net-mvc - Kendo MVC 服务器端分页
- node.js - 无法计算 webdriverio devtools 中的性能指标
- python - 如何在字符串数据框中实现千位分隔符而不用 Python Pandas 中的百分比值在一行中进行修改?
- java - JavaFX中的单向绑定和转换设置
- python - 我如何*动态*在 SQLAlchemy 中为 MSSQL 设置模式?
- docker - Moonbeam(波卡平行链)交易错误
- docker - 我在 csapp 中做实验室时使用 docker 时遇到的一个问题
- pandas - 用字典覆盖数据后保存csv