首页 > 解决方案 > Python try_decorator 重试 i2c 连接

问题描述

我正在通过 I2C 总线与某些硬件通信。

每隔一段时间,就会出现一个异常,因为总线很忙,所以我目前正在用一个重试的循环来处理这个问题,但是有很多代码重复,我想整理一下。

所以,我尝试编写一个装饰器函数来将各种函数传递给

例如,此命令初始化总线: i2c = busio.I2C(board.SCL, board.SDA)

这是我写的装饰器:

def try_decorator(func, name):
    MAX_TRIES = 5
    i = 1
    n = name
    while True:
        try:
            func()
        except Exception as e:
            print(f'retry {i} for {n}')
            i += 1
            if i == MAX_TRIES:
                print(f'max retries of {MAX_TRIES} reached for {n}, with error: {e}')
                break
            continue
        break

然后我尝试将上面提到的行作为部分传递: i2c = try_decorator(partial(busio.I2C, board.SCL, board.SDA), 'i2c init')但是当我尝试将 i2c 传递给下一个函数时出现异常:'NoneType' object has no attribute 'try_lock'我认为这意味着部分实际上并没有正确初始化总线。我也尝试过作为 lambda 和函数,但得到了类似的结果。

我能做些什么来解决这个问题并使我的代码尽可能简单?

此外,我没有将字符串作为namearg 传递(以跟踪异常),而是尝试使用 func。当我尝试作为函数而不是部分函数时,装饰器中的名称,但它给出了一个错误,似乎它正在尝试获取 I2C 对象的名称。

编辑:我已经按照 BigBro 对下面的回答更新了我的代码,但是在将 i2c 对象传递给我的下一个函数时仍然出现错误(如果我在没有装饰器的情况下运行,我不会收到此错误)。

def retry(f):
    MAX_TRIES = 5
    def wrapped(*args, **kwargs):
        for i in range(MAX_TRIES):
            try:
                res = f(*args, **kwargs)
                return res
            except IOError as e:
                print(f'retry {i} for {f.__name__}, with error: {e}')
        print(f'max retries of {MAX_TRIES} reached for {f.__name__}, with error: {e}')
    return wrapped

i2c = retry(busio.I2C(board.SCL, board.SDA))

drv = adafruit_drv2605.DRV2605(i2c)

'function' object has no attribute 'try_lock'

标签: pythondecorator

解决方案


简单的答案是您忘记在try_decorator(因此返回None)中返回函数的结果。func()最后用res = func()and替换return res

但是,您的函数并不是真正的装饰器,因为它实际上并没有返回函数,并且不能用作@try_decorator.

我会建议类似:

import random

MAX_RETRIES = 10
def retry_decorator(f):
    def wrapped(*args, **kwargs):  # Allows arg to be passed to the function instead of using partial
        for i in range(MAX_RETRIES):
            try:
                res = f(*args, **kwargs)
                return res
            except Exception as e:
                print(f'Function {f.__name__} failed {i}/{MAX_RETRIES}: {e}')  # f.__name__ is what you're looking for
        print(f'Function {f.__name__} didn\'t work after {MAX_RETRIES} retries: {e}')
    return wrapped


@retry_decorator
def fail_randomly():
    i = random.randint(0, 10)
    if i < 7:
        raise RuntimeError('Random error')
    print('Sucess!')

fail_randomly()

输出

Function fail_randomly failed 0/10: Random error
Function fail_randomly failed 1/10: Random error
Function fail_randomly failed 2/10: Random error
Function fail_randomly failed 3/10: Random error
Function fail_randomly failed 4/10: Random error
Sucess!

最后一件事,我建议更具体地说明你捕获的异常,否则事情可能会变得难以调试;)


推荐阅读