python - 同时运行 asyncio bot 到 Tkinter GUI
问题描述
我正在尝试将 Python Twitch 机器人(https://github.com/TwitchIO/TwitchIO)集成到 Tkinter GUI 中。这是我从子类化的机器人twitch.io.ext.commands.bot
import os
from twitchio.ext import commands
from dotenv import load_dotenv
from os.path import join, dirname
class Bot(commands.Bot):
def __init__(self):
dir_path = os.path.dirname(os.path.realpath(__file__))
dotenv_path = join(dir_path, '.env')
load_dotenv(dotenv_path)
TMI_TOKEN = os.environ.get('TMI_TOKEN')
CLIENT_ID = os.environ.get('CLIENT_ID')
BOT_NICK = os.environ.get('BOT_NICK')
BOT_PREFIX = os.environ.get('BOT_PREFIX')
CHANNEL = os.environ.get('CHANNEL')
super().__init__(irc_token=TMI_TOKEN,
client_id=CLIENT_ID,
nick=BOT_NICK,
prefix=BOT_PREFIX,
initial_channels=[CHANNEL])
async def start(self):
await self._ws._connect()
try:
await self._ws._listen()
except KeyboardInterrupt:
pass
finally:
self._ws.teardown()
async def stop(self):
self._ws.teardown()
async def event_ready(self):
print("ready")
async def event_message(self, message):
print(message.content)
await self.handle_commands(message)
@commands.command(name='test')
async def my_command(self, ctx):
await ctx.send(f'Hello {ctx.author.name}!')
这是 Tkinter GUI
try:
import Tkinter as Tk
except ModuleNotFoundError:
import tkinter as Tk
import asyncio
from bot import Bot
bot = Bot()
loop = asyncio.get_event_loop()
def on_start():
print("start")
loop.run_until_complete(bot.start())
def on_stop():
print("stop")
loop.run_until_complete(bot.stop())
if __name__ == '__main__':
root = Tk.Tk()
frame = Tk.Frame(root)
start_button = Tk.Button(frame.master, text="start", command = on_start)
start_button.pack()
stop_button = Tk.Button(frame.master, text="stop", command = on_stop)
stop_button.pack()
root.mainloop()
因此,当按下开始按钮时,机器人会自行启动并连接到 Twitch。但是,GUI 卡loop.run_until_complete(bot.start())
在on_start()
它永远不会返回之后。我如何能够将机器人同时运行到 GUI?
解决方案
您可以尝试在不同的线程中运行事件循环,并以线程安全的方式向其提交任务。例如:
loop = asyncio.get_event_loop()
threading.Thread(daemon=True, target=loop.run_forever).start()
# ... in the GUI:
def on_start():
print("start")
asyncio.run_coroutine_threadsafe(loop, bot.start())
def on_stop():
print("stop")
asyncio.run_coroutine_threadsafe(loop, bot.stop())
推荐阅读
- c# - 如何修复“系统不包含‘文本’的定义”错误
- sharepoint - 如何理解网站栏和图书馆栏之间爬取的区别
- excel - 如何更改此 for 循环的列引用
- javascript - messing.send 不是一个函数
- sql-server - 如何在存储过程中将测试数据表作为参数传递
- swift - 我的全局变量正在从一个函数到另一个函数失去它的价值
- python - Flask:如何在收到 POST 请求时自动渲染模板
- python - Python - 比较 2 个相同的字符串返回 'False'
- qt - ld 找不到“GLESv2”库
- r - 使用系统 _a lot_ 从 R 调用 Perl