python - 使用 islice 和 multiprocessing 批量读取和处理大型文本文件
问题描述
代码不会返回任何东西,它会一直运行下去。请帮助提供代码片段。仅供参考:我是multiprocessing
第一次使用。
我的本地内存不足,因此从 zip 文件中提取数据。我的想法是一次读取 n 行islice
并使用process_logBatch()
.
在 Windows 机器上运行此代码 - Jupyter Notebook。
import multiprocessing as mp
import zipfile
from itertools import islice
import time
#import pandas as pd # Unused.
def process_logBatch(next_n_lines):
l = [random.randint(0,100) for i in range(5)]
print(l)
return l
def collect_results(result):
results.extend(result)
pool = mp.Pool(processes=(mp.cpu_count()-1))
results = []
with zipfile.ZipFile('log.zip', 'r') as z:
with z.open('log.txt') as f:
while True:
print(f.closed)
next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]
if not next_n_lines:
break
try:
pool.apply_async(process_logBatch, args=(next_n_lines, ), callback=collect_results)
except Exception as e:
print(e)
if counter == 2:
break
pool.close()
pool.join()
print(results)
解决方案
有几个问题。一种是在 Windows 上,您需要一个声明来保护主模块,如 multiprocssing 模块文档if __name__ == '__main__':
中标题为“安全导入主模块”的部分所示和讨论。
然而,第二件事并不是那么容易解决的。每个进程都在自己的内存空间中运行,因此它们并不都有相同的results
列表。为了避免这种情况,我在所有子流程结束后转而使用Pool.map_async()
并收集结果。
这是我认为可行的一种方式(基于您的示例代码):
import multiprocessing as mp
import zipfile
from itertools import islice
import time
#import pandas as pd # Unused.
import random # Added.
def process_logBatch(next_n_lines):
l = [random.randint(0,100) for i in range(5)]
print(l)
return l
if __name__ == '__main__':
# Not longer needed.
# def collect_results(result):
# results.extend(result)
pool = mp.Pool(processes=(mp.cpu_count()-1))
with zipfile.ZipFile('log.zip', 'r') as z:
with z.open('log.txt') as f:
counter = 0 # Added to avoid NameError because undefined.
while True:
next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]
if not next_n_lines:
break
try:
results = pool.map_async(process_logBatch, next_n_lines)
except Exception as e:
print(e)
if counter == 2:
break
pool.close()
pool.join()
print(results.get())
推荐阅读
- reactjs - 如何在一个类中定义多个方法以从组件中调用它们
- jquery - 如何在不触发 CORS 的情况下使用 Ajax 将表单从子域发布到主域?
- javascript - 为什么我输入的 ARN 未开始执行我的 aws step 函数?
- javascript - 点亮元素可以跟踪其范围之外的变量吗
- android - 如何在 android 中获取“购买状态”,特别是从 google play 商店购买应用程序产品?
- parsing - 需要本地 SDK 工具来解析带有大表的原生 pdf 文件
- linkedin-api - LinkedIn Share API - 创建附有 PDF 的帖子
- java - Eclipse 无法导入 import com.google.maps.*(适用于 java Fx 程序)
- javascript - 打破承诺链
- javascript - Feedly API 正在返回会话过期,而不是让我从本地节点环境访问 API