首页 > 解决方案 > ProcessPoolExecutor 只读变速差异

问题描述

我有两个使用concurrent.futures.ProcessPoolExecutor(Python 3.6,Linux)的多处理程序版本,尽管看起来变化很小(一个比另一个慢约 3 倍),但速度差异令人惊讶。

每个子进程执行一个简单的函数,该函数从一个大字典中读取(它不会改变它)并返回一个结果。

  1. 该函数的第一个版本将 dictexecutor.submit()作为参数传入。
  2. 该函数的第二个版本直接从全局字典中读取。

代码示例

传入的变量

#!/usr/bin/env python3
import concurrent.futures, pstats, sys, cProfile

BIG_DICT = {i: 2*i for i in range(10000)}
def foo(d):
    return d[0]

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    tasks = [executor.submit(foo, BIG_DICT) for _ in range(100000)]

    for task in concurrent.futures.as_completed(tasks):
        task.result()

全局变量读取自

#!/usr/bin/env python3
import concurrent.futures, pstats, sys, cProfile

BIG_DICT = {i: 2*i for i in range(10000)}
def foo():
    return BIG_DICT[0]

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    tasks = [executor.submit(foo) for _ in range(100000)]

    for task in concurrent.futures.as_completed(tasks):
        task.result()

想法

我已经使用 cProfile 分析了程序的两个版本,并且大部分执行时间似乎都花在等待锁定上。全球版只等了10秒左右,而通关版等了将近80秒!

据我了解,当一个进程被分叉时,它应该复制其父内存的副本。由于程序是多进程的,并且BIG_DICT在创建后从未实际修改过,因此不需要锁定来保持提交每个进程之间的状态一致性。

既然BIG_DICT两个版本都需要拷贝到每个子进程的内存空间中,为什么执行时间会有这么大的差异呢?

我有几个想法:

分析结果

传入的变量

         7672287 function calls in 92.434 seconds

   Ordered by: internal time, cumulative time
   List reduced from 247 to 12 due to restriction <0.05>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   460133   75.428    0.000   75.428    0.000 {method 'acquire' of '_thread.lock' objects}
   100001    7.034    0.000    7.034    0.000 {built-in method posix.write}
   100001    2.490    0.000    2.490    0.000 {method '__enter__' of '_multiprocessing.SemLock' objects}
   100001    0.686    0.000   78.344    0.001 _base.py:196(as_completed)
    90033    0.553    0.000   75.879    0.001 threading.py:263(wait)
   100000    0.548    0.000   13.639    0.000 process.py:449(submit)
   190033    0.366    0.000    0.713    0.000 _base.py:174(_yield_finished_futures)
   100000    0.351    0.000    0.598    0.000 _base.py:312(__init__)
    90033    0.327    0.000   76.335    0.001 threading.py:533(wait)
   100001    0.261    0.000    7.617    0.000 connection.py:181(send_bytes)
   480065    0.260    0.000    0.382    0.000 threading.py:239(__enter__)
   100001    0.258    0.000   11.329    0.000 queues.py:339(put)


   Ordered by: internal time, cumulative time
   List reduced from 247 to 12 due to restriction <0.05>

Function                                                    was called by...
                                                                ncalls  tottime  cumtime
{method 'acquire' of '_thread.lock' objects}                <-   90033    0.078    0.078  threading.py:251(_acquire_restore)
                                                                190033    0.391    0.391  threading.py:254(_is_owned)
                                                                180066   74.956   74.956  threading.py:263(wait)
                                                                     1    0.003    0.003  threading.py:1062(_wait_for_tstate_lock)
{built-in method posix.write}                               <-  100001    7.034    7.034  connection.py:365(_send)
{method '__enter__' of '_multiprocessing.SemLock' objects}  <-  100001    2.490    2.490  synchronize.py:95(__enter__)
_base.py:196(as_completed)                                  <-
threading.py:263(wait)                                      <-   90033    0.553   75.879  threading.py:533(wait)
process.py:449(submit)                                      <-  100000    0.548   13.639  local.py:13(<listcomp>)
_base.py:174(_yield_finished_futures)                       <-  190033    0.366    0.713  _base.py:196(as_completed)
_base.py:312(__init__)                                      <-  100000    0.351    0.598  process.py:449(submit)
threading.py:533(wait)                                      <-   90032    0.327   76.334  _base.py:196(as_completed)
                                                                     1    0.000    0.001  threading.py:828(start)
connection.py:181(send_bytes)                               <-  100001    0.261    7.617  queues.py:339(put)
threading.py:239(__enter__)                                 <-  100000    0.070    0.116  _base.py:174(_yield_finished_futures)
                                                                100000    0.033    0.051  _base.py:405(result)
                                                                100000    0.083    0.108  queue.py:115(put)
                                                                 90032    0.040    0.058  threading.py:523(clear)
                                                                 90033    0.034    0.050  threading.py:533(wait)
queues.py:339(put)                                          <-  100000    0.258   11.329  process.py:449(submit)
                                                                     1    0.000    0.000  process.py:499(shutdown)

全局变量读取自

         5949819 function calls in 27.158 seconds

   Ordered by: internal time, cumulative time
   List reduced from 247 to 12 due to restriction <0.05>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   160569   10.072    0.000   10.072    0.000 {method 'acquire' of '_thread.lock' objects}
   100001    5.453    0.000    5.453    0.000 {method '__enter__' of '_multiprocessing.SemLock' objects}
   100001    5.338    0.000    5.338    0.000 {built-in method posix.write}
   100000    0.883    0.000    1.163    0.000 _base.py:312(__init__)
   100000    0.477    0.000   15.671    0.000 process.py:449(submit)
   100001    0.438    0.000    6.133    0.000 connection.py:181(send_bytes)
   100001    0.304    0.000   12.921    0.000 queues.py:339(put)
   100000    0.304    0.000    0.304    0.000 process.py:116(__init__)
   100001    0.277    0.000    0.432    0.000 reduction.py:38(__init__)
   100000    0.267    0.000    0.333    0.000 threading.py:334(notify)
   100000    0.240    0.000    0.747    0.000 queue.py:115(put)
   100006    0.238    0.000    0.280    0.000 threading.py:215(__init__)


   Ordered by: internal time, cumulative time
   List reduced from 247 to 12 due to restriction <0.05>

Function                                                    was called by...
                                                                ncalls  tottime  cumtime
{method 'acquire' of '_thread.lock' objects}                <-   15142    0.007    0.007  threading.py:251(_acquire_restore)
                                                                115142    0.038    0.038  threading.py:254(_is_owned)
                                                                 30284   10.022   10.022  threading.py:263(wait)
                                                                     1    0.004    0.004  threading.py:1062(_wait_for_tstate_lock)
{method '__enter__' of '_multiprocessing.SemLock' objects}  <-  100001    5.453    5.453  synchronize.py:95(__enter__)
{built-in method posix.write}                               <-  100001    5.338    5.338  connection.py:365(_send)
_base.py:312(__init__)                                      <-  100000    0.883    1.163  process.py:449(submit)
process.py:449(submit)                                      <-  100000    0.477   15.671  global.py:13(<listcomp>)
connection.py:181(send_bytes)                               <-  100001    0.438    6.133  queues.py:339(put)
queues.py:339(put)                                          <-  100000    0.304   12.921  process.py:449(submit)
                                                                     1    0.000    0.000  process.py:499(shutdown)
process.py:116(__init__)                                    <-  100000    0.304    0.304  process.py:449(submit)
reduction.py:38(__init__)                                   <-  100001    0.277    0.432  reduction.py:48(dumps)
threading.py:334(notify)                                    <-  100000    0.267    0.333  queue.py:115(put)
queue.py:115(put)                                           <-  100000    0.240    0.747  process.py:449(submit)
threading.py:215(__init__)                                  <-  100000    0.238    0.280  _base.py:312(__init__)
                                                                     3    0.000    0.000  queue.py:27(__init__)
                                                                     1    0.000    0.000  queues.py:67(_after_fork)
                                                                     2    0.000    0.000  threading.py:498(__init__)

标签: pythonpython-3.xconcurrencymultiprocessingpython-internals

解决方案


推荐阅读