python - Discord cogs 无法加载
问题描述
我正在尝试使用使用 cogs 的单个 python 脚本运行两个不同的 Discord Bot。但是当我尝试运行第二个机器人时,它会抛出一个 ImportError,即使我没有使用那个特定的库。没有反垃圾邮件机器人,反应角色机器人可以正常工作。这是我的代码。仅供参考,我在虚拟环境中工作。
主文件
if __name__ == "__main__":
try:
reaction_role_bot = commands.Bot(command_prefix=config["reaction_role_bot"]["bot_prefix"], intents=discord.Intents.all())
reaction_slash = SlashCommand(reaction_role_bot, sync_commands=True)
reaction_role_bot.load_extension(f"cogs.{str(os.path.basename('cogs/reaction_roles.py')[:-3])}")
anti_spam_bot = commands.Bot(command_prefix=config["anti_spam_bot"]["bot_prefix"], intents=discord.Intents.default())
spam_slash = SlashCommand(anti_spam_bot, sync_commands=True)
anti_spam_bot.load_extension(f"cogs.{str(os.path.basename('cogs/anti_spam.py')[:-3])}")
event_loop = asyncio.get_event_loop()
event_loop.create_task(reaction_role_bot.run(config["reaction_role_bot"]["token"]))
event_loop.create_task(anti_spam_bot.run(config["anti_spam_bot"]["token"]))
event_loop.run_forever()
except Exception as e:
print(e)
反垃圾邮件.py
import platform
import os
import discord
from discord.ext import commands
from antispam import AntiSpamHandler
from antispam.plugins import AntiSpamTracker, Options
class AntiSpamBot(commands.Cog):
def __init__(self, client):
self.client = client
# Initialize the AntiSpamHandler
self.client.handler = AntiSpamHandler(self.client, options=Options(no_punish=True))
# 3 Being how many 'punishment requests' before is_spamming returns True
self.client.tracker = AntiSpamTracker(self.client.handler, 3)
self.client.handler.register_extension(self.client.tracker)
@commands.Cog.listener()
async def on_ready(self):
print("---------------------------------")
print(f"Logged in as {str(self.client.user)}")
print(f"Discord.py API version: {discord.__version__}")
print(f"Python version: {platform.python_version()}")
print(f"Running on: {platform.system()} {platform.release()} ({os.name})")
await self.client.change_presence(status=discord.Status.idle, activity=discord.Game(name="Head of Security"))
print("---------------------------------\n")
# The code in this event is executed every time a valid commands catches an error
@commands.Cog.listener()
async def on_command_error(context, error):
raise error
@commands.Cog.listener()
async def on_message(self, message):
await self.client.handler.propagate(message)
if self.client.tracker.is_spamming(message):
await message.delete()
await message.channel.send(f"{message.author.mention} has been automatically kicked for spamming.")
await message.author.kick()
await self.client.process_commands(message)
def setup(client):
client.add_cog(AntiSpamBot(client))
错误
Extension 'cogs.anti_spam' raised an error: ImportError: cannot import name 'AsyncMock' from 'unittest.mock' (/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/unittest/mock.py)
我没有使用齿轮的经验,这让我有点困惑。任何形式的帮助都会帮助我解决这个问题!提前致谢!
解决方案
我不认为这是一个 cog 注册问题。我相信这是您的 cog 文件中某些依赖项的导入错误。我搜索了您的错误并发现了类似的内容,我建议您在此处查看以获取更多信息。
作为一个笼统的声明,我会仔细检查您是否安装了 mock,并且您正在将它安装在您认为正在安装它的 Python 版本上。如果您安装了多个 python 版本,它可能会变得很不稳定。
另外,在不相关的说明中:最好避免在一个 python 文件中运行多个机器人实例,但我可以帮助您以最好的方式做到这一点。
对于初学者,您必须意识到这Client.run
是对几个更底层概念的抽象。
有Client.login
哪些登录客户端,然后Client.connect
哪些实际运行处理。这些是协程。
asyncio
提供将事物放入事件循环的能力,以便它在有时间的时候工作。
像这样的东西,例如
loop = asyncio.get_event_loop()
async def foo():
await asyncio.sleep(10)
loop.close()
loop.create_task(foo())
loop.run_forever()
如果我们想等待另一个协程发生某些事情,asyncio 也通过 asyncio.Event 的同步方式为我们提供了这个功能。您可以将其视为您正在等待的布尔值:
e = asyncio.Event()
loop = asyncio.get_event_loop()
async def foo():
await e.wait()
print('we are done waiting...')
loop.stop()
async def bar():
await asyncio.sleep(20)
e.set()
loop.create_task(bar())
loop.create_task(foo())
loop.run_forever() # foo will stop this event loop when 'e' is set to true
loop.close()
使用这个概念,我们可以将其应用于不和谐机器人本身。
import asyncio
import discord
from collections import namedtuple
# First, we must attach an event signalling when the bot has been
# closed to the client itself so we know when to fully close the event loop.
Entry = namedtuple('Entry', 'client event')
entries = [
Entry(client=discord.Client(), event=asyncio.Event()),
Entry(client=discord.Client(), event=asyncio.Event())
]
# Then, we should login to all our clients and wrap the connect call
# so it knows when to do the actual full closure
loop = asyncio.get_event_loop()
async def login():
for e in entries:
await e.client.login()
async def wrapped_connect(entry):
try:
await entry.client.connect()
except Exception as e:
await entry.client.close()
print('We got an exception: ', e.__class__.__name__, e)
entry.event.set()
# actually check if we should close the event loop:
async def check_close():
futures = [e.event.wait() for e in entries]
await asyncio.wait(futures)
# here is when we actually login
loop.run_until_complete(login())
# now we connect to every client
for entry in entries:
loop.create_task(wrapped_connect(entry))
# now we're waiting for all the clients to close
loop.run_until_complete(check_close())
# finally, we close the event loop
loop.close()
推荐阅读
- javascript - 如何从 Electron-builder NSIS 获取选定的语言
- ios - React Native应用程序iphone 11中的全屏背景图像
- sql - 表与物化视图
- ruby-on-rails - shopify_app 和 shopify_api 之间的功能差异
- blazor - 如何让两种方式的数据绑定在 blazor 中工作?
- datatables - 将导出按钮添加到数据编辑器
- arrays - 有效地在数组中找到一个元素,其中连续元素相差 +1/0/-1
- java - 何时在构造函数中使用静态而不是传递引用来共享我想要共享的东西?
- python - 如何从python中的动态下拉列表中提取/刮取选项值?
- c# - 为什么这两个 c# 脚本给出不同的结果?