python-3.x - 如何为在键盘中断之前运行并且不返回任何内容的类编写单元测试?
问题描述
我正在开发一个监视文件系统中目录的 Python 服务。当它看到一个文件已被创建或移动到那里时,它会将文件的路径发送到 Kafka 队列中。我的服务完全按照我的需要工作,但我的问题是我应该有至少 90% 的单元测试覆盖率。我对 Python 比较陌生,而且我以前从未在任何语言中使用过单元测试,所以我觉得我的深度不够。我只是想不通我将如何测试这些类。
这是监视文件系统的类,我正在使用看门狗库。
我将handler=FileHandler
参数添加到init是因为我认为我可以使用它来向类传递一个我可以用于测试的假处理程序,但这感觉它是不必要的复杂。
class FileSystemMonitor:
def __init__(self, target_path, kafka_queue, handler=FileHandler):
self.path = target_path
self.queue = kafka_queue
self.handler = handler(self.queue)
def start(self):
observer = Observer()
observer.schedule(self.handler, self.path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
def parse_args():
path = sys.argv[1] if len(sys.argv) > 1 else '.'
queue = sys.argv[2] if len(sys.argv) > 2 else 'default'
return path, queue
if __name__ == "__main__":
path, queue = parse_args()
monitor = FileSystemMonitor(path, queue)
monitor.start()
这是我创建的类,它处理监视器抛出的事件,并将路径传递给 Kafka 队列。
class FileHandler(PatternMatchingEventHandler):
def __init__(self, queue):
super(FileHandler, self).__init__(ignore_patterns=["*/.DS_Store"], ignore_directories=True)
self.queue = queue
def on_any_event(self, event):
super(FileHandler, self).on_any_event(event)
#print(event, self.queue)
result = kafkaProducer.send_msg(self.queue, event.src_path, event.event_type)
print("Handler:", result)
return result
我已经为 kafkaProducer 类编写了一些测试,我对此并不感到困难,因为它实际上返回了一个我可以测试的值。
FileSystemMonitor 无限运行,只是等待键盘中断,当它结束时,它不会返回任何东西,那么我该如何为它编写单元测试呢?
至于 FileHandler 类,它取决于监视器类触发的事件,那么我将如何隔离 Handler 类来测试它呢?
解决方案
FileSystemMonitor.start
很难测试,因为它会阻塞直到发生外部事件,但是由于阻塞,测试不能轻易地使事件发生。我想你可以用多线程或多处理或者只是一个计时器来做一些技巧,但这会给你的测试增加一些我不喜欢的不确定性。
更明确的方法是允许调用者指定循环内发生的情况,while
以便可以在测试中引发异常,同时time.sleep
在生产代码中调用。
class FileSystemMonitor:
def __init__(self, target_path, kafka_queue, handler=FileHandler):
self.path = target_path
self.queue = kafka_queue
self.handler = handler(self.queue)
def start(self, loop_action):
observer = Observer()
observer.schedule(self.handler, self.path, recursive=True)
observer.start()
try:
while True:
loop_action()
except KeyboardInterrupt:
observer.stop()
observer.join()
这就是您的测试的样子:
def fake_loop_action():
raise KeyboardInterrupt
def test_FileSystemMonitor():
# Initialize target_path, kafka_queue and handler here.
# You might want to use test doubles.
monitor = FileSystemMonitor(target_path, kafka_queue, handler)
monitor.start(loop_action=fake_loop_action)
而在生产代码中,您将time.sleep
改为使用。您甚至可以现在指定呼叫延迟。
monitor.start(loop_action=lambda: time.sleep(1))
推荐阅读
- javascript - jQuery选择具有相同占位符值的输入字段
- ibm-mq - 尝试访问在 docker (ibmcom/mq) 中运行的 MQ 时出现 Websphere MQ Explorer (AMQ4036) 错误
- reactjs - Material-UI 将 React 中的单词大写
- android - RxJava2 - 如何知道所有 concatMapSingle 操作何时完成迭代 Observable
- python - Python docx - AttributeError:'bytes'对象没有属性'seek'
- python-3.x - 类型警告 Pycharm
- jquery - row().child() 在 DataTables 中添加未指定的 HTML
- javascript - 如何在主线程和 Web Worker 之间共享 pouchDB
- php - 值未存储在数据库中
- spring - 当我们将库(业务逻辑)加载到 Spring Boot 框架中时,应用程序是否会过载?