time.time() returns unexpected results when using joblib

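Problem description

I have a program that creates multiple instances of a class, Test, then does some work on each instance and tracks how long that work took. I recently decided to parallelize this code with the joblib library, but I ran into a bug: the final total_time variable is now 0.0.

The Python environment on my machine is: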

$ python3
Python 3.7.0 (default, Sep 18 2018, 18:47:08) 
[Clang 10.0.0 (clang-1000.10.43.1)] on darwin

Here is an MCVE for the problem:

import time
import random
import multiprocessing
import joblib

class Test:
    def __init__(self):
        self.name = ""
        self.duration = 0.0

def add_test(a):
    temp = Test()
    temp.name = str(a)
    return temp


def run_test(test):
    test_start = time.time()
    rand = random.randint(1,3)
    time.sleep(rand)
    test_end = time.time()
    test.duration = round(test_end - test_start, 3)
    print(f"Test {test.name} ran in {test.duration}")

def main():
    tests = []
    for a in range(1,10):
        tests.append(add_test(a))

    num_cores = multiprocessing.cpu_count()
    joblib.Parallel(n_jobs=num_cores)(joblib.delayed(run_test)(test) for test in tests)

    total_time = round(sum(test.duration for test in tests), 3)

    print(f"This run took {total_time} seconds.")

if __name__ == '__main__':
    main()

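If I add a print(list(test.duration for test in tests)) in main(), I see that test.duration is 0.0 after run_test() has been called. As the output of the code above shows, test.duration is set to a non-zero value (as appropriate) inside run_test().

I'm not very familiar with Python classes or the joblib library, so I'm not sure whether the problem comes from misusing classes or from something else I haven't been able to work out.

Thanks!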

Tags: python-3.x, parallel-processing, timing

Solution

Thanks to num8lock on Reddit, here is the correct way to solve this problem:

import time
import random
import multiprocessing
import joblib

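class Test:
    def __init__(self, name):
        self.name = name
        self.duration = 0.0

    def run(self):
        # Start the clock inside the worker so that time spent waiting
        # in the queue is not counted as part of the test.
        start = time.perf_counter()
        rand = random.randint(1,3)
        time.sleep(rand)
        self.duration = time.perf_counter() - start
        print(f"Test {self.name} ran in {self.duration}")
        # Return the value: attribute changes made in a worker process
        # do not reach the caller's copy of the object.
        return self.duration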

def add(a):
    return Test(str(a))

def make_test(test):
    return test.run()

def main():
    num_cores = multiprocessing.cpu_count()
    tests = []
    for a in range(1,10):
        tests.append(add(a))

    jobs = joblib.Parallel(n_jobs=num_cores)(joblib.delayed(make_test)(t) for t in tests)
    total_time = sum(job for job in jobs)
    print(f"This run took {total_time} seconds.")

if __name__ == '__main__':
    main()
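
The answer does not say why the original version printed 0.0. A likely explanation, assuming joblib's default process-based backend, is that each Test object is serialized and sent to a worker process, so run_test() only ever modifies a copy and the objects in main() keep their initial duration. The sketch below imitates that with the standard library only: copy.deepcopy stands in for the serialization round trip, and run_in_worker is a hypothetical helper, not part of joblib.

```python
import copy


class Test:
    def __init__(self, name):
        self.name = name
        self.duration = 0.0


def run_in_worker(test):
    # Hypothetical stand-in for a worker process: it receives a copy of
    # the object, much as a process-based backend would after pickling.
    worker_copy = copy.deepcopy(test)
    worker_copy.duration = 1.5  # pretend the work took 1.5 seconds
    return worker_copy.duration


original = Test("1")
returned = run_in_worker(original)

duration_after_worker = original.duration
print(duration_after_worker)  # 0.0: only the worker's copy was modified
print(returned)               # 1.5: the return value does reach the caller

# Writing the returned value back keeps the caller's object up to date.
original.duration = returned
```

The same idea should carry over to joblib: since joblib.Parallel returns its results as a list in input order by default, the durations can be paired with tests using zip() and assigned back if the objects themselves need to hold them.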
