首页 > 解决方案 > 为什么在管道组件上调用通信()而不是最后一个会产生损坏的输出?

问题描述

我想计算一个大型 .fastq 文件(5900 万行)中大约 500 个模式的出现次数。这些模式的长度都正好是 20 个字符。

在 unix 中,这很简单:

grep -F -o -f patterns.txt big_file.fastq | sort | uniq -c

但是,我希望避免编写临时模式文件,所以我使用 python 的子进程库创建了一个管道:

from subprocess import Popen, PIPE, STDOUT

p1 = Popen(["grep", "-F", "-o", "-f", "-", "big_file.fastq"], shell = False, stdin = PIPE, stdout = PIPE, stderr= STDOUT)
p2 = Popen(["sort"], shell = False, stdin = p1.stdout, stdout = PIPE, stderr = STDOUT)
p3 = Popen(["uniq", "-c"], shell = False, stdin = p2.stdout, stdout = PIPE, stderr = STDOUT)

然后我对此调用communicate(),提供一个编码的io.StringIO类文件对象作为输入(我使用'-'将它传递给grep命令):

import io

patterns_file = io.StringIO("\n".join(patterns_list))
p3.communicate(input = patterns_file.read().encode('utf-8'))[0]

当我像这样在 uniq 上调用communicate() 时,效果很好。

但是,在测试时,我错误地在管道的第一部分调用了它:

p1.communicate(input = patterns_file.read().encode('utf-8'))[0]

这给了我完全错误的输出,包括比预期的 20 个字符短或长的匹配。

我不明白为什么会这样。在 p1 上调用communicate() 不会只涉及管道的那一部分而忽略其余部分吗?删除 p2 和 p3 导致 p1 正确 grep。我觉得我错过了关于 Popen 工作原理的一些东西。

任何帮助表示赞赏。

标签: pythongrepsubprocesspipepopen

解决方案


当您实例化Popen对象时,它们引用的子流程会立即启动。因此,即使你只调用communicate()p1p2并且p3也在运行。

为什么这很重要?因为p2它的标准输入仍然连接到p1正在写入其输出的 FIFO!

如果在您要求 Python 程序直接读取相同内容的同时, sorton 的操作p2仍在读取内容,那么您最终p1会在它们之间分配 ' 的输出。预计会引起欢闹:在两个程序之间拆分读取不会导致明显损坏的数据的唯一方法是,如果两者都p2communicate()读取 20 字节的倍数的块(同时仍然足够小,操作系统不会拆分它们)进入多个系统调用);但是,用于无缓冲读取的典型块大小是 4096 的倍数,每次读取块时都会从记录边界创建 4 个字节的偏移量。


顺便说一句——对于许多程序来说,这不会有那么严重的影响,因为 FIFO 的缓冲区相对较小;为读取的每一行输入写入一行输出的程序最终会很快在输出上阻塞,因此将停止读取进一步的输入,直到其输出至少部分刷新。sort是一个例外,因为它需要先读取所有输入,然后才能知道第一行输出是什么!


推荐阅读