python - 如何编写可与线程或协程一起使用并在确定的时间内完成的 Python 代码?
问题描述
我所说的“确定性时间”是什么意思?例如 AWS 提供服务“AWS Lambda”。作为 lambda 函数启动的进程有时间限制,之后 lambda 函数将停止执行并假定任务已完成但有错误。和示例任务 - 将数据发送到 http 端点。根据与 http 端点的网络连接或其他因素,发送数据的过程可能需要很长时间。如果我需要将相同的数据发送到许多端点,那么整个处理时间将需要one process time
times endpoints amount
。这增加了在所有数据发送到所有端点之前停止 lambda 函数的机会。为了解决这个问题,我需要使用线程以并行模式将数据发送到不同的端点。
线程的问题 - 启动的线程无法停止。如果 http 请求花费的时间超过了 lambda 函数时间限制所指定的时间,则 lambda 函数将被中止并返回错误。所以我需要对http请求使用超时来中止它,如果它花费的时间比预期的要多。
如果http请求将被超时取消或端点将返回错误,我需要将未处理的数据保存在某处以免丢失数据。可以预测保存未处理数据所需的时间,因为我控制了保存数据的存储空间。
最后一部分消耗时间过程或线程被调度的循环executor.submit()
。如果只有一个端点或少数端点,则消耗的时间会很小。没有必要控制这一点。但是如果我处理了许多端点,我必须考虑到这一点。
所以基本上全职将包括:
- 调度线程
- http请求执行
- 保存未处理的数据
有一个例子说明我如何使用线程来管理时间
import concurrent.futures
from functools import partial
import requests
import time
start = time.time()
def send_data(data):
host = 'http://127.0.0.1:5000/endpoint'
try:
result = requests.post(host, json=data, timeout=(0.1, 0.5))
# print('done')
if result.status_code == 200:
return {'status': 'ok'}
if result.status_code != 200:
return {'status': 'error', 'msg': result.text}
except requests.exceptions.Timeout as err:
return {'status': 'error', 'msg': 'timeout'}
def get_data(n):
return {"wait": n}
def done_cb(a, b, future):
pass # save unprocessed data
def main():
executor = concurrent.futures.ThreadPoolExecutor()
futures = []
max_time = 0.5
for i in range(1):
future = executor.submit(send_data, *[{"wait": 10}])
future.add_done_callback(partial(done_cb, 2, 3))
futures.append(future)
if time.time() - s_time > max_time:
print('stopping creating new threads')
# save unprocessed data
break
try:
for item in concurrent.futures.as_completed(futures, timeout=1):
item.result()
except concurrent.futures.TimeoutError as err:
pass
我在考虑如何使用asyncio
库而不是线程来做同样的事情。
import asyncio
import time
from functools import partial
import requests
start = time.time()
def send_data(data):
...
def get_data(n):
return {"wait": n}
def done_callback(a,b, future):
pass # save unprocessed data
def main(loop):
max_time = 0.5
futures = []
start_appending = time.time()
for i in range(1):
event_data = get_data(1)
future = (loop.run_in_executor(None, send_data, event_data))
future.add_done_callback(partial(done_callback, 2, 3))
futures.append(future)
if time.time() - s_time > max_time:
print('stopping creating new futures')
# save unprocessed data
break
finished, unfinished = loop.run_until_complete(
asyncio.wait(futures, timeout=1)
)
_loop = asyncio.get_event_loop()
result = main(_loop)
功能send_data()
与之前的代码相同。因为请求库不是我run_in_executor()
用来创建未来对象的异步代码。我遇到的主要问题是done_callback()
当线程启动但执行器完成它的工作时没有执行。但只有当期货将被asyncio.wait()
表达“处理”时。
基本上我正在寻找开始执行异步未来的方法,比如ThreadPoolExecutor
开始执行线程,而不是等待asyncio.wait()
表达式调用done_callback()
。如果您有其他想法,如何编写可与线程或协程一起工作并在确定性时间内完成的 Python 代码。请分享它,我会很高兴阅读它们。
和其他问题。如果线程或未来完成了它的工作,它可以返回结果,我可以使用 in done_callback()
,例如通过结果中返回的 id 从队列中删除消息。但是如果线程或未来被取消,我没有结果。而且我必须使用functools.partial()
pass in done_callback 附加数据,这可以帮助我理解这个回调被调用的数据。如果传递的数据很小,这不是问题。如果数据很大,我需要将数据放入数组/列表/字典中并传入回调仅数组的索引或将“完整数据:在回调中。
done_callback()
我能否以某种方式访问在取消的未来/线程上触发的传递给未来/线程的变量?
解决方案
您可以使用asyncio.wait_for
来等待一个未来(或多个未来,当与 结合使用时asyncio.gather
)并在超时的情况下取消它们。与线程不同,asyncio 支持取消,因此您可以随时取消任务,并且它将在它发出的第一个阻塞调用(通常是网络调用)时被取消。
请注意,要使其正常工作,您应该使用异步原生库,例如aiohttp
HTTP。尝试requests
与 asyncio using结合使用run_in_executor
似乎适用于简单的任务,但它不会为您带来使用 asyncio 的好处,例如能够在不妨碍操作系统的情况下生成大量任务,或者取消的可能性。
推荐阅读
- javascript - 如何使用 webpack 模板在 vue.js 上发出 HTTP POST 请求
- ios - Apple Mach-O 链接器错误 - 架构 x86_64 的未定义符号
- javascript - 防止单个表单项上的全局 jQuery 事件
- javascript - 限制用户输入新名称,只允许从现有列表中选择
- vb.net - 为什么我必须取消 OpenFileDialog 两次才能关闭
- python-3.x - vsftpd 是否支持 mlsd 命令?
- javascript - 来自 javascript 的属性更新背后的代码
- three.js - 在 a 帧中的标记上缩放 3d 模型
- spring - Spring Security 5 OAuth 2.0 ResourceServer 如何与 AuthorizationServer 通信?
- android - Android - javax.imageio 不存在