python - 挂在多处理队列中
问题描述
我正在尝试使用 multiprocessing.Queue 将文件拆分为几个较小的文件。虽然下面的代码经常工作,但有时它会挂起:
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/multiprocessing/util.py", line 265, in _run_finalizers
finalizer()
File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/multiprocessing/util.py", line 189, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/multiprocessing/queues.py", line 192, in _finalize_join
thread.join()
File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
我不确定为什么?传递给进程的数据可能非常大,这可能是个问题吗?这是我正在使用的伪代码:
def generate_split_processes_to_run(self,protein_seqs,seq_chunks):
c=0
for chunk in seq_chunks:
self.queue.put([protein_seqs,chunk,c])
c+=1
def sample_split_handler(self,protein_seqs,protein_seqs_groups,worker_count):
#loading the queue
self.generate_split_processes_to_run(protein_seqs,protein_seqs_groups)
#spawning the processes
processes = [Process(target=self.sample_split_worker, args=(self.queue,)) for _ in range(worker_count)]
#starting the processes
for process in processes:
process.start()
#joining processes
for process in processes:
process.join()
print(processes)
def sample_split_worker(self, queue):
while not queue.empty():
seqs, chunk, chunk_number= queue.get()
self.save_chunks(seqs, chunk, chunk_number)
def split_sample(self):
seqs=self.read_file(self.target_path)
seqs_keys=list(seqs.keys())
worker_count= 7
seq_chunks= chunk_generator(seqs_keys, 1000)
self.sample_split_handler(seqs,seq_chunks,worker_count)
def save_chunks(self,seqs,
chunk,
chunk_number):
with open(chunk_path, 'w+') as file:
while chunk:
seq_id = chunk.pop(0)
chunk_str = 'something'
file.write('>' + seq_id + '\n' + chunk_str + '\n')
当我打印进程列表时,它们似乎都已完成:
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>, <Process(Process-6, stopped)>, <Process(Process-7, stopped)>]
我之前使用了 Pool,它工作得很好,但我现在想使用 Queue。欢迎任何帮助!
解决方案
所以,显然这是一个有点“常见”的问题。.empty()
用or来检查队列大小.qsize()
并不能保证队列真的是空的。在快速进程中,您通常会queue.empty()==True
认为队列不为空。
对此有一些建议的解决方案:
1-sleep()
在进程之间包含一个计时器,以便队列有时间获取新项目。
2-将哨兵添加到队列中。
与计时器选项不同,哨兵将保证您始终完成队列。将项目插入队列后,添加一个哨兵,就像这样queue.put(None)
(None
为了降低内存消耗)。为您正在运行的每个进程插入一个None
。这将导致如下结果:
def sample_split_worker(self, queue):
while True:
#when the queue is finished, each process will receive a None, thus breaking the cycle.
record = queue.get()
If not record: break
seqs, chunk, chunk_number = record
self.save_chunks(seqs, chunk, chunk_number)
希望这可以帮助某人。
推荐阅读
- linux - 创建 Jenkins 作业以在服务器上部署文件
- jquery - 使用 jQuery 使用 ID 填充模式
- sql - 如何将不同的列值转置为列名并将其他列值映射到每个列名
- java - ArrayDeque 在删除/添加时是否有移动元素的开销?
- html - 将类添加到菜单问题
- android - 如何在android中正常记录和验证截图测试
- java - 从没有输入字段的 JSP 创建 POST 请求以执行几个不同的操作
- javascript - 无法修复十进制数的正则表达式
- python - 如果 stdout.channel.recv() 上的条件不起作用,则 SSH paramiko
- jquery - 如何使用 Laravel 检索 JSON 数据并将其存储到数据库中