首页 > 解决方案 > 如何使用具有完整 I/O 的子进程从另一个脚本执行 Python 脚本?

问题描述

我正在尝试使用此run.py脚本来运行to_run.py. 这些只是我用来学习的一些 POC 脚本。我期望完整的 I/O,但是,除了来自to_run.py(即“HELLOO”)的第一行之外,我没有得到任何输出。我怎样才能解决这个问题?

运行.py

#!/usr/bin/python3

from subprocess import PIPE,Popen
from time import sleep

p = Popen(['./to_run.py'],stdin=PIPE,stdout=PIPE,stderr=PIPE)

while True:
    out = p.stdout.readline().decode()
    print(out)
    print("Poll: " + str(p.poll()))

to_run.py

#!/usr/bin/python3

print("HELLOO")

l1 = input("Enter first line: ")
l2 = input("Enter second line: ")

print("First line: "+l1)
print("Second line: "+l2)

标签: pythonpython-3.xsubprocess

解决方案


问题是您将stdout输出重定向to_run.py到管道,但input函数to_run.py需要将其提示写入其中stdout,这似乎是导致问题的原因。以下代码演示了这一点,并input通过使用管道为函数提供输入来解决此问题stdin。该communicate方法用于发送输入。我还指定universal_newlines=True了输出数据是不需要解码的字符串。

import subprocess


p = subprocess.Popen(['./to_run.py'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = p.communicate('Line 1\nLine 2\n')
print(stdout, end='')

更新

您可以不使用communicate直接写入和读取管道来执行此操作,但文档有以下警告:

警告使用communicate()而不是.stdin.write.stdout.read.stderr.read避免由于任何其他操作系统管道缓冲区填满并阻塞子进程而导致的死锁。

就“交互式”而言,我这样做的经验是您必须预先编写所有stdin数据,因此我认为这不是特别交互式的。但至少您可以在生成输出时对其进行处理:

import subprocess


p = subprocess.Popen(['./to_run.py'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
p.stdin.write('Line 1\n')
p.stdin.write('Line 2\n')
p.stdin.close()
for out in iter(p.stdout.readline, ''): # read rest of output
    print(out, end='')
p.stdout.close()
return_code = p.wait()

更新 2

原则上,您可以完全互动。但是您的问题to_run.py是该函数input在没有终止换行符的情况下写入其提示符,因此调用p.stdout.readline()正在run.py等待换行符。如果我们修改to_run.py如下,那么一切都按预期工作:

to_run.py

#!/usr/bin/python3


import sys

print("HELLOO", flush=True)

print('Enter first line:', flush=True)
l1 = sys.stdin.readline()
print('Enter second line:', flush=True)
l2 = sys.stdin.readline()

print('First line:', l1, end='')
print('Second line:', l2, end='')

运行.py

import subprocess


cmd = ['./to_run.py']
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
print(p.stdout.readline(), end='') # HELLOO
print(p.stdout.readline(), end='') # Enter first line
p.stdin.write('Line 1\n')
p.stdin.flush()
print(p.stdout.readline(), end='') # Enter second line
p.stdin.write('Line 2\n')
p.stdin.flush()
p.stdin.close()
for out in iter(p.stdout.readline, ''):
    print(out, end='')
p.stdout.close()
return_code = p.wait()

以下代码使用单独的线程从stdout管道中读取似乎是处理您无法使用的情况所需要的readline,就像使用input函数时的情况一样。这里线程一次从一个字符读取字符stdout并写入一个Queue实例。主线程具有从队列中读取以模拟readline的专用例程,以及read_prompt寻找预期提示的特殊例程:

import subprocess
from threading import Thread
from queue import Queue

stdout = Queue()

def rdr_thread(pipe):
    line = ''
    while True:
        buf = pipe.read(1)
        if not buf:
            stdout.put(None) # show end of file
            return
        stdout.put(buf[0])
        
def read_line():
    line = ''
    while True:
        ch = stdout.get()
        line += ch
        if ch == '\n':
            return line

def read_prompt():
    line = ''
    while True:
        ch = stdout.get()
        line += ch
        if line[-2:] == ': ':
            return line

def output_rest():
    while True:
        ch = stdout.get()
        if ch is None:
            return
        print(ch, end='')
    

cmd = ['./to_run.py']
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
t = Thread(target=rdr_thread, args=(p.stdout,))
t.start()
print(read_line(), end='') # HELLOO
print(read_prompt()) # 'Enter first line: '
p.stdin.write('Line 1\n')
p.stdin.flush()
print(read_prompt()) # 'Enter second line: '
p.stdin.write('Line 2\n')
p.stdin.flush()
p.stdin.close()
output_rest()
p.stdout.close()
return_code = p.wait()
t.join()

使用超时的通用提示处理

import subprocess
from threading import Thread
from queue import Queue, Empty

stdout = Queue()

eof = False

def rdr_thread(pipe):
    line = ''
    while True:
        buf = pipe.read(1)
        if not buf:
            stdout.put(None) # show end of file
            eof = True
            return
        stdout.put(buf[0])
        

def read_prompt():
    """
       read until there seems to be temporarilly no more output
    """
    if eof:
        return ''
    line = ''
    try:
        while True:           
            ch = stdout.get(timeout=.5)
            if ch is None:
                break
            line += ch
    except Empty:
        pass
    return line

    

cmd = ['./to_run.py']
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
t = Thread(target=rdr_thread, args=(p.stdout,))
t.start()
print(read_prompt(), end='')
p.stdin.write('Line 1\n')
p.stdin.flush()
print(read_prompt(), end='')
p.stdin.write('Line 2\n')
p.stdin.flush()
p.stdin.close()
for chunk in iter(read_prompt, ''):
    print(chunk, end='')
p.stdout.close()
return_code = p.wait()
t.join()

推荐阅读