首页 > 解决方案 > 如何为在键盘中断之前运行并且不返回任何内容的类编写单元测试?

问题描述

我正在开发一个监视文件系统中目录的 Python 服务。当它看到一个文件已被创建或移动到那里时,它会将文件的路径发送到 Kafka 队列中。我的服务完全按照我的需要工作,但我的问题是我应该有至少 90% 的单元测试覆盖率。我对 Python 比较陌生,而且我以前从未在任何语言中使用过单元测试,所以我觉得我的深度不够。我只是想不通我将如何测试这些类。

这是监视文件系统的类,我正在使用看门狗库。

我将handler=FileHandler参数添加到init是因为我认为我可以使用它来向类传递一个我可以用于测试的假处理程序,但这感觉它是不必要的复杂。

class FileSystemMonitor:

    def __init__(self, target_path, kafka_queue, handler=FileHandler):
        self.path = target_path
        self.queue = kafka_queue
        self.handler = handler(self.queue)

    def start(self):
        observer = Observer()
        observer.schedule(self.handler, self.path, recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

def parse_args():
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    queue = sys.argv[2] if len(sys.argv) > 2 else 'default'
    return path, queue

if __name__ == "__main__":
    path, queue = parse_args()
    monitor = FileSystemMonitor(path, queue)
    monitor.start()

这是我创建的类,它处理监视器抛出的事件,并将路径传递给 Kafka 队列。

class FileHandler(PatternMatchingEventHandler):

    def __init__(self, queue):
        super(FileHandler, self).__init__(ignore_patterns=["*/.DS_Store"], ignore_directories=True)
        self.queue = queue

    def on_any_event(self, event):
        super(FileHandler, self).on_any_event(event)
        #print(event, self.queue)
        result = kafkaProducer.send_msg(self.queue, event.src_path, event.event_type)
        print("Handler:", result)
        return result

我已经为 kafkaProducer 类编写了一些测试,我对此并不感到困难,因为它实际上返回了一个我可以测试的值。

FileSystemMonitor 无限运行,只是等待键盘中断,当它结束时,它不会返回任何东西,那么我该如何为它编写单元测试呢?

至于 FileHandler 类,它取决于监视器类触发的事件,那么我将如何隔离 Handler 类来测试它呢?

标签: python-3.xmultithreadingunit-testingpython-unittestpython-watchdog

解决方案


FileSystemMonitor.start很难测试,因为它会阻塞直到发生外部事件,但是由于阻塞,测试不能轻易地使事件发生。我想你可以用多线程或多处理或者只是一个计时器来做一些技巧,但这会给你的测试增加一些我不喜欢的不确定性。

更明确的方法是允许调用者指定循环内发生的情况,while以便可以在测试中引发异常,同时time.sleep在生产代码中调用。

class FileSystemMonitor:
    def __init__(self, target_path, kafka_queue, handler=FileHandler):
        self.path = target_path
        self.queue = kafka_queue
        self.handler = handler(self.queue)

    def start(self, loop_action):
        observer = Observer()
        observer.schedule(self.handler, self.path, recursive=True)
        observer.start()
        try:
            while True:
                loop_action()
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

这就是您的测试的样子:

def fake_loop_action():
    raise KeyboardInterrupt

def test_FileSystemMonitor():
    # Initialize target_path, kafka_queue and handler here.
    # You might want to use test doubles.
    monitor = FileSystemMonitor(target_path, kafka_queue, handler)
    monitor.start(loop_action=fake_loop_action)

而在生产代码中,您将time.sleep改为使用。您甚至可以现在指定呼叫延迟。

monitor.start(loop_action=lambda: time.sleep(1))

推荐阅读