python - 多处理的并行性几乎没有减少时间
问题描述
我使用this和this并行运行 2 个函数调用,但时间几乎没有改善。这是我的代码:
顺序:
from nltk import pos_tag
def posify(txt):
return ' '.join([pair[1] for pair in pos_tag(txt.split())])
df1['pos'] = df1['txt'].apply(posify) # ~15 seconds
df2['pos'] = df2['txt'].apply(posify) # ~15 seconds
# Total Time: 30 seconds
平行:
from nltk import pos_tag
import multiprocessing
def posify(txt):
return ' '.join([pair[1] for pair in pos_tag(txt.split())])
def posify_parallel(ser, key_name, shared_dict):
shared_dict[key_name] = ser.apply(posify)
manager = multiprocessing.Manager()
return_dict = manager.dict()
p1 = multiprocessing.Process(target=posify_parallel, args=(df1['txt'], 'df1', return_dict))
p1.start()
p2 = multiprocessing.Process(target=posify_parallel, args=(df2['txt'], 'df2', return_dict))
p2.start()
p1.join(), p2.join()
df1['pos'] = return_dict['df1']
df2['pos'] = return_dict['df2']
# Total Time: 27 seconds
我预计总时间约为 15 秒,但我得到了 27 秒。
如果有什么不同的话,我有一个 6 核(12 个逻辑)的 i7 2.6GHz CPU。
是否有可能在 15 秒左右达到目标?这与pos_tag
函数本身有关吗?
编辑:
我最终只是做了以下事情,现在是 15 秒:
with Pool(cpu_count()) as pool:
df1['pos'] = pool.map(posify, df1['txt'])
df2['pos'] = pool.map(posify, df2['txt'])
我认为这样的线路顺序运行,但它们每个都在内部并行运行。只要是15秒,我就可以。
解决方案
从进程传回数据的更常用方法是通过multiprocessing.Queue
实例。由于不知道数据帧数据的具体细节和处理结果,我无法量化通过从托管字典切换会提高多少性能,但使用队列应该会提高性能。
from nltk import pos_tag
import multiprocessing
def posify(txt):
return ' '.join([pair[1] for pair in pos_tag(txt.split())])
def posify_parallel(ser, which_df, q):
# Pass back the results along with which dataframe the results are for:
q.put((which_df, ser.apply(posify)))
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=posify_parallel, args=(df1['txt'], 1, q))
p1.start()
p2 = multiprocessing.Process(target=posify_parallel, args=(df2['txt'], 2, q))
p2.start()
# Get the results:
for _ in range(2):
# Must do the gets before joing the processes!
which_df, results = q.get()
if which_df == 1:
df1['pos'] = results
else:
# assert(which_df == 2)
df2['pos'] = results
p1.join()
p2.join()
要使用多处理池:
from nltk import pos_tag
import multiprocessing
def posify(txt):
return ' '.join([pair[1] for pair in pos_tag(txt.split())])
def posify_parallel(ser):
return ser.apply(posify)
pool = multiprocessing.Pool(2)
results1 = pool.apply_async(posify_parallel, args=(df1['txt'],))
results2 = pool.apply_async(posify_parallel, args=(df2['txt'],))
df1['pos'] = results1.get()
df2['pos'] = results2.get()
推荐阅读
- ios - 如何将视图控制器设置为 rootviewcontroller
- python - 对四列执行haversine函数到新列
- javascript - 尝试删除 ComponentDidMount 甚至 Render 中的引导类时,React 应用程序出错
- python - 如何修复文件处理输出问题,我得到两次打印相同的输出
- javascript - 如何将可关闭的文本标签添加到 Textarea Kendo | jQuery
- javascript - 在 react-native 中获取主要颜色?
- tensorflow - 当我将列表对象添加到 keras 子类模型时,`tf.model_to_estimator` 引发 AttributeError
- ms-word - 用 MS Word 中的字段替换文本
- linux-kernel - 验证从哪里调用“kworker/n:n”(在 ps -aux 中)
- python - “跳过集合点重新初始化”何时执行。在 TensorFlow 中调用collective_ops 时出现