首页 > 解决方案 > 如何线程化 mqtt 订阅和 socket.io 服务器

问题描述

我尝试设置一个启动 mqtt 订阅或发布的 python 应用程序,同时我启动一个 Socket.io 服务器。我的第一次尝试是线程化这两个任务 - mqtt 函数没有问题,但我没有解决方案在线程中启动 Socket.io 服务器。这是我的一些代码:

mqtt_sub.py

import paho.mqtt.client as mqtt
import test.constants as constants

MQTT_PATH = "sensor/water/level"

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe(MQTT_PATH)

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

def start_mqtt_subs():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(constants.MQTT_SERVER, constants.MQTT_SERVER_PORT, constants.MQTT_SERVER_KEEPALIVE)
    client.loop_forever()

套接字.py

from aiohttp import web
import socketio

sio = socketio.AsyncServer(async_mode='aiohttp')
app = web.Application()
sio.attach(app)

async def index(request):
    with open('index.html') as f:
        return web.Response(text=f.read(), content_type='text/html')

@sio.on('test')
async def print_message(sid, message):
    print("Socket ID: " , sid)
    print(message)

app.router.add_get('/', index)

def start_socket_server():
    web.run_app(app, host='0.0.0.0', port=1986)

主文件

from test.socket import start_socket_server
from test.mqtt_sub import start_mqtt_subs
import threading

def main():
    t1 = threading.Thread(target=start_mqtt_subs)
    t2 = threading.Thread(target=start_socket_server)
    t1.start()
    t2.start()

if __name__ == '__main__':
   main()

结果是运行时错误。

RuntimeError: There is no current event loop in thread 'Thread-2'.

我试图让 socketio 在处理程序中启动,但没有任何反应。我的问题还有其他可能的解决方案吗?提前致谢!

标签: pythonmultithreadingsocket.iomqttaiohttp

解决方案


推荐阅读