首页 > 解决方案 > 如何编写可与线程或协程一起使用并在确定的时间内完成的 Python 代码?

问题描述

我所说的“确定性时间”是什么意思?例如 AWS 提供服务“AWS Lambda”。作为 lambda 函数启动的进程有时间限制,之后 lambda 函数将停止执行并假定任务已完成但有错误。和示例任务 - 将数据发送到 http 端点。根据与 http 端点的网络连接或其他因素,发送数据的过程可能需要很长时间。如果我需要将相同的数据发送到许多端点,那么整个处理时间将需要one process timetimes endpoints amount。这增加了在所有数据发送到所有端点之前停止 lambda 函数的机会。为了解决这个问题,我需要使用线程以并行模式将数据发送到不同的端点。

线程的问题 - 启动的线程无法停止。如果 http 请求花费的时间超过了 lambda 函数时间限制所指定的时间,则 lambda 函数将被中止并返回错误。所以我需要对http请求使用超时来中止它,如果它花费的时间比预期的要多。

如果http请求将被超时取消或端点将返回错误,我需要将未处理的数据保存在某处以免丢失数据。可以预测保存未处理数据所需的时间,因为我控制了保存数据的存储空间。

最后一部分消耗时间过程或线程被调度的循环executor.submit()。如果只有一个端点或少数端点,则消耗的时间会很小。没有必要控制这一点。但是如果我处理了许多端点,我必须考虑到这一点。

所以基本上全职将包括:

有一个例子说明我如何使用线程来管理时间

import concurrent.futures
from functools import partial

import requests
import time

start = time.time()

def send_data(data):
    host = 'http://127.0.0.1:5000/endpoint'
    try:
        result = requests.post(host, json=data, timeout=(0.1, 0.5))
        # print('done')
        if result.status_code == 200:
            return {'status': 'ok'}
        if result.status_code != 200:
            return {'status': 'error', 'msg': result.text}
    except requests.exceptions.Timeout as err:
        return {'status': 'error', 'msg': 'timeout'}


def get_data(n):
    return {"wait": n}


def done_cb(a, b, future):
    pass # save unprocessed data


def main():
    executor = concurrent.futures.ThreadPoolExecutor()
    futures = []
    max_time = 0.5

    for i in range(1):
        future = executor.submit(send_data, *[{"wait": 10}])
        future.add_done_callback(partial(done_cb, 2, 3))
        futures.append(future)
        if time.time() - s_time > max_time:
            print('stopping creating new threads')
            # save unprocessed data
            break

    try:
        for item in concurrent.futures.as_completed(futures, timeout=1):
            item.result()
    except concurrent.futures.TimeoutError as err:
        pass

我在考虑如何使用asyncio库而不是线程来做同样的事情。

import asyncio
import time
from functools import partial

import requests

start = time.time()


def send_data(data):
    ...

def get_data(n):
    return {"wait": n}

def done_callback(a,b, future):
    pass # save unprocessed data

def main(loop):
    max_time = 0.5
    futures = []
    start_appending = time.time()
    for i in range(1):
        event_data = get_data(1)
        future = (loop.run_in_executor(None, send_data, event_data))
        future.add_done_callback(partial(done_callback, 2, 3))
        futures.append(future)

        if time.time() - s_time > max_time:
            print('stopping creating new futures')
            # save unprocessed data
            break


    finished, unfinished = loop.run_until_complete(
        asyncio.wait(futures, timeout=1)
    )


_loop = asyncio.get_event_loop()
result = main(_loop)

功能send_data()与之前的代码相同。因为请求库不是我run_in_executor()用来创建未来对象的异步代码。我遇到的主要问题是done_callback()当线程启动但执行器完成它的工作时没有执行。但只有当期货将被asyncio.wait()表达“处理”时。

基本上我正在寻找开始执行异步未来的方法,比如ThreadPoolExecutor开始执行线程,而不是等待asyncio.wait()表达式调用done_callback()。如果您有其他想法,如何编写可与线程或协程一起工作并在确定性时间内完成的 Python 代码。请分享它,我会很高兴阅读它们。

和其他问题。如果线程或未来完成了它的工作,它可以返回结果,我可以使用 in done_callback(),例如通过结果中返回的 id 从队列中删除消息。但是如果线程或未来被取消,我没有结果。而且我必须使用functools.partial()pass in done_callback 附加数据,这可以帮助我理解这个回调被调用的数据。如果传递的数据很小,这不是问题。如果数据很大,我需要将数据放入数组/列表/字典中并传入回调仅数组的索引或将“完整数据:在回调中。

done_callback()我能否以某种方式访问​​在取消的未来/线程上触发的传递给未来/线程的变量?

标签: pythonmultithreadingpython-asyncio

解决方案


您可以使用asyncio.wait_for来等待一个未来(或多个未来,当与 结合使用时asyncio.gather)并在超时的情况下取消它们。与线程不同,asyncio 支持取消,因此您可以随时取消任务,并且它将在它发出的第一个阻塞调用(通常是网络调用)时被取消。

请注意,要使其正常工作,您应该使用异步原生库,例如aiohttpHTTP。尝试requests与 asyncio using结合使用run_in_executor似乎适用于简单的任务,但它不会为您带来使用 asyncio 的好处,例如能够在不妨碍操作系统的情况下生成大量任务,或者取消的可能性。


推荐阅读