首页 > 解决方案 > pytest函数测试多处理任务队列服务

问题描述

我有一个任务队列处理服务,我正在尝试对其运行 pytest 功能测试。在“生产”中运行它时,我从命令行启动它,例如python main.py.

我不知道如何从 pytest 启动此任务服务以对其进行功能测试。如何在 pytest 中启动服务,以便我可以向其中添加作业并查看作业是否在完成后得到处理并添加到数据库中?

def main():
    store = "jobs"
    worker_id = 1
    # Process tasks
    task_processing[store] = multiprocessing.Process(
            target=process_tasks, args=(store, worker_id)
        )
    nanopub_processing[store].start()


if __name__ == "__main__":
    main()

标签: python-3.xpytestpython-multiprocessing

解决方案


只要确保您main正确访问该功能:

from main import main

def test_main():
    main()
    ...

推荐阅读