首页 > 解决方案 > python中的多线程进程使用队列写入文件检查工作是否已完成

问题描述

from multiprocessing.dummy import Pool as ThreadPool
import multiprocessing as mp



def func(a):

    pthData = "C:/temp/temp.txt"
    with open(pthData, 'r') as file:
        done = file.read().splitlines()

    if a in done:
        return 'done'

    q.put(a)
    return a

def listener(q):

    pthData = "C:/temp/temp.txt"
    m = q.get()
    with open(pthData, 'a') as the_file:
        the_file.write( m + '\n')
        #he_file.write(str(m) + '\n')


a =  ['a', 'b', 'c', 'd', 'a', 'b']


# Make the Pool of workers
pool = ThreadPool(4)

#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()    

#put listener to work first
watcher = pool.apply_async(listener, (q,))

pool.starmap(func, a, q)
## TypeError: '<=' not supported between instances of 'AutoProxy[Queue]' and 'int'

pool.starmap(func, a)
## Runs but only writes 'a' to temp file

pool.starmap(func, (a, q))
## func() takes 1 positional argument but 6 were given

pool.apply_async(func, (a, q))
## freezes on pool.join

# Close the pool and wait for the work to finish
pool.close()
pool.join()

为什么 pool.join() 上的 apply_async 冻结?我尝试将其放入 if name == ' main ' 但结果相同。

如何正确调用func传递 1 个参数 (a) 和队列 (q)?

标签: pythonmultithreadingmultiprocessing

解决方案


如何正确调用 func 传递 1 个参数 (a) 和队列 (q)?

这至少不会冻结

  • 执行前确保temp.txt存在。
  • 添加q参数到func.
      def func(a,q):
          print(f'func({a})')
          ...
  • apply_async在列表理解中使用。
    if __name__ == '__main__':

        # Make the Pool of workers
        with ThreadPool(4) as pool:
            q = queue.Queue()
            #put listener to work first
            watcher = pool.apply_async(listener, (q,))
            results = [pool.apply_async(func, (item, q)) for item in a]
            # just check stuff
            for result in results:
                result.wait()
                print(result, result.successful(),result.get())
            pool.close()
            pool.join()

  • 您将需要解决一些其他问题,例如listener运行一次然后停止。
  • 我使用了许多其他方法来做到这一点,apply_async因为它是您问题中的选项之一。
  • 我喜欢自己使用 concurrent.futures。
  • 您可能会受益于使用变体阅读搜索结果python threading producer consumer site:stackoverflow.com

推荐阅读