首页 > 解决方案 > 终止从 UNIX 命名管道打开输入流时阻塞的线程

问题描述

我正在使用 java IO 流从 Unix 命名管道中读取命令。该程序有一个线程,它创建一个命名管道,mkfifo然后监听它,允许用户使用echo "command" > pipe.
它在大多数情况下都可以正常工作,除非我的程序必须在没有收到任何命令的情况下终止(例如在不可恢复的异常上)。
如此处所述:无法从 Java中的命名管道读取FileInputStream构造函数阻塞,直到其他进程打开管道进行写入。这使线程实现没有机会处理中断并正确终止。

我制作了一个简单的程序来展示我的问题。它侦听输入命令 3 秒,然后尝试(但失败)自行停止。

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/**
 * Read the input named FIFO to dispatch commands to a command processor.
 */
public final class InputReaderThread {

    private static final long LOOP_PERIOD_MS = 100;

    private final Path fifoPath;

    private final Consumer<String[]> processor;

    private final Thread thread;

    /**
     * Constructor.
     *
     * @param fifoPath path to the named FIFO
     * @param processor the command processor
     */
    public InputReaderThread(final Path fifoPath, final Consumer<String[]> processor) {
        this.fifoPath = fifoPath;
        this.processor = processor;

        this.thread = new Thread(this::run);
        this.thread.setName(InputReaderThread.class.getSimpleName());
    }

    /**
     * Start the input reader thread.<br>
     * It is never legal to start a thread more than once. In particular, a thread may not be restarted once it has completed execution.
     *
     * @see Thread#start()
     */
    public final void start() {
        System.out.println("Requesting input reader thread start");
        this.thread.start();
    }

    /**
     * Request the input reader thread to stop.<br>
     * This method does not wait for the thread to terminate, see {@link #join(long)}.
     */
    public final void stop() {
        System.out.println("Requesting input reader thread stop");
        this.thread.interrupt();
    }

    /**
     * Wait for the internal reader thread to terminate.<br>
     * Throws an {@link InterruptedException} on timeout.
     *
     * @param timeout the maximum time to wait, in milliseconds
     * @return {@code true} if the input thread terminated within timeout
     * @throws InterruptedException the current thread was interrupted while waiting, or the timeout was reached
     * @see Thread#join(long)
     */
    public final boolean join(final long timeout) throws InterruptedException {
        System.out.println("Awaiting input reader thread stop");
        this.thread.join(timeout);
        return !this.thread.isAlive();
    }

    private final void run() {
        System.out.println("Input reader thread started");

        try (final FileInputStream is = new FileInputStream(createFifoPipe(this.fifoPath))) {
            final StringBuilder commandBuilder = new StringBuilder();

            System.out.println("Listening to input FIFO");
            while (!Thread.interrupted()) {
                // Avoid reading when there is no data available
                if (is.available() > 0) {
                    final int b = is.read();

                    if (b == '\n') {
                        // The command is complete: process it
                        final String command = commandBuilder.toString().trim();
                        System.out.println("Received command: " + command);
                        this.processor.accept(command.split(" "));

                        // Reset the command builder
                        commandBuilder.setLength(0);
                    } else {
                        // Append the character to the command
                        commandBuilder.append((char) b);
                    }
                } else {
                    // Poll the input FIFO periodically
                    Thread.sleep(LOOP_PERIOD_MS);
                }
            }
        } catch (final IOException e) {
            throw new RuntimeException("An IO exception occurred on agent FIFO", e);
        } catch (@SuppressWarnings("unused") final InterruptedException e) {
            // Handle interruption by terminating the thread
        }

        System.out.println("Input reader thread terminated");
    }

    /**
     * Helper method to create a Unix named FIFO.
     *
     * @param fifoPath the FIFO path
     * @return the File handler to the created FIFO
     * @throws IOException an I/O error occurs
     * @throws InterruptedException the thread was interrupted while waiting for the FIFO creation
     */
    private static final File createFifoPipe(final Path fifoPath) throws IOException, InterruptedException {
        System.out.println("Creating fifo: " + fifoPath);

        final File fifo = fifoPath.toFile();
        if (fifo.exists()) {
            System.err.println("Deleting existing fifo");
            Files.delete(fifoPath);
        }

        final String[] command = new String[] { "mkfifo", fifo.getAbsolutePath() };
        final Process process = new ProcessBuilder(command).start();

        final int returnStatus = process.waitFor();
        if (returnStatus != 0) {
            throw new IOException("Failed to create fifo: " + returnStatus);
        } else {
            System.out.println("Created fifo: " + fifoPath);
        }

        return fifo;
    }
}

NamedPipeTest.java

import java.nio.file.Paths;

/**
 * Dummy program for {@link InputReaderThread}.
 */
public final class NamedPipeTest {

    /**
     * Entry point.
     *
     * @param args one argument (path to named pipe)
     */
    public static void main(final String[] args) {
        final NamedPipeTest fifoTest = new NamedPipeTest(args[0]);
        fifoTest.start();

        try {
            Thread.sleep(3000);
            fifoTest.stop();
        } catch (final InterruptedException e) {
            System.err.println("Main thread interrupted");
            e.printStackTrace(System.err);
        }
    }

    private final InputReaderThread inputReader;

    /**
     * Constructor.
     *
     * @param fifoPath the named pipe path
     */
    public NamedPipeTest(final String fifoPath) {
        this.inputReader = new InputReaderThread(Paths.get(fifoPath), this::process);
    }

    /**
     * Start the program.
     */
    public void start() {
        this.inputReader.start();
    }

    /**
     * Stop the program.
     */
    public void stop() {
        try {
            this.inputReader.stop();
            if (!this.inputReader.join(5000)) {
                System.err.println("Failed to terminate input reader thread");
                System.exit(-1);
            }
        } catch (@SuppressWarnings("unused") final InterruptedException e) {
            System.err.println("Interrupted while stopping");
            System.exit(-1);
        }
        System.out.println("Stopped successfully");
    }

    /**
     * Process a command.
     *
     * @param command the command
     */
    public void process(final String[] command) {
        if (command[0].equals("stop")) {
            new Thread(this::stop).start();
        }
    }
}

运行这个在命名管道中写入“stop”的程序显示输入读取器线程在处理中断时正确终止(不确定为什么线程终止被记录两次,但这将是另一个问题):

$ (java -jar NamedPipeTest.jar fifo &) && sleep 2s && echo "stop" > fifo
Requesting input reader thread start
Input reader thread started
Creating fifo: fifo
Deleting existing fifo
Created fifo: fifo
Listening to input FIFO
Received command: stop
Requesting input reader thread stop
Awaiting input reader thread stop
Input reader thread terminated
Stopped successfully
Requesting input reader thread stop
Awaiting input reader thread stop
Stopped successfully

但是,在不编写“停止”命令的情况下运行它会表现出在FileInputStream打开管道时无法处理中断:

$ java -jar NamedPipeTest.jar fifo &
Requesting input reader thread start
Input reader thread started
Creating fifo: fifo
Deleting existing fifo
Created fifo: fifo
Requesting input reader thread stop
Awaiting input reader thread stop
Failed to terminate input reader thread

我想到的事情:

我不知道如何解决这个问题,我错过了什么吗?谢谢您的帮助!

标签: javaunixnamed-pipes

解决方案


推荐阅读