首页 > 解决方案 > Dask 计算在使用客户端时失败,在没有客户端设置时工作

问题描述

我正在尝试使用 dask 客户端来并行化我的计算。当我运行 df.compute() 时,我得到了正确的输出(虽然它很慢),但是当我在设置客户端后运行相同的东西时,我得到以下错误:

distributed.protocol.pickle - INFO - Failed to serialize <function part at 0x7fd5186ed730>. Exception: can't pickle _thread.RLock objects

这是我的代码,在第一个 df.compute() 中,我得到了预期的结果,在第二个中我没有。

@dask.delayed
def part(x):
    lower, upper = x
    q = "SELECT id,tfidf_vec,emb_vec FROM document_table"
    lines=man.session.execute(q)
    counter = lower
    df = []
    for line in lines:
        df.append(line)
        counter += 1
        if counter == upper:
            break
    return pd.DataFrame(df)

parts = [part(x) for x in [[0,100000],[100000,200000]]]
df = dd.from_delayed(parts)
df.compute()

from dask.distributed import Client
client = Client('127.0.0.1:8786')
df.compute()

标签: dask

解决方案


您的函数包含对 的引用man.session,它是函数闭包的一部分。当您使用默认调度程序线程时,可以在执行代码的线程之间共享对象。当您使用分布式调度程序时,该函数必须被序列化并发送给不同进程中的工作人员。

您应该创建一个在每次调用时创建会话对象的函数,正如您对非常相似的问题的回答所建议的那样。


推荐阅读