python - 在 Python 中使用多处理高效地创建 1% 的样本
问题描述
我正在尝试使用多处理(multiprocessing)逐行处理大型数据集(300GB)。我想基于一个变量定义一个 1% 的随机样本。我的第一步是定义样本,然后使用多处理读取数据文件。我猜如果定义随机样本集合的那部分代码不必为每个子进程都运行一遍,脚本会运行得更快。但是,当我尝试将脚本的那部分移动到 if __name__ == "__main__": 之下时,子进程似乎不再识别来自父进程的随机样本。我得到错误:
NameError: name 'id_pct1' is not defined
将我定义随机样本的脚本部分放在哪里最有效?
#define sample
# Build the 1% id sample at module level: read every unique id, then draw
# round(len/100) of them without replacement.
# The context manager closes the id file as soon as it has been read.
with open('Subsets/unique_ids_final.txt') as id_file:
    uid = [line.strip() for line in id_file]
pct1 = round(len(uid)/100)
# Fixed seed: the same id file always yields the same sample.
random.seed(1)
id_pct1 = set(random.sample(uid, k=pct1))
# NOTE(review): `vname` is not defined in the visible code; presumably it is
# set earlier in the script -- confirm before running.
id_pct1.add(vname)
#read original file and write 1% sample using multiprocessing
def worker(chunkStart, chunkSize, q):
    """Filter one slice of ``myfile.txt`` down to the sampled ids.

    Seeks to ``chunkStart``, reads ``chunkSize`` characters, and keeps the
    lines whose field 30 (zero-based, ``*``-separated) is a member of the
    module-level ``id_pct1`` set.

    The kept lines are put on ``q`` as one list and are also returned.
    """
    with open('myfile.txt') as src:
        src.seek(chunkStart)
        block = src.read(chunkSize)
        # Field 30 is the value tested against the sample.
        matched = [
            row for row in block.splitlines()
            if row.split('*')[30] in id_pct1
        ]
        q.put(matched)
        return matched
def chunkify(fname,size=1024*1024):
    """Yield ``(start, length)`` pairs that cut *fname* into line-aligned chunks.

    Each chunk starts where the previous one ended, spans roughly ``size``
    characters, and is extended to the end of the line it lands in.  The
    reported length stops one position short of the position after that
    line, i.e. it leaves out the trailing newline.

    Parameters:
        fname: path of the file to split.
        size: target chunk length; the reported length is derived from this
            value (it used to be derived from a hard-coded 1 MiB, which gave
            wrong lengths for any other ``size``).

    Yields:
        Tuples ``(chunk_start, chunk_length)``.  For the final chunk the
        length may run past end-of-file.
    """
    file_end = os.path.getsize(fname)
    with open(fname, 'r') as f:
        # NOTE(review): positions come from text-mode tell() and read()
        # counts characters; this presumably assumes single-byte text so
        # that both agree with byte offsets -- confirm for the real data.
        next_start = 0
        while True:
            chunk_start = next_start
            f.seek(chunk_start)
            f.read(size)
            after_read = f.tell()
            f.readline()  # advance to the end of the current line
            next_start = f.tell()
            chunk_len = size + next_start - after_read - 1
            yield chunk_start, chunk_len
            if next_start >= file_end:
                break
def listener(q):
    """Write every batch of lines received on ``q`` to ``myfile1pct.txt``.

    Keeps taking items from ``q`` until the item equals ``'kill'``.  Every
    other item is iterated, and each element is written followed by a
    newline.
    """
    with open('myfile1pct.txt', 'w') as sink:
        # iter() with a sentinel calls q.get() repeatedly and stops on 'kill'.
        for batch in iter(q.get, 'kill'):
            sink.writelines(row + '\n' for row in batch)
            sink.flush()
def main():
    """Fan the chunks of ``myfile.txt`` out to a process pool.

    One pool task runs ``listener`` to write results; one ``worker`` task is
    submitted per chunk produced by ``chunkify``.  After every worker has
    finished, the ``'kill'`` sentinel stops the listener.

    Raises:
        Whatever a worker or the listener raised, re-raised by the
        corresponding ``AsyncResult.get()``.
    """
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool()
    # The listener holds one pool process until it receives 'kill'.
    # NOTE(review): with a single-process pool the workers would presumably
    # never get scheduled -- confirm the pool size on the target machine.
    watcher = pool.apply_async(listener, (q,))
    jobs = [
        pool.apply_async(worker, (chunkStart, chunkSize, q))
        for chunkStart, chunkSize in chunkify('myfile.txt')
    ]
    for job in jobs:
        job.get()
    q.put('kill')
    # Wait for the listener and surface any exception it raised; without
    # this a failed write would go unnoticed.
    watcher.get()
    pool.close()
    pool.join()
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
解决方案
如果您希望 #define sample 部分定义的这些变量在整个程序中都可用,可以在它们前面加上 global 关键字。但是,声明全局变量通常被认为是不好的做法。您应该考虑将 #define sample 部分的逻辑合并到您的函数中,如下所示:
# Per-process cache of the sampled ids; filled on the first worker() call.
_ID_PCT1 = None


def _sample_ids():
    """Return the 1% id sample, building it at most once per process."""
    global _ID_PCT1
    if _ID_PCT1 is None:
        #define sample
        with open('Subsets/unique_ids_final.txt') as id_file:
            uid = [line.strip() for line in id_file]
        pct1 = round(len(uid)/100)
        # Fixed seed: the same id file always yields the same sample.
        random.seed(1)
        sample = set(random.sample(uid, k=pct1))
        # NOTE(review): `vname` is not defined in the visible code;
        # presumably it is set elsewhere in the script -- confirm.
        sample.add(vname)
        _ID_PCT1 = sample
    return _ID_PCT1


#read original file and write 1% sample using multiprocessing
def worker(chunkStart, chunkSize, q):
    """Filter one slice of ``myfile.txt`` down to the sampled ids.

    Seeks to ``chunkStart``, reads ``chunkSize`` characters, and keeps the
    lines whose field 30 (zero-based, ``*``-separated) is in the id sample.
    The sample is loaded through ``_sample_ids`` so the id file is read and
    sampled once per process instead of once per chunk.

    The kept lines are put on ``q`` as one list and are also returned.
    """
    id_pct1 = _sample_ids()
    with open('myfile.txt') as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        tlines = [line for line in lines if line.split('*')[30] in id_pct1]
        q.put(tlines)
        return tlines
def chunkify(fname,size=1024*1024):
    """Generate ``(start, length)`` pairs splitting *fname* on line boundaries.

    Starting at position 0, each step reads ``size`` characters, finishes
    the line it stopped in, and reports the span from the chunk start up to
    (but not including) that line's trailing newline.  The next chunk begins
    right after it.

    Parameters:
        fname: path of the file to split.
        size: target chunk length.  The reported length is computed from
            this argument; previously a literal 1 MiB was used, so other
            sizes produced wrong lengths.

    Yields:
        ``(chunk_start, chunk_length)`` tuples; the last length may extend
        beyond end-of-file.
    """
    total = os.path.getsize(fname)
    with open(fname, 'r') as f:
        # NOTE(review): text-mode tell()/read() are used as if they were
        # byte offsets/counts; presumably the data is single-byte text --
        # confirm.
        line_end = 0
        while True:
            start = line_end
            f.seek(start)
            f.read(size)
            read_end = f.tell()
            f.readline()  # consume the remainder of the current line
            line_end = f.tell()
            length = size + line_end - read_end - 1
            yield start, length
            if line_end >= total:
                break
def listener(q):
    """Drain ``q`` into ``myfile1pct.txt`` until ``'kill'`` is received.

    Each non-sentinel item is iterated, and every element is written to the
    output file with a newline appended.
    """
    with open('myfile1pct.txt', 'w') as out:
        # Two-argument iter(): call q.get() until it returns 'kill'.
        for received in iter(q.get, 'kill'):
            out.writelines(text + '\n' for text in received)
            out.flush()
def main():
    """Process ``myfile.txt`` chunk by chunk in a multiprocessing pool.

    Starts ``listener`` as one pool task, submits one ``worker`` task per
    ``chunkify`` chunk, waits for all workers, then sends ``'kill'`` so the
    listener exits.

    Raises:
        Any exception raised inside a worker or the listener, re-raised by
        the matching ``AsyncResult.get()``.
    """
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool()
    # The listener keeps one pool process busy until 'kill' arrives.
    watcher = pool.apply_async(listener, (q,))
    jobs = [
        pool.apply_async(worker, (chunkStart, chunkSize, q))
        for chunkStart, chunkSize in chunkify('myfile.txt')
    ]
    for job in jobs:
        job.get()
    q.put('kill')
    # Block until the listener has finished and re-raise its error, if any;
    # the result was previously ignored, hiding listener failures.
    watcher.get()
    pool.close()
    pool.join()
# Entry point: call main() only when the file is run directly.
if __name__ == '__main__':
    main()
推荐阅读
- php - 如何按列从 pdo 中获取数据?
- architecture - 值对象和业务逻辑演进
- python - 如何清除打印的文本?
- django - 未找到请求 URL,已部署 Django
- android - android 本机上是否有任何可能的方式来显示某种 Dialog 或 BottomSheetDialog 与软键盘重叠
- reactjs - 如何在没有 onClick 事件的情况下从子组件调用父组件的方法。这在 React JS 中可能吗?
- android - 从谷歌地图获取经度和纬度的不断更新
- cakephp - 如何在 Flash 消息中嵌入 href?
- ffmpeg - ffmpeg CLI 设置 -r 30 如果输入 => 30 fps,否则 -r 如果 < 30 fps 则采用输入 fps
- .net-core - 如何在 Cloud Build 的一个构建步骤中运行多个 dotnet 命令?