python - 在 Discord.py 上使用 MongoDB 执行计划任务
问题描述
目的
我有一些命令,例如 temp-mute、temp-ban 和其他需要在命令执行后执行的命令,而且我确实需要安排诸如赠品之类的事情,并在订阅结束后立即从命令中触发功能。
我想要什么?
我想将我所有的时间和事情存储在 MongoDB 中,然后在需要触发该功能时触发它。我目前使用await asyncio.sleep(seconds)
,但是当我重新启动机器人或机器人脱机时会停止,我希望该功能在时间过去后立即触发,或者我希望即使在机器人重新启动后也能按时触发.
解决方案
您可以使用@tasks.loop()
.
from pymongo import MongoClient
cluster = MongoClient("mongo_url_here")
collection = cluster["name"]
这可能是您描述该系列的方式。现在,当您临时禁止或临时静音某人时,您需要保存最后一次。你可以这样做,
current_time = datetime.datetime.now()
final_time = current_time + datetime.timedelta(seconds=seconds_here)
然后保存final_time
在数据库中。
现在你需要创建一个@tasks.loop(seconds=x)
x 表示“每隔 x 秒,内部的函数@tasks.loop()
将运行
@tasks.loop(seconds=10)
async def checker(self):
try:
all = collection.find({}) # return all documents inside the db
current_time = datetime.datetime.now()
async for x in all:
if current >= x["Time"]: # do stuff after this
else:
pass
except Exception:
pass
x["Time"]
-> 这可以是文档中的一个变量,用于存储final_time
{"id" : id , "Time" : final_time} -> 像这样
现在,
async def checker(self):
-> Checker 是一个函数,你需要启动它。这取决于您是否使用cog
如果不是 -> checker.start()
# 在代码中的任何地方开始
如果是cog -> self.checker.start()
# 应该放在` init里面
推荐阅读
- arrays - 为什么 Ruby 代码显示数组而不是输入的十六进制代码?
- html - 如何在单个html文件中在django3中添加多个css文件?
- node.js - 如何解决 npm 错误“npm ERR!代码生命周期”
- 3d - 3D 中 LibGDX 附近的矩形呈现块状/错误
- linux - ldd & 共享对象错误 Linux - 如何排除故障?
- python - 如何在带有opencv4的Google colab中使用SIFT和SURF算法?
- ansible - 无论如何,Ansible 变量跳过
- ruby-on-rails - 运行命令rails服务器时无法访问mac上的本地rails主机
- php - Laravel - 文件上传按钮在实时服务器中不起作用
- postgresql - docker-compose:致命:用户“postgres”的密码验证失败