首页 > 解决方案 > Pytransitions:异步相关回调的 AsyncMachine 顺序解析

问题描述

注意: 这个问题与 Python 的 FSM 库pytransitions 有关

我正在寻找一种方法来按顺序解决方法回调,当它们在prepare or/and before or/and after中作为列表被提及时。我正在使用AsyncMachine来自的模块transitions.extensions.asyncio

预期结果:

1Done_2Done_3Done

得到:

None_3Done

复制当前情况的示例代码:

import asyncio
from transitions.extensions.asyncio import AsyncMachine


class Model:

    STATES = ['A', 'B']
    TRANSITIONS = [
        {'trigger': 'next', 'source': 'A', 'dest': 'B',
            'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
    ]

    def __init__(self, name, state='initial'):
        self.name = name
        self.state = state
        self.attribute_1 = None
        self.attribute_2 = None
        self.attribute_3 = None

    async def initialize1(self):
        await asyncio.sleep(1)  # This is expensive operation and will take some time.
        self.attribute_1 = '1Done'
        print(f'{self.name} {self.state} -> Initialized1: ', self.attribute_1)

    async def initialize2(self):
        await asyncio.sleep(0.5)  # This is expensive operation and will take some time.
        self.attribute_2 = f'{self.attribute_1}_2Done'
        print(f'{self.name} {self.state} -> Initialized2: ', self.attribute_2)

    async def initialize3(self):
        self.attribute_3 = f'{self.attribute_2}_3Done'
        print(f'{self.name} {self.state} -> Initialized3: ', self.attribute_3)

    async def show_attributes(self):
        print(f'{self.name} {self.state} -> Showing all: {self.attribute_3}')


machine = AsyncMachine(
    model=None,
    states=Model.STATES,
    transitions=Model.TRANSITIONS,
    initial=None,
    queued='model'
    # queued=True
)


async def main():
    model1 = Model(name='Model1', state='A')
    machine.add_model(model1, initial=model1.state)
    await machine.dispatch('next')


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

正如代码中所示,'prepare': ['initialize1', 'initialize2', 'initialize3']我正在寻找一种方法,一旦解决了 initialize1 就调用 initialize2 ,一旦解决了 initialize1 和 initialize2 方法,就调用 initialize3 。目前,它们被并行调用,这是一个很好的特性,但如果有一种方法可以让它们按顺序执行/解析,那就太棒了。

当然,我可以再添加一个方法initialize_all,然后在其中调用上述所有方法。但是想想我要不断添加多少新方法来处理现实世界的问题。我想让我的函数可重用且更小,仅用于特定任务。

标签: pytransitions

解决方案


我浏览了pytransitions源代码,发现了两种方法来实现我正在寻找的功能

如果我提到我是如何实现我正在寻找的功能的,我认为会很好。

由于我正在寻找一种方法来异步解析回调事件(默认情况下)和按要求顺序解析,因此我不得不重写callbacks.AsyncMachine

方法一:

import asyncio
from functools import partial
from transitions.extensions.asyncio import AsyncMachine


class EnhancedMachine(AsyncMachine):

    async def callbacks(self, funcs, event_data):
        """ Overriding callbacks method:
            Get `parallel_callback` keyword argument to decide whether
            callback events should be resolved in parallel or in sequence.
        """
        parallel_callback = event_data.kwargs.get('parallel_callback', None)
        resolved_funcs = [partial(event_data.machine.callback, func, event_data) for func in funcs]
        if parallel_callback is False:
            for func in resolved_funcs:
                await func()
        else:
            await self.await_all(resolved_funcs)


class Model:

    STATES = ['A', 'B']
    TRANSITIONS = [
        {'trigger': 'next', 'source': 'A', 'dest': 'B',
            'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}

    ]

    def __init__(self, name, state='initial'):
        self.name = name
        self.state = state
        self.sequential_transition = True
        self.attribute_1 = None
        self.attribute_2 = None
        self.attribute_3 = None

    async def initialize1(self, ed):
        await asyncio.sleep(1)  # This is expensive operation and will take some time.
        self.attribute_1 = '1Done'
        print(f'{self.name} {self.state} -> Initialized1: ', self.attribute_1)

    async def initialize2(self, ed):
        await asyncio.sleep(0.5)  # This is expensive operation and will take some time.
        self.attribute_2 = f'{self.attribute_1}_2Done'
        print(f'{self.name} {self.state} -> Initialized2: ', self.attribute_2)

    async def initialize3(self, ed):
        self.attribute_3 = f'{self.attribute_2}_3Done'
        print(f'{self.name} {self.state} -> Initialized3: ', self.attribute_3)

    async def show_attributes(self, ed):
        print(f'{self.name} {self.state} -> Showing all: {self.attribute_3}')


machine = EnhancedMachine(
    model=None,
    states=Model.STATES,
    transitions=Model.TRANSITIONS,
    initial=None,
    send_event=True,  # this will pass EventData instance for each method.
    queued='model'
    # queued=True
)


async def main():
    model1 = Model(name='Model1', state='A')
    machine.add_model(model1, initial=model1.state)

    # Passing `parallel_callback` as False for synchronous events
    await machine.dispatch('next', parallel_callback=False)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

缺点:

  1. send_event=True添加了所有方法定义,并添加了额外的参数ed(event_data)来处理parallel_callback关键字参数。

  2. 转换回调需要传递parallel_callback=False并且必须更改代码中所有可能的位置。

  3. 如果必须根据转换本身的定义来决定下一个转换,则parallel_callback无法传递关键字参数(至少我不确定如何执行此操作):

    TRANSITIONS = [
        {'trigger': 'next', 'source': 'A', 'dest': 'B',
            'prepare': [], 'before': [], 'after': ['next2']},
        {'trigger': 'next2', 'source': 'B', 'dest': 'C',
         'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
    ]
    

方法2(我个人更喜欢这种方式):

在转换的定义中,将相互依赖且应按顺序解决的回调组合在一起。

使用这种方法,最终的过渡看起来像这样

TRANSITIONS = [
    {'trigger': 'next', 'source': 'A', 'dest': 'B',
     'prepare': [('initialize1', 'initialize2', 'initialize3')], 'before': [],
     'after': ['show_attributes']}
]

解释:

'prepare': [('callback1', 'callback2'), 'callback3']

这里group1(callback1和callback2),group2(callback3)会异步(并行)解析。但是 group1 中的 callback1 和 callback2 将同步(按顺序)解决。

被覆盖callbacks的方法现在看起来会略有不同,还有一个新的静态方法await_sequential

class EnhancedMachine(AsyncMachine):

    async def callbacks(self, func_groups, event_data):
        """ Triggers a list of callbacks """
        resolved_func_groups = []
        for funcs in func_groups:
            if isinstance(funcs, (list, tuple)):
                resolved_funcs = [partial(event_data.machine.callback, func, event_data) for func in funcs]
            else:
                resolved_funcs = [partial(event_data.machine.callback, funcs, event_data)]
            resolved_func_groups.append(resolved_funcs)

        # await asyncio.gather(*[self.await_sequential(funcs) for funcs in resolved_func_groups])
        await self.await_all([partial(self.await_sequential, funcs) for funcs in resolved_func_groups])

    @staticmethod
    async def await_sequential(funcs):
        return [await func() for func in funcs]

缺点:

  1. 方法的定义和方法调用没有改变。
  2. 改变了一个地方,它修复了所有的地方。

缺点:

  1. 你应该知道你的方法在做什么。有时,不需要的分组会导致事件解决的不必要延迟。

使用这两种方式,我得到了相同的期望输出:

Model1 A -> Initialized1:  1Done
Model1 A -> Initialized2:  1Done_2Done
Model1 A -> Initialized3:  1Done_2Done_3Done
Model1 B -> Showing all: 1Done_2Done_3Done

我坚持使用第二种方法,尽管我很高兴知道实现此类功能的其他有效方法:)


推荐阅读