go - 从管道读取的非阻塞方式
问题描述
我想创建一个简单的应用程序,它将连续读取一个应用程序的输出,对其进行处理并将处理后的输出写入标准输出。这个应用程序可以在一秒钟内产生大量数据,然后静默几分钟。
问题是我的数据处理算法很慢,所以主循环被阻塞了。当循环被阻塞时,我正在丢失一个即将到来的数据。
cmd := exec.Command("someapp")
stdoutPipe, _ := cmd.StdoutPipe()
stdoutReader := bufio.NewReader(stdoutPipe)
go func() {
bufioReader := bufio.NewReader(stdoutReader)
for {
output, _, err := bufioReader.ReadLine()
if err != nil || err == io.EOF {
break
}
processedOutput := dataProcessor(output);
fmt.Print(processedOutput)
}
}()
解决这个问题的最好方法可能是缓冲所有输出并在另一个 Goroutine 中处理它,但我不确定如何在 Golang 中实现它。解决这个问题最惯用的方法是什么?
解决方案
你可以有两个 goroutine,一个是供应商,另一个是消费者。供应商执行命令并通过通道将数据传递给消费者。
cmd := exec.Command("someapp")
stdoutPipe, _ := cmd.StdoutPipe()
stdoutReader := bufio.NewReader(stdoutPipe)
Datas := make(chan Data, 100)
go dataProcessor(Datas)
bufioReader := bufio.NewReader(stdoutReader)
for {
output, _, err := bufioReader.ReadLine()
var tempData Data
tempData.Out = output
if err != nil || err == io.EOF {
break
}
Datas <- tempData
}
}
然后您将在 dataProcessor 函数中处理数据:
func dataProcessor(Datas <-chan Data) {
for {
select {
case data := <-Datas:
fmt.Println(data)
default:
continue
}
}
}
显然这是一个非常简单的示例,您应该对其进行自定义并使其变得更好。搜索有关 chanle 和 goroutin 的信息。阅读本教程可能会有所帮助。
推荐阅读
- mysql - error of 1824: failed to open the referenced table
- oracle - 无法从其他用户登录到 Oracle 数据库
- python - 将列表和元组列表转换为字典
- python - 独特图形轮廓的权重图
- redis - 如何使用 C4droid 在 Android 上编译 Redis?
- azure-timeseries-insights - 如何为时序洞察创建简单模型以按设备分离传感器
- json - 数据加载到视图SwiftUI后执行函数
- windows - Register-ScheduledTask:访问被拒绝。(HRESULT 0x80070005)
- python - Python:如何实现对朴素贝叶斯分类器的互信息特征选择
- asp.net-core - dotnet publish 间歇性失败,因为 DLL 正在使用中,即使在回收应用程序池之后也是如此