python - python中的多线程程序,即使我使用join(),只有一个子线程工作并且程序没有正常退出
问题描述
我创建了十个线程来处理全局列表中的项目,但我不知道为什么只有第一个工作人员做这项工作,而且主线程在子线程完成之前完成,即使我使用了 thread.join()。这是代码,我认为问题可能是因为我在 myThread.run 中使用了 while 循环。但我不知道如何在全局列表为空之前告诉这些线程继续工作。
# coding=utf-8
import threading
import numpy as np
dfs = ['units' + str(i).zfill(5) for i in range(250)]
units = dfs.copy()
k = [str(i).zfill(5) for i in range(500, 21800000)]
units.extend(k)
np.random.shuffle(units)
marker = []
def working_fun(df, unit):
global marker
if unit in df:
threadlock.acquire()
marker.append(int(unit[5:]))
class myThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name
self.work_load = []
def run(self):
global dfs
print("start thread" + self.name)
while True:
threadlock.acquire()
if units != []:
unit = units.pop()
else:
unit = None
threadlock.release()
if unit is not None:
self.work_load.append(unit)
working_fun(dfs, unit)
else:
print('------', self.name, '--finish---', len(self.work_load), '--------')
break
threadlock = threading.RLock()
thds = []
for i in range(10):
thd = myThread(name='thd' + str(i))
thds.append(thd)
for thd in thds:
thd.start()
thd.join()
print('output:', marker)
解决方案
试试这种方式:
import numpy
import multiprocessing
# Same as before
dfs = ['units' + str(i).zfill(5) for i in range(250)]
units = dfs.copy()
k = [str(i).zfill(5) for i in range(500, 21800000)]
units.extend(k)
numpy.random.shuffle(units)
# Almost the same as before
def working_fun(inp):
df, unit = inp
if unit in df:
return int(unit[5:])
# This is needed for multiprocessing/threading
if __name__ == "__main__":
# Create a pool of workers (10 in this case)
with multiprocessing.Pool(10) as pool:
# Map some (global) iterable on the pool of workers
result = pool.map(working_fun, [(dfs, unit) for unit in units])
# Show the results (note that the function returns None if the unit is not in df)
print([r for r in result if r is not None])
输出:
$ python test.py
[1, 75, 139, 24, 101, 72, 156, 55, 58, 235, 14, 123, 177, 112, 168, 178, 173, 162, 104, 226, 230, 205, 69, 100, 246, 18, 117, 149, 37, 214, 206, 26, 136, 87, 144, 79, 50, 222, 7, 133, 36, 41, 30, 163, 103, 187, 6, 225, 15, 223, 234, 138, 126, 19, 64, 224, 39, 145, 130, 42, 11, 221, 128, 213, 204, 2, 45, 220, 242, 109, 59, 238, 232, 68, 152, 107, 148, 83, 197, 241, 118, 32, 90, 99, 22, 119, 0, 67, 48, 181, 71, 193, 95, 29, 113, 40, 134, 218, 141, 27, 121, 8, 207, 110, 60, 237, 47, 94, 73, 157, 184, 78, 159, 49, 202, 239, 124, 215, 127, 209, 62, 4, 52, 82, 74, 9, 199, 158, 188, 3, 61, 180, 57, 219, 245, 38, 16, 190, 12, 17, 175, 46, 196, 125, 194, 76, 129, 161, 81, 93, 137, 155, 174, 54, 35, 25, 115, 140, 216, 23, 21, 233, 77, 33, 92, 208, 120, 86, 165, 70, 135, 28, 91, 66, 85, 169, 203, 211, 114, 154, 122, 217, 247, 31, 147, 96, 142, 191, 10, 183, 80, 179, 189, 56, 105, 160, 228, 185, 132, 5, 53, 106, 13, 210, 182, 89, 192, 153, 170, 111, 65, 212, 186, 151, 200, 248, 229, 102, 240, 198, 176, 43, 131, 166, 236, 231, 116, 172, 146, 88, 44, 98, 227, 20, 34, 164, 108, 171, 244, 243, 195, 150, 249, 97, 167, 51, 201, 84, 63, 143]
推荐阅读
- python - 如何清理/重新初始化 Keras 资源/网络?如何从头开始训练?
- javascript - 要将时区与时刻一起使用,您是否将所有时刻引用替换为时刻时区?
- docker - Docker - 如何运行在构建过程中安装的包?
- dart - Dart Async 不等待完成
- c++ - QT:QActionGroup 添加到 QMenu 后,谁是 QActionGroup 成员的父级?
- java - 如何在java中使用Jsoup处理异常以保持程序运行
- keras - 使用 GRU(Keras) 迭代预测序列
- jmeter - 保存许多变量的 Jmeter Beanshell 采样器的内存限制
- python - 根据字典值集对字符串进行评分
- ruby-on-rails - 如何以安全的方式向 Rails 应用程序发送 POST 请求?