pytransitions - Pytransitions:异步相关回调的 AsyncMachine 顺序解析
问题描述
注意: 这个问题与 Python 的 FSM 库pytransitions 有关
我正在寻找一种方法来按顺序解决方法回调,当它们在prepare or/and before or/and after中作为列表被提及时。我正在使用AsyncMachine
来自的模块transitions.extensions.asyncio
预期结果:
1Done_2Done_3Done
得到:
None_3Done
复制当前情况的示例代码:
import asyncio
from transitions.extensions.asyncio import AsyncMachine
class Model:
STATES = ['A', 'B']
TRANSITIONS = [
{'trigger': 'next', 'source': 'A', 'dest': 'B',
'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
]
def __init__(self, name, state='initial'):
self.name = name
self.state = state
self.attribute_1 = None
self.attribute_2 = None
self.attribute_3 = None
async def initialize1(self):
await asyncio.sleep(1) # This is expensive operation and will take some time.
self.attribute_1 = '1Done'
print(f'{self.name} {self.state} -> Initialized1: ', self.attribute_1)
async def initialize2(self):
await asyncio.sleep(0.5) # This is expensive operation and will take some time.
self.attribute_2 = f'{self.attribute_1}_2Done'
print(f'{self.name} {self.state} -> Initialized2: ', self.attribute_2)
async def initialize3(self):
self.attribute_3 = f'{self.attribute_2}_3Done'
print(f'{self.name} {self.state} -> Initialized3: ', self.attribute_3)
async def show_attributes(self):
print(f'{self.name} {self.state} -> Showing all: {self.attribute_3}')
machine = AsyncMachine(
model=None,
states=Model.STATES,
transitions=Model.TRANSITIONS,
initial=None,
queued='model'
# queued=True
)
async def main():
model1 = Model(name='Model1', state='A')
machine.add_model(model1, initial=model1.state)
await machine.dispatch('next')
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
正如代码中所示,'prepare': ['initialize1', 'initialize2', 'initialize3']
我正在寻找一种方法,一旦解决了 initialize1 就调用 initialize2 ,一旦解决了 initialize1 和 initialize2 方法,就调用 initialize3 。目前,它们被并行调用,这是一个很好的特性,但如果有一种方法可以让它们按顺序执行/解析,那就太棒了。
当然,我可以再添加一个方法initialize_all
,然后在其中调用上述所有方法。但是想想我要不断添加多少新方法来处理现实世界的问题。我想让我的函数可重用且更小,仅用于特定任务。
解决方案
我浏览了pytransitions源代码,发现了两种方法来实现我正在寻找的功能。
如果我提到我是如何实现我正在寻找的功能的,我认为会很好。
由于我正在寻找一种方法来异步解析回调事件(默认情况下)和按要求顺序解析,因此我不得不重写callbacks
.AsyncMachine
方法一:
import asyncio
from functools import partial
from transitions.extensions.asyncio import AsyncMachine
class EnhancedMachine(AsyncMachine):
async def callbacks(self, funcs, event_data):
""" Overriding callbacks method:
Get `parallel_callback` keyword argument to decide whether
callback events should be resolved in parallel or in sequence.
"""
parallel_callback = event_data.kwargs.get('parallel_callback', None)
resolved_funcs = [partial(event_data.machine.callback, func, event_data) for func in funcs]
if parallel_callback is False:
for func in resolved_funcs:
await func()
else:
await self.await_all(resolved_funcs)
class Model:
STATES = ['A', 'B']
TRANSITIONS = [
{'trigger': 'next', 'source': 'A', 'dest': 'B',
'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
]
def __init__(self, name, state='initial'):
self.name = name
self.state = state
self.sequential_transition = True
self.attribute_1 = None
self.attribute_2 = None
self.attribute_3 = None
async def initialize1(self, ed):
await asyncio.sleep(1) # This is expensive operation and will take some time.
self.attribute_1 = '1Done'
print(f'{self.name} {self.state} -> Initialized1: ', self.attribute_1)
async def initialize2(self, ed):
await asyncio.sleep(0.5) # This is expensive operation and will take some time.
self.attribute_2 = f'{self.attribute_1}_2Done'
print(f'{self.name} {self.state} -> Initialized2: ', self.attribute_2)
async def initialize3(self, ed):
self.attribute_3 = f'{self.attribute_2}_3Done'
print(f'{self.name} {self.state} -> Initialized3: ', self.attribute_3)
async def show_attributes(self, ed):
print(f'{self.name} {self.state} -> Showing all: {self.attribute_3}')
machine = EnhancedMachine(
model=None,
states=Model.STATES,
transitions=Model.TRANSITIONS,
initial=None,
send_event=True, # this will pass EventData instance for each method.
queued='model'
# queued=True
)
async def main():
model1 = Model(name='Model1', state='A')
machine.add_model(model1, initial=model1.state)
# Passing `parallel_callback` as False for synchronous events
await machine.dispatch('next', parallel_callback=False)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
缺点:
send_event=True
添加了所有方法定义,并添加了额外的参数ed
(event_data)来处理parallel_callback
关键字参数。转换回调需要传递
parallel_callback=False
并且必须更改代码中所有可能的位置。如果必须根据转换本身的定义来决定下一个转换,则
parallel_callback
无法传递关键字参数(至少我不确定如何执行此操作):TRANSITIONS = [ {'trigger': 'next', 'source': 'A', 'dest': 'B', 'prepare': [], 'before': [], 'after': ['next2']}, {'trigger': 'next2', 'source': 'B', 'dest': 'C', 'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']} ]
方法2(我个人更喜欢这种方式):
在转换的定义中,将相互依赖且应按顺序解决的回调组合在一起。
使用这种方法,最终的过渡看起来像这样:
TRANSITIONS = [
{'trigger': 'next', 'source': 'A', 'dest': 'B',
'prepare': [('initialize1', 'initialize2', 'initialize3')], 'before': [],
'after': ['show_attributes']}
]
解释:
'prepare': [('callback1', 'callback2'), 'callback3']
这里group1(callback1和callback2),group2(callback3)会异步(并行)解析。但是 group1 中的 callback1 和 callback2 将同步(按顺序)解决。
被覆盖callbacks
的方法现在看起来会略有不同,还有一个新的静态方法await_sequential
:
class EnhancedMachine(AsyncMachine):
async def callbacks(self, func_groups, event_data):
""" Triggers a list of callbacks """
resolved_func_groups = []
for funcs in func_groups:
if isinstance(funcs, (list, tuple)):
resolved_funcs = [partial(event_data.machine.callback, func, event_data) for func in funcs]
else:
resolved_funcs = [partial(event_data.machine.callback, funcs, event_data)]
resolved_func_groups.append(resolved_funcs)
# await asyncio.gather(*[self.await_sequential(funcs) for funcs in resolved_func_groups])
await self.await_all([partial(self.await_sequential, funcs) for funcs in resolved_func_groups])
@staticmethod
async def await_sequential(funcs):
return [await func() for func in funcs]
缺点:
- 方法的定义和方法调用没有改变。
- 改变了一个地方,它修复了所有的地方。
缺点:
- 你应该知道你的方法在做什么。有时,不需要的分组会导致事件解决的不必要延迟。
使用这两种方式,我得到了相同的期望输出:
Model1 A -> Initialized1: 1Done
Model1 A -> Initialized2: 1Done_2Done
Model1 A -> Initialized3: 1Done_2Done_3Done
Model1 B -> Showing all: 1Done_2Done_3Done
我坚持使用第二种方法,尽管我很高兴知道实现此类功能的其他有效方法:)
推荐阅读
- flutter - 构建函数返回 null,有问题的小部件是:BlocBuilder
在 flutter_bloc 包中 - flutter - 在 Flutter 中,我想通过按下按钮获取当前日期并在文本字段中显示,我该怎么做?
- reactjs - 无法在 JavaScript 中将 ObjectId 转换为字符串
- python-3.x - (Python 3) 根据对象的大小将对象映射到图像中的颜色
- firebase - 您可以向 Firebase 项目中的用户授予全面的公共访问权限吗?
- php - 从文件 CSV PHP 中删除数组中的字符
- android - 使用 firebaseui 登录 Facebook 不起作用 errorCode: 100, subErrorCode: 33, errorType: GraphMethodException
- python - Django 使用旧目录填充新项目
- javascript - 如何使用 Puppeteer 检查元素是否存在于异步/等待中
- javascript - 尝试创建 instagram 机器人,但不断收到语法错误?