python - 如何使用 Django 通道对 AsyncConsumer 进行多线程处理
问题描述
我已经使用 Django Channels 一个星期了,runworker
并行性让我有些烦恼。
例如,我有这个 MQTT 客户端,它在收到消息时在通道中发布,基本。
async def treat_message(msg):
channel_layer = get_channel_layer()
payload = json.loads(msg.payload, encoding="utf-8")
await channel_layer.send("mqtt", {
"type": "value.change",
"message": payload
})
这个送的很好。我可以发送我想要的多少,它将被发送到redis队列。到频道mqtt
。
然后我运行worker,它将重定向队列中的消息mqtt
:
python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']
这就是问题的开始。这是读取数据的 AsyncConsumer 的内容:
class MQTTConsumer(AsyncConsumer):
async def value_change(self, event):
await asyncio.sleep(5)
print("I received changes : {}".format(event["message"]))
我为了模拟业务的任务放了一个睡眠。这就是我要去的地方:异步消费者不是多线程的!当我向通道发送两条消息时,消费者需要 10 秒来处理第二条消息,而不是多线程的 5 秒。如下所示。
2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}
任何有关该主题的情报都会有很大帮助,在此先感谢!
编辑:我发现管理它的唯一方法是创建一个执行器,其中包含工作人员来异步执行它。但我不确定它的部署效率
def handle_mqtt(event):
time.sleep(3)
logger.info("I received changes : {}".format(event["message"]))
class MQTTConsumer(AsyncConsumer):
def __init__(self, scope):
super().__init__(scope)
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
async def value_change(self, event):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(self.executor, handle_mqtt, event)
解决方案
这是目前的设计
是的,这是预期的设计,因为它是最安全的方式(如果你不知道它可以防止竞争条件)。如果您乐于并行运行消息,只需在需要时(使用
asyncio.create_task
)分离您自己的协程,确保清理它们并在关闭时等待它们。这是相当多的开销,所以希望我们将来会为消费者提供一种选择加入模式,但现在我们提供的只是安全选项。
推荐阅读
- c++ - 间接需要指针操作数错误
- c++ - 在 C++ 中不使用数组或最小值/最大值计算测试成绩平均值
- aem - 仅当存在时才在 Sightly/HTL 中添加属性 (AEM)
- deep-learning - 为什么小权重有助于深度神经网络(正则化)
- javascript - 条卡元素输入不显示
- python - 使用 Python ctypes 调用 rs232.c 时如何解决分段错误问题?
- c - 有人可以帮我解决那个C问题吗?当我尝试运行它不返回任何东西
- python-3.x - 如何使用 GTK PrintOperation 打印图像
- python - matplotlib 多个 xticklabel 用于两个 colluns 的条形图
- node.js - PM2 & Puppeteer 手表重启