Threading a list of arguments and joining the results in Python

Problem description

I have a list of arguments and a function that takes one argument and returns some values based on it. I want to parallelise this process. I do not care about the order in which the results come back, only that I get all of them. Also, the argument list will be long (thousands of entries), but the function is complex and takes a few seconds per call.

A minimal non-parallel version is:

def f(x):
    return [x, x*x]

argument_list = list(range(1, 8))  # sample arguments
result_list = []
for a in argument_list:
    result_list += f(a)
print(result_list)

Tags: python-3.x, multithreading, python-multithreading

Solution


I have tried to stay close to your original example; see the comments in the source code for details.
Note that Python has a Global Interpreter Lock (GIL), which tends to make threaded computation less efficient than expected, because all threads compete for this single global lock:
https://wiki.python.org/moin/GlobalInterpreterLock
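As an aside (my addition, not part of the original answer): for this map-and-concatenate pattern, the standard `concurrent.futures` module provides a ready-made pool. A minimal sketch:

```python
# Sketch using the standard concurrent.futures module (my addition):
# ThreadPoolExecutor handles thread creation, scheduling and joining.
from concurrent.futures import ThreadPoolExecutor

def f(x):
    return [x, x*x]

argument_list = list(range(1, 8))  # sample arguments
result_list = []
with ThreadPoolExecutor(max_workers=4) as executor:
    # executor.map yields the per-call lists in input order
    # (order is not needed here, but map preserves it anyway)
    for pair in executor.map(f, argument_list):
        result_list += pair
print(result_list)
```

For a CPU-bound `f`, swapping `ThreadPoolExecutor` for `ProcessPoolExecutor` sidesteps the GIL, at the cost of pickling arguments and results between processes.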

#!/usr/bin/env python

def parallelise(arg_list, arg_fnct, thread_count):
  import threading
  #
  # general purpose data structure to store anything
  class Record: pass
  #
  # the actual work performed by each thread
  def work(r):
    # divide equally the global data amongst threads
    amount=len(r.arg_list)
    begin=int(amount*r.thread_index/r.thread_count)
    end=min(amount, int(amount*(r.thread_index+1)/r.thread_count))
    # this thread will only work on its part of the global problem
    r.result=[]
    for i in range(begin, end):
      r.result+=arg_fnct(r.arg_list[i]) # call the original computation
  #
  # the set of data structures used by the threads (one each)
  thread_data=[None]*thread_count
  #
  # split the global work amongst many threads
  for idx in range(thread_count):
    r=Record()
    r.arg_list=arg_list           # provide global data of the problem
    r.thread_index=idx            # provide thread layout
    r.thread_count=thread_count   # provide thread layout
    r.thread=threading.Thread(target=work, args=(r,)) # prepare a thread
    r.thread.start()              # launch the thread
    thread_data[idx]=r            # remember this data structure
  #
  # collect the results
  result_list=[]
  for r in thread_data:
    r.thread.join()       # wait for the thread to finish its job
    result_list+=r.result # safely access its result
  return result_list
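The begin/end arithmetic in `work()` splits the index range evenly across threads; a quick standalone check (my own illustration, assuming 10 items and 3 threads) shows that the slices cover every index exactly once, with no gaps or overlaps:

```python
# Verify the index-partition scheme used in work() (illustration only).
amount = 10        # pretend the argument list has 10 items
thread_count = 3
covered = []
for thread_index in range(thread_count):
    begin = int(amount * thread_index / thread_count)
    end = min(amount, int(amount * (thread_index + 1) / thread_count))
    covered.extend(range(begin, end))
print(covered)  # indices 0..9, each appearing exactly once
```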

#~~~~ the original problem ~~~~

import sys

# the original computing function
def f(x):
  return [x, x*x]

# the global set of data to be processed
argument_list=list(range(1, 1000)) # sample arguments

import multiprocessing
total_cpu_count=multiprocessing.cpu_count() # detect system properties
result_list=parallelise(argument_list, f, total_cpu_count)

sys.stdout.write('%s\n'%result_list) # everything is done
