go - 使用通道作为队列的死锁
问题描述
我正在学习 Go,我正在尝试实现一个作业队列。
我想做的是:
让主要的 goroutine 通过一个通道为多个解析器提供行(将一行解析为 s 结构),并让每个解析器将结构发送到其他工作器(goroutine)将处理的结构通道(发送到数据库等) .
代码如下所示:
lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)
fileName := "myfile.csv"
file, err := os.Open(fileName)
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := bufio.NewReader(file)
// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ {
go lineToStructWorker(i, lineParseQ, jobProcessQ)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
lineCount := 0
countSend := 0
for {
line, err := reader.ReadString('\n')
if err != nil && err != io.EOF {
log.Fatal(err)
}
if err == io.EOF {
break
}
lineCount++
if lineCount > 1 {
countSend++
lineParseQ <- line[:len(line)-1] // Avoid last char '\n'
}
}
for i := 0; i < countSend; i++ {
fmt.Printf("Received %+v.\n", <-doneQ)
}
close(doneQ)
close(jobProcessQ)
close(lineParseQ)
这是一个简化的游乐场: https: //play.golang.org/p/yz84g6CJraa
工人看起来像这样:
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
}
我知道问题与“完成”通道有关,因为如果我不使用它,就没有错误,但我不知道如何解决它。
解决方案
在您完成将所有行发送到 之前,您不会开始阅读,这比缓冲区空间还多。因此,一旦缓冲区已满,发送块就会开始填充缓冲区,一旦已满,它就会死锁。将发送到的循环、从 读取的循环或两者都移动到单独的 goroutine 中,例如:doneQ
lineParseQ
doneQ
lineParseQ
lineParseQ
doneQ
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
最后这仍然会死锁,因为你在同一个 goroutine 中有一个range
over 一个 channel 和after it; close
因为range
一直持续到通道关闭,并且在range
完成后关闭,所以你仍然有一个僵局。你需要把关门放在合适的地方;也就是说,在发送例程中,或者WaitGroup
如果给定通道有多个发送者,则在监视发送例程时被阻塞。
// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
wg.Add(1)
go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
countSend := 0
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
go func() {
wg.Wait()
close(jobProcessQ)
}()
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
// ...
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
wg.Done()
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
close(done)
}
完整的工作示例: https: //play.golang.org/p/XsnewSZeb2X
推荐阅读
- keycloak - 从 Keycloak 帐户页面注销时更改 redirect_uri
- r - 如何计算温度这么冷以来的时间
- c++ - 无论是否为枚举,如何获取模板参数的整数类型
- python - 如何在单个 jupyter 单元格中显示多个 pandas describe() 输出?
- java - 如何从 Java 8 中的对象列表中获取两个属性的乘积
- php - Symfony Panther 中 getInternalResponse 的 getHeader 总是返回一个空数组
- python - 为 Dataframe.apply() 提供数据类型
- android - 什么时候会创建 Android 包私有目录?
- javascript - 验证不适用于材料 ui textinput
- c - 不完全确定我的回报做错了什么