python - 如何使用具有完整 I/O 的子进程从另一个脚本执行 Python 脚本?
问题描述
我正在尝试使用此run.py
脚本来运行to_run.py
. 这些只是我用来学习的一些 POC 脚本。我期望完整的 I/O,但是,除了来自to_run.py
(即“HELLOO”)的第一行之外,我没有得到任何输出。我怎样才能解决这个问题?
运行.py
#!/usr/bin/python3
from subprocess import PIPE,Popen
from time import sleep
p = Popen(['./to_run.py'],stdin=PIPE,stdout=PIPE,stderr=PIPE)
while True:
out = p.stdout.readline().decode()
print(out)
print("Poll: " + str(p.poll()))
to_run.py
#!/usr/bin/python3
print("HELLOO")
l1 = input("Enter first line: ")
l2 = input("Enter second line: ")
print("First line: "+l1)
print("Second line: "+l2)
解决方案
问题是您将stdout
输出重定向to_run.py
到管道,但input
函数to_run.py
需要将其提示写入其中stdout
,这似乎是导致问题的原因。以下代码演示了这一点,并input
通过使用管道为函数提供输入来解决此问题stdin
。该communicate
方法用于发送输入。我还指定universal_newlines=True
了输出数据是不需要解码的字符串。
import subprocess
p = subprocess.Popen(['./to_run.py'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = p.communicate('Line 1\nLine 2\n')
print(stdout, end='')
更新
您可以不使用communicate
直接写入和读取管道来执行此操作,但文档有以下警告:
警告使用
communicate()
而不是.stdin.write
,.stdout.read
或.stderr.read
避免由于任何其他操作系统管道缓冲区填满并阻塞子进程而导致的死锁。
就“交互式”而言,我这样做的经验是您必须预先编写所有stdin
数据,因此我认为这不是特别交互式的。但至少您可以在生成输出时对其进行处理:
import subprocess
p = subprocess.Popen(['./to_run.py'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
p.stdin.write('Line 1\n')
p.stdin.write('Line 2\n')
p.stdin.close()
for out in iter(p.stdout.readline, ''): # read rest of output
print(out, end='')
p.stdout.close()
return_code = p.wait()
更新 2
原则上,您可以完全互动。但是您的问题to_run.py
是该函数input
在没有终止换行符的情况下写入其提示符,因此调用p.stdout.readline()
正在run.py
等待换行符。如果我们修改to_run.py
如下,那么一切都按预期工作:
to_run.py
#!/usr/bin/python3
import sys
print("HELLOO", flush=True)
print('Enter first line:', flush=True)
l1 = sys.stdin.readline()
print('Enter second line:', flush=True)
l2 = sys.stdin.readline()
print('First line:', l1, end='')
print('Second line:', l2, end='')
运行.py
import subprocess
cmd = ['./to_run.py']
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
print(p.stdout.readline(), end='') # HELLOO
print(p.stdout.readline(), end='') # Enter first line
p.stdin.write('Line 1\n')
p.stdin.flush()
print(p.stdout.readline(), end='') # Enter second line
p.stdin.write('Line 2\n')
p.stdin.flush()
p.stdin.close()
for out in iter(p.stdout.readline, ''):
print(out, end='')
p.stdout.close()
return_code = p.wait()
以下代码使用单独的线程从stdout
管道中读取似乎是处理您无法使用的情况所需要的readline
,就像使用input
函数时的情况一样。这里线程一次从一个字符读取字符stdout
并写入一个Queue
实例。主线程具有从队列中读取以模拟readline
的专用例程,以及read_prompt
寻找预期提示的特殊例程:
import subprocess
from threading import Thread
from queue import Queue
stdout = Queue()
def rdr_thread(pipe):
line = ''
while True:
buf = pipe.read(1)
if not buf:
stdout.put(None) # show end of file
return
stdout.put(buf[0])
def read_line():
line = ''
while True:
ch = stdout.get()
line += ch
if ch == '\n':
return line
def read_prompt():
line = ''
while True:
ch = stdout.get()
line += ch
if line[-2:] == ': ':
return line
def output_rest():
while True:
ch = stdout.get()
if ch is None:
return
print(ch, end='')
cmd = ['./to_run.py']
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
t = Thread(target=rdr_thread, args=(p.stdout,))
t.start()
print(read_line(), end='') # HELLOO
print(read_prompt()) # 'Enter first line: '
p.stdin.write('Line 1\n')
p.stdin.flush()
print(read_prompt()) # 'Enter second line: '
p.stdin.write('Line 2\n')
p.stdin.flush()
p.stdin.close()
output_rest()
p.stdout.close()
return_code = p.wait()
t.join()
使用超时的通用提示处理
import subprocess
from threading import Thread
from queue import Queue, Empty
stdout = Queue()
eof = False
def rdr_thread(pipe):
line = ''
while True:
buf = pipe.read(1)
if not buf:
stdout.put(None) # show end of file
eof = True
return
stdout.put(buf[0])
def read_prompt():
"""
read until there seems to be temporarilly no more output
"""
if eof:
return ''
line = ''
try:
while True:
ch = stdout.get(timeout=.5)
if ch is None:
break
line += ch
except Empty:
pass
return line
cmd = ['./to_run.py']
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
t = Thread(target=rdr_thread, args=(p.stdout,))
t.start()
print(read_prompt(), end='')
p.stdin.write('Line 1\n')
p.stdin.flush()
print(read_prompt(), end='')
p.stdin.write('Line 2\n')
p.stdin.flush()
p.stdin.close()
for chunk in iter(read_prompt, ''):
print(chunk, end='')
p.stdout.close()
return_code = p.wait()
t.join()
推荐阅读
- c# - 从包含事务的表中获取一个月内的日期列表
- c# - POST 方法总是返回 405 响应
- r - 在 Rmarkdown 文件中使用 googlesheets4 导入数据框
- python - Python - 大数据回归
- java - Maven依赖?不访问 servlet - 404 错误
- meson-build - 如何使用 -Og 为调试构建配置介子?
- javascript - 如何使用 i 标签作为输入标签?
- spring-boot - Docker容器问题
- ruby-on-rails - 如何在 Rails 应用中实现社交分享?
- .net-core - appsettings.json 中的自定义配置部分(数组)