首页 > 解决方案 > 'promisify' (async/await) 在 Python 中使用 pymsteams 向 Microsoft Teams 发送消息

问题描述

我正在使用该logging模块在我的 Python 脚本中向 MS Teams 发送大量消息。不幸的是,这很慢,所以我想在消息中添加异步/等待功能。

这是我的记录器模块(有些简化):

from logging import StreamHandler
import pymsteams

class TeamsHandler(StreamHandler):

    def __init__(self, channel_url):
        StreamHandler.__init__(self)
        self.channel_url = channel_url
        self.client = pymsteams.connectorcard(self.channel_url)

    def emit(self, record):
        msg = self.format(record)
        self.client.text(msg)
        try:
            self.client.send()
        except:
            print(f"{msg} could not be sent to Teams")

然后您将在常规脚本中使用它:

import logging
from TeamsHandler import TeamsHandler #the module above

my_logger = logging.getLogger('TestLogging')
my_logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
my_logger.addHandler(console_handler)

CHANNEL_ID = "https://outlook.office.com/webhook/your-big-number"
teamshandler = TeamsHandler(CHANNEL_ID)
teamshandler.setFormatter(logging.Formatter('%(levelname)s %(message)s'))
teamshandler.setLevel(logging.DEBUG)
my_logger.addHandler(teamshandler)
for i in range(1,100):
    my_logger.error(f"this is an error [{i}]")
    my_logger.info(f"this is an info [{i}]")
do_something_else()

我怎样才能使它do_something_else立即(几乎)执行,而不必等待 200 条消息进入 Teams ?

我尝试在模块中添加一些关键字,async但没有成功,所以我没有将它们放在问题中。如果不是完整的解决方案,很高兴得到一些指示。awaitTeamsHandler

理想情况下,随着脚本的进行,消息的顺序应该保持不变。

标签: pythonpython-3.xasync-awaitpython-asynciomicrosoft-teams

解决方案


如果pymsteams不支持 async/await,那么添加async到您的函数将不会真正帮助您,因为您最终仍会从pymsteams. 即使它确实支持异步/等待,它仍然无法工作,因为您是从 Python 日志记录 API 内部调用它们,而它本身不是异步的。async/await 不能神奇地将同步代码转换为 async,程序必须全面使用 async/await。

但是,如果您需要在后台运行某些东西的异步执行,您可以使用线程来代替。例如,为日志创建一个专用线程,例如:

class TeamsHandler(StreamHandler):
    def __init__(self, channel_url):
        super().__init__()
        self.channel_url = channel_url
        self.client = pymsteams.connectorcard(self.channel_url)
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self._worker)
        self.thread.start()
        # shutdown the worker at process exit
        atexit.register(self.queue.put, None)

    def _worker(self):
        while True:
            record = self.queue.get()
            if record is None:
                break
            msg = self.format(record)
            self.client.text(msg)
            try:
                self.client.send()
            except:
                print(f"{msg} could not be sent to Teams")

    def emit(self, record):
        # enqueue the record to log and return control to the caller
        self.queue.put(record)

当然,这有一个缺点,即如果日志记录后端出现问题,您的程序可能会比您在日志中看到的内容提前很多 - 但是当您删除日志记录和执行之间的同步时,情况总是如此。


推荐阅读