python-3.x - 处理 ensure_future 及其缺失的任务
问题描述
我有一个流应用程序,它几乎连续地将给定的数据作为输入,并使用该值发送一个 HTTP 请求,并对返回的值做一些事情。
显然,为了加快速度,我在 Python 3.7 中使用了 asyncio 和 aiohttp 库来获得最佳性能,但考虑到数据移动的速度,调试变得很困难。
这就是我的代码的样子
'''
Gets the final requests
'''
async def apiRequest(info, url, session, reqType, post_data=''):
if reqType:
async with session.post(url, data = post_data) as response:
info['response'] = await response.text()
else:
async with session.get(url+post_data) as response:
info['response'] = await response.text()
logger.debug(info)
return info
'''
Loops through the batches and sends it for request
'''
async def main(data, listOfData):
tasks = []
async with ClientSession() as session:
for reqData in listOfData:
try:
task = asyncio.ensure_future(apiRequest(**reqData))
tasks.append(task)
except Exception as e:
print(e)
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno)
responses = await asyncio.gather(*tasks)
return responses #list of APIResponses
'''
Streams data in and prepares batches to send for requests
'''
async def Kconsumer(data, loop, batchsize=100):
consumer = AIOKafkaConsumer(**KafkaConfigs)
await consumer.start()
dataPoints = []
async for msg in consumer:
try:
sys.stdout.flush()
consumedMsg = loads(msg.value.decode('utf-8'))
if consumedMsg['tid']:
dataPoints.append(loads(msg.value.decode('utf-8')))
if len(dataPoints)==batchsize or time.time() - startTime>5:
'''
#1: The task below goes and sends HTTP GET requests in bulk using aiohttp
'''
task = asyncio.ensure_future(getRequests(data, dataPoints))
res = await asyncio.gather(*[task])
if task.done():
outputs = []
'''
#2: Does some ETL on the returned values
'''
ids = await asyncio.gather(*[doSomething(**{'tid':x['tid'],
'cid':x['cid'], 'tn':x['tn'],
'id':x['id'], 'ix':x['ix'],
'ac':x['ac'], 'output':to_dict(xmltodict.parse(x['response'],encoding='utf-8')),
'loop':loop, 'option':1}) for x in res[0]])
simplySaveDataIntoDataBase(id) # This is where I see some missing data in the database
dataPoints = []
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
logger.error(str(exc_type) +' '+ str(fname) +' '+ str(exc_tb.tb_lineno))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(Kconsumer(data, loop, batchsize=100))
loop.run_forever()
ensure_future 是否需要await
编辑?aiohttp 如何处理比其他请求晚一点的请求?它不应该阻止整个批次而不是完全忘记它吗?
解决方案
ensure_future
需要await
ed吗?
是的,您的代码已经在这样做了。await asyncio.gather(*tasks)
等待提供的任务并以相同的顺序返回它们的结果。
请注意,await asyncio.gather(*[task])
这没有意义,因为它等价于await asyncio.gather(task)
,这又等价于await task
. 换句话说,当你需要 的结果时getRequests(data, dataPoints)
,你可以不用先调用再调用res = await getRequests(data, dataPoints)
的仪式来写。ensure_future()
gather()
事实上,你几乎不需要给ensure_future
自己打电话:
- 如果您需要等待多个任务,您可以将协程对象直接传递给
gather
,例如gather(coroutine1(), coroutine2())
. - 如果你需要生成一个后台任务,你可以调用
asyncio.create_task(coroutine(...))
aiohttp 如何处理比其他请求晚一点的请求?它不应该阻止整个批次而不是完全忘记它吗?
如果您使用gather
,则所有请求都必须在其中任何一个返回之前完成。(这不是 aiohttp 策略,它是这样gather
工作的。)如果您需要实现超时,您可以使用asyncio.wait_for
或类似的。
推荐阅读
- sql - 有没有办法在添加间隔的同时保留一个月的最后一天?
- angular-tree-component - 角树组件虚拟滚动不起作用
- mysql - 如何从 MySQL JSON 数组中选择行?
- python - 将排序表从熊猫数据框发送到前端?
- html - Jekyll:插入前面的内容后找不到页面
- testrigor - 我怎样才能让 testRigor 以正确的方式找到这个输入?
- cakephp - CakePHP Bake :: 在 bake 命令上创建元素文件
- r - geom_density 返回图而不考虑实际值
- reactjs - 如何在 Typescript 中输入匹配项?
- spring-integration - 回复后可以收到原始消息吗