python - 通过 Python Subprocess' Popen over external 命令流式传输内存数据
问题描述
我想要达到的目标
- 我想通过 Python 的外部程序从类似生成器的对象中逐行流式传输元素。
- 崩溃了,我想要一些东西,比如
Generator -> Popen(...) -> Generator
不在内存中保存太多数据。
这是一个有效的最小示例,它演示了我想要实现的目标:
from io import StringIO
from subprocess import Popen, PIPE
import time
proc_input = StringIO("aa\nbb\ncc\ndd")
proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
for line in proc_input:
proc.stdin.write(line.encode())
yield proc.stdout.readline()
time.sleep(1)
问题:proc.stdout.readline()
只是阻塞并且不显示任何内容。
我已经学到了什么:
- 如果输入来自类似文件的对象(即已经
fileno()
实现的对象),我可以将其直接传递给标准输入并避免写入 PIPE。但为此,我需要首先将生成器流式传输到文件中,我希望避免这样做,因为这似乎是不必要的绕道。例如以下作品。
import tempfile
from subprocess import Popen, PIPE
tp = tempfile.TemporaryFile()
tp.write("aa\nbb\ncc\ndd".encode())
tp.seek(0)
proc = Popen(["cat"], stdin=tp, stdout=PIPE)
for line in proc.stdout:
print(line)
- 如果我坚持写入 PIPE 对象,我可以通过关闭输入流然后从输出流中读取来解决问题。但在这里我不知道数据在哪里。因为我的生成器产生了 GB 的数据,我不想遇到内存错误。
proc_input = StringIO("aa\nbb\ncc\ndd")
proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
for line in proc_input:
proc.stdin.write(line.encode())
proc.stdin.close()
for line in proc.stdout:
print(line)
我也尝试过:
- 我玩弄了 buffersize 参数
Popen(..., bufsize=)
,但它似乎没有任何效果。 - 我尝试将输入数据写入,
io.BufferedWriter
希望 Popen 可以将其消化为标准输入的输入。也没有成功。
附加信息:我正在使用 Linux。
评论评论
建议将输入生成器分成块。这可以通过
def PopenStreaming(process, popen_kwargs, nlines, input):
while input:
proc = Popen(process, stdin=PIPE, stdout=PIPE, **popen_kwargs)
for n, row in enumerate(input):
proc.stdin.write(row)
if n == nlines:
proc.stdin.close()
break
for row in proc.stdout:
yield row
解决方案
我不确定是否总是可以做你想做的事情。https://docs.python.org/3/library/subprocess.html上的文档说
警告:使用
communicate()
而不是.stdin.write
,.stdout.read
或.stderr.read
避免由于任何其他操作系统管道缓冲区填满并阻塞子进程而导致的死锁。
所以你应该使用communicate
,但这意味着等待进程终止:
Popen.communicate(input=None, timeout=None)
与进程交互:将数据发送到标准输入。从 stdout 和 stderr 读取数据,直到到达文件结尾。等待进程终止。
这意味着您只能使用communicate
一次,这不是您想要的。
但是,我认为使用行缓冲文本模式应该是安全的,以避免死锁:
from subprocess import Popen, PIPE
kwargs = {
"stdin": PIPE,
"stdout": PIPE,
"universal_newlines": True, # text mode
"bufsize": 1, # line buffered
}
with Popen(["cat"], **kwargs) as process:
for data in ["A\n", "B\n", "C\n"]:
process.stdin.write(data)
print("data sent:", data)
output = process.stdout.readline()
print("output received:", output)
如果这不适用于您的情况,也许您可以将呼叫拆分为多个较小的呼叫?使用check_output
它的input
关键字参数也可以简化你的代码:
from subprocess import check_output
output = check_output(["cat"], input=b"something\n")
print(output)
推荐阅读
- c# - UNITY - 玩家控制器不向鼠标光标移动
- python - 使用 Python 根据数据更改 SQL UPDATE 语句
- ios - AppDelegate 未提供 UIViewController
- python-3.x - 使用 Python 优化排序任务
- c# - 如何在不调用基本默认构造函数的情况下实例化继承的类
- node.js - 如何使用 node/express 控制台记录 api 调用的 url
- html - 修复 IE 中 mat-expansion-indicator 的位置
- php - 使用准备好的语句验证来自 MySQL 数据库的散列密码
- javascript - 在 ArcGIS 中添加要素图层
- python - 如何将具有输入名称的元组添加到列表中