python - python中的多线程进程使用队列写入文件检查工作是否已完成
问题描述
from multiprocessing.dummy import Pool as ThreadPool
import multiprocessing as mp
def func(a):
pthData = "C:/temp/temp.txt"
with open(pthData, 'r') as file:
done = file.read().splitlines()
if a in done:
return 'done'
q.put(a)
return a
def listener(q):
pthData = "C:/temp/temp.txt"
m = q.get()
with open(pthData, 'a') as the_file:
the_file.write( m + '\n')
#he_file.write(str(m) + '\n')
a = ['a', 'b', 'c', 'd', 'a', 'b']
# Make the Pool of workers
pool = ThreadPool(4)
#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()
#put listener to work first
watcher = pool.apply_async(listener, (q,))
pool.starmap(func, a, q)
## TypeError: '<=' not supported between instances of 'AutoProxy[Queue]' and 'int'
pool.starmap(func, a)
## Runs but only writes 'a' to temp file
pool.starmap(func, (a, q))
## func() takes 1 positional argument but 6 were given
pool.apply_async(func, (a, q))
## freezes on pool.join
# Close the pool and wait for the work to finish
pool.close()
pool.join()
为什么 pool.join() 上的 apply_async 冻结?我尝试将其放入 if name == ' main ' 但结果相同。
如何正确调用func
传递 1 个参数 (a) 和队列 (q)?
解决方案
如何正确调用 func 传递 1 个参数 (a) 和队列 (q)?
这至少不会冻结:
- 执行前确保
temp.txt
存在。 - 添加
q
参数到func
.
def func(a,q):
print(f'func({a})')
...
apply_async
在列表理解中使用。
if __name__ == '__main__':
# Make the Pool of workers
with ThreadPool(4) as pool:
q = queue.Queue()
#put listener to work first
watcher = pool.apply_async(listener, (q,))
results = [pool.apply_async(func, (item, q)) for item in a]
# just check stuff
for result in results:
result.wait()
print(result, result.successful(),result.get())
pool.close()
pool.join()
- 您将需要解决一些其他问题,例如
listener
运行一次然后停止。 - 我使用了许多其他方法来做到这一点,
apply_async
因为它是您问题中的选项之一。 - 我喜欢自己使用 concurrent.futures。
- 您可能会受益于使用变体阅读搜索结果
python threading producer consumer site:stackoverflow.com
推荐阅读
- mysql - MySQL - 获取每个组的最新结果
- excel - Excel:当名称是一列中较长字符串的一部分时,在列之间查找匹配的名称
- python - 在这段代码中,当我选择选项 A 时,我得到总计 = 0 作为输出而不是输出总计 = 10,如何解决?
- scala - 使用 Array 列和 SaveMode.Overwrite 将 spark 数据集保存到 Apache Ignite
- python - Python 等价于 rpm -Va
- ios - 如何按日期对消息进行分组?
- azure-notebooks - 真的没有办法在 Azure Notebooks 中私下共享库吗?
- bash - 在多个 FASTA 文件中查找和替换多个序列头
- java - Android Studio 3.1.3 - 未解决的参考:R - Kotlin
- python - Bag of Words (BOW) vs N-gram (sklearn CountVectorizer) - 文本文档分类