go - 是否可以在处理时将项目添加到 Go 通道?
问题描述
我正在尝试找到一种方法来使用 goroutine 递归地完成 Go 中的任务。该程序的目标是将输入元素放入通道并添加到输出通道输入 -1 直到达到 0。处理的工人数量应该是可适应的。我遵循的过程是这样的:
创建一个输入输出通道。将起始编号添加到输入通道。初始化workers以运行worker函数。循环并打印输出通道中的输出。
func main() {
inputChannel := make(chan int, 1)
outputChannel := make(chan int)
inputChannel <- 100
numWorkers := 4
for i := 0; i < numWorkers; i++ {
go worker(inputChannel, outputChannel)
}
for elem := range outputChannel {
fmt.Println("Output: ", elem)
}
}
接下来,在 order 函数中,我们遍历输入通道中的元素,每次检查是否有更多元素要接收。如果有更多元素要接收,我们打印输入元素,从元素中减去 1,如果元素大于 0,则发送到输入通道以供另一个工作人员拾取。如果输入通道中没有任何内容,那么我们返回。
func worker(input chan int, output chan<- int) {
defer close(input)
defer close(output)
for {
element, more := <-input
if more {
fmt.Println("Input: ", element)
element--
if element != 0 {
input <- element
}
} else {
fmt.Println("All Jobs Processed")
return
}
}
}
我看到的输出是:
Input: 100
Input: 99
Input: 98
Input: 97
Input: 96
Input: 95
Input: 94
Input: 93
Input: 92
Input: 91
Input: 90
Input: 89
Input: 88
Input: 87
Input: 86
Input: 85
Input: 84
Input: 83
Input: 82
Input: 81
Input: 80
Input: 79
Input: 78
Input: 77
Input: 76
Input: 75
Input: 74
Input: 73
Input: 72
Input: 71
Input: 70
Input: 69
Input: 68
Input: 67
Input: 66
Input: 65
Input: 64
Input: 63
Input: 62
Input: 61
Input: 60
Input: 59
Input: 58
Input: 57
Input: 56
Input: 55
Input: 54
Input: 53
Input: 52
Input: 51
Input: 50
Input: 49
Input: 48
Input: 47
Input: 46
Input: 45
Input: 44
Input: 43
Input: 42
Input: 41
Input: 40
Input: 39
Input: 38
Input: 37
Input: 36
Input: 35
Input: 34
Input: 33
Input: 32
Input: 31
Input: 30
Input: 29
Input: 28
Input: 27
Input: 26
Input: 25
Input: 24
Input: 23
Input: 22
Input: 21
Input: 20
Input: 19
Input: 18
Input: 17
Input: 16
Input: 15
Input: 14
Input: 13
Input: 12
Input: 11
Input: 10
Input: 9
Input: 8
Input: 7
Input: 6
Input: 5
Input: 4
Input: 3
Input: 2
Input: 1
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
/Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:31 +0x179
goroutine 6 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
/Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
/Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4
goroutine 7 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
/Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
/Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4
goroutine 8 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
/Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
/Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4
goroutine 9 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
/Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
/Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4
exit status 2
我已经以多种方式尝试了这一点,依赖于这样的渠道并使用等待组,但我似乎无法让流程处理所有项目并发出输出。
解决方案
好的,我们开始吧。首先,请注意您的代码中存在一些问题。然后修复它们。
正如Adrian所说,从已经关闭或没有元素的通道中读取。在您的工作人员功能中,您正在这样做。当您在另一个工作人员关闭输入通道后从输入通道读取元素时,就会发生这种情况。
func worker(input chan int, output chan<- int) { defer close(input) ... for { element, more := <-input ... } }
那么,为什么在所有工作人员完成后不关闭输入通道?
在解决了输入通道的问题后,当您尝试从输出通道读取时,会出现另一个问题。此外,您不会在输出通道上发送任何内容。如果您不需要该频道,那么为什么要使用该频道。而且这个输出通道是无缓冲的(大小为0的通道和发送接收应该同时,否则会出现死锁情况)。看,从这里和这里缓冲与无缓冲。也许网络上有更多有用的文档。感谢我的朋友Nightfury1204从他的这篇文章中获得了关于缓冲与非缓冲频道的第一个链接。
outputChannel := make(chan int) // unbuffered, no size is defined ... for elem := range outputChannel { fmt.Println("Output: ", elem) }
所以,如果你想向输出通道发送一些东西,那么逻辑是你自己的。例如,您可以在工作人员中完成输入通道处理后发送一些内容。在这种情况下,将您的输出通道声明为长度为 4 的缓冲通道(因为您正在运行 4 个工作人员)。完成所有工作人员后,关闭您的输出通道,然后阅读。
outputChannel := make(chan int, 4) // buffered ... // after finishing all your workers close(outputChannel) for elem := range outputChannel { fmt.Println("Output: ", elem) }
需要注意的是,使用
sync.WaitGroup
from"sync"
package 来等待一组 goroutine 完成。
请参见下面的示例:https: //play.golang.org/p/WAqwyR0ggNN
import "fmt"
import "sync"
func main() {
inputChannel := make(chan int, 1)
outputChannel := make(chan int, 4)
var wg sync.WaitGroup
wg.Add(4)
inputChannel <- 100
numWorkers := 4
for i := 0; i < numWorkers; i++ {
go func() {
defer wg.Done()
for {
select {
case element := <-inputChannel:
fmt.Println("Input: ", element)
element--
if element != 0 {
inputChannel <- element
}
default:
outputChannel<-0
fmt.Println("All Jobs Processed", len(outputChannel))
return
}
}
}()
}
wg.Wait()
close(inputChannel)
close(outputChannel)
for elem := range outputChannel {
fmt.Println("Output: ", elem)
}
}
推荐阅读
- c# - ConnectionString 属性未初始化错误
- laravel - Laravel api 头请求
- python - 将 map lambda 转换为列表理解
- excel - 在PowerPoint中使用VBA粘贴excel图表并保持源格式
- php - 如何缓存 rand() 的结果?
- spark-streaming - NiFi & Spark 集成错误:java.lang.NoClassDefFoundError: org/apache/http/nio/protocol/HttpAsyncResponseConsumer
- reactjs - es6 javascript箭头函数
- kubernetes - 如何在 Kubernetes 中从 pod 运行命令到主机
- mysql - sql优化:通过子查询或自己的查询计算所有行/其他改进
- python - 从熊猫中的2个数据帧打印所有出现的映射数据