首页 > 解决方案 > 从管道读取的非阻塞方式

问题描述

我想创建一个简单的应用程序,它将连续读取一个应用程序的输出,对其进行处理并将处理后的输出写入标准输出。这个应用程序可以在一秒钟内产生大量数据,然后静默几分钟。

问题是我的数据处理算法很慢,所以主循环被阻塞了。当循环被阻塞时,我正在丢失一个即将到来的数据。

    cmd := exec.Command("someapp")
    stdoutPipe, _ := cmd.StdoutPipe()
    stdoutReader := bufio.NewReader(stdoutPipe)

    go func() {
        bufioReader := bufio.NewReader(stdoutReader)
        for {
            output, _, err := bufioReader.ReadLine()
            if err != nil || err == io.EOF {
                break
            }

            processedOutput := dataProcessor(output);

            fmt.Print(processedOutput)
        }
    }()

解决这个问题的最好方法可能是缓冲所有输出并在另一个 Goroutine 中处理它,但我不确定如何在 Golang 中实现它。解决这个问题最惯用的方法是什么?

标签: go

解决方案


你可以有两个 goroutine,一个是供应商,另一个是消费者。供应商执行命令并通过通道将数据传递给消费者。

    cmd := exec.Command("someapp")
    stdoutPipe, _ := cmd.StdoutPipe()
    stdoutReader := bufio.NewReader(stdoutPipe)
    Datas := make(chan Data, 100)
    go dataProcessor(Datas)
        bufioReader := bufio.NewReader(stdoutReader)
        for {
            output, _, err := bufioReader.ReadLine()
            var tempData Data
            tempData.Out = output
            if err != nil || err == io.EOF {
                break
            }
            Datas <- tempData
            }
    }

然后您将在 dataProcessor 函数中处理数据:

func dataProcessor(Datas <-chan Data)  {
    for  {
        select {
        case data := <-Datas:
        fmt.Println(data)
        default:
            continue
        }
    }
}

显然这是一个非常简单的示例,您应该对其进行自定义并使其变得更好。搜索有关 chanle 和 goroutin 的信息。阅读教程可能会有所帮助。


推荐阅读