python - 简单的过程 - 不能使用生成器发送方法?
问题描述
背景
据说 SimPy 进程是文档中的生成器,我希望它具有PEP 342中的send方法。
流程由简单的 Python生成器描述。您可以将它们称为流程函数或流程方法,具体取决于它是普通函数还是类的方法。在它们的生命周期中,它们创建事件并产生它们以等待它们被触发。
我可以使用send将消息从另一个发电机发送到发电机,例如将充电站发电机注入 ev 发电机。
from typing import Dict, Optional, Generator
import time
def ev(env) -> Generator:
while True:
next_charging_station = yield
print("next_charging_station[{}]".format(next_charging_station))
def dispathcer(env, ev) -> Generator:
while True:
time.sleep(3)
print('Dispatching the next charging station')
ev.send("new station")
process_ev = ev(env)
next(process_ev)
process_dispatcher = dispathcer(env, process_ev)
Dispatching the next charging station
next_charging_station[new station]
Dispatching the next charging station
next_charging_station[new station]
但是,SymPy 进程没有send方法。
import simpy
env = simpy.Environment()
def ev(env):
while True:
next_charging_station = yield
print("next_charging_station[{}]".format(next_charging_station))
def dispathcer(env, ev):
while True:
print('Dispatching the next charging station at %d' % (env.now))
ev.send("new station")
rocess_ev = env.process(ev(env))
process_dispatcher = env.process(dispathcer(env, process_ev))
-----
Dispatching the next charging station at 0
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-12-ece25f162510> in <module>
1 process_ev = env.process(ev(env))
----> 2 process_dispatcher = env.process(dispathcer(env, process_ev))
<ipython-input-11-b89aeb1a9073> in dispathcer(env, ev)
11 while True:
12 print('Dispatching the next charging station at %d' % (env.now))
---> 13 ev.send("hoge")
AttributeError: 'Process' object has no attribute 'send'
问题
如果这是按设计的(发送和可能关闭/抛出不可用)或者我误解了某些内容,请帮助了解此错误的原因。我想所有的同步都需要由 SimPy 环境通过其事件来完成,否则环境可能会丢失进程状态的跟踪,但不确定。
请告知是否有办法从另一个进程更新内存。例如,如何更改代码以将充电站进程动态注入 ev 进程。
import simpy
env = simpy.Environment()
def charging_station(env):
print('Start charging at %d' % (env.now))
charge_duration = 3
yield env.timeout(charge_duration)
return
def ev(env):
while True:
print('Start driving at %d' % (env.now))
driving_duration = 5
yield env.timeout(driving_duration)
print('Stop at a charging station at %d' % (env.now))
# [Q] Instead of creating a charging process inside the ev process,
# how to get it injected?
yield env.process(charging_station(env)) # <-----
env.process(ev(env))
env.run(until=20)
-----
Start driving at 0
Stop at a charging station at 5
Start charging at 5
Start driving at 8
Stop at a charging station at 13
Start charging at 13
Start driving at 16
如果可能的话,例如这样的东西。
def new_ev(env): # <---- how to implement?
while True:
next_charging_station = yield # Get the next station process
print('Start charging at %d' % (env.now))
yield next_charging_station
print('Start driving at %d' % (env.now))
driving_duration = 5
yield env.timeout(driving_duration)
它看起来只有在创建过程中才能注入进程。因此,一种方法可以将调度程序注入 ev 进程,然后从 ev 进程内部获取充电站。但是,希望将任何实例显式注入其中,而不是从其中提取。
import random
import simpy
env = simpy.Environment()
class Dispatcher(object):
env = None
def __init__(self, env):
self.env = env
print('Start dispatcher at time %d' % self.env.now)
# Start the run process everytime an instance is created.
#self.process = env.process(self.run())
def run(self):
while True:
print('Start dispatcher at time %d' % self.env.now)
yield self.env.timeout(1)
def next_charger_station(self):
return self._charge(random.randint(1, 1000), self.env)
def _charge(self, id, env):
print('station %s start charging at time %d' % (id, env.now))
yield env.timeout(5)
return
def ev(env, dispatcher):
while True:
print('EV stops at the new charging station at time %d' % (env.now))
yield env.process(dispatcher.next_charger_station())
print('EV start driving from the charging station at time %d' % (env.now))
yield env.timeout(5)
dispatcher = Dispatcher(env)
process_ev = env.process(ev(env, dispatcher))
env.run(until=30)
解决方案
推荐阅读
- android - 房间线程,RxJava,选择问题
- python-3.x - 有没有办法在张量流中施加约束,我可以在此过程中强制执行一些规则吗?
- sql-server - 无法从同一主机上的 Web 应用程序 docker 访问 docker 托管的 SQL 服务器
- android - Android 密码接收器停止工作?
- python - plt.pcolormesh() 函数如何在 python 中工作
- robotframework - Robot Framework ImapLibrary 删除所有电子邮件不工作
- c# - 工厂模式的含义
- ios - 活动指示器动态改变颜色ios swift
- android - webview 视频事件“onShowCustomView”正在全屏触发,但是当我返回小屏幕时,没有监听器工作
- rsync - Rsync 排除正确的语法