首页 > 解决方案 > 使用通道作为队列的死锁

问题描述

我正在学习 Go,我正在尝试实现一个作业队列。

我想做的是:

让主要的 goroutine 通过一个通道为多个解析器提供行(将一行解析为 s 结构),并让每个解析器将结构发送到其他工作器(goroutine)将处理的结构通道(发送到数据库等) .

代码如下所示:

lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)

fileName := "myfile.csv"

file, err := os.Open(fileName)
if err != nil {
    log.Fatal(err)
}

defer file.Close()

reader := bufio.NewReader(file)

// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ {
    go lineToStructWorker(i, lineParseQ, jobProcessQ)
}

// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
    go WorkerProcessStruct(i, jobProcessQ, doneQ)
}

lineCount := 0 
countSend := 0

for {
    line, err := reader.ReadString('\n')
    
    if err != nil && err != io.EOF {
        log.Fatal(err)
    }
    
    if err == io.EOF {
        break
    }
    
    lineCount++
    
    if lineCount > 1 {
        countSend++
        lineParseQ <- line[:len(line)-1]    // Avoid last char '\n'
    }

}

for i := 0; i < countSend; i++ {
    fmt.Printf("Received %+v.\n", <-doneQ)
}

close(doneQ)
close(jobProcessQ)
close(lineParseQ)

这是一个简化的游乐场: https: //play.golang.org/p/yz84g6CJraa

工人看起来像这样:

func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {

    for j := range lineQ {
        strQ <- lineToStruct(j) // just parses the csv to a struct...
    }

}

func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {

    for a := range strQ {
        time.Sleep(time.Millisecond * 500) // fake long operation...
        done <- a
    }
}

我知道问题与“完成”通道有关,因为如果我不使用它,就没有错误,但我不知道如何解决它。

标签: godeadlockjobschannelworker

解决方案


在您完成将所有行发送到 之前,您不会开始阅读,这比缓冲区空间还多。因此,一旦缓冲区已满,发送块就会开始填充缓冲区,一旦已满,它就会死锁。将发送到的循环、从 读取的循环或两者都移动到单独的 goroutine 中,例如:doneQlineParseQdoneQlineParseQlineParseQdoneQ

go func() {
    for _, line := range lines {
        countSend++
        lineParseQ <- line
    }
    close(lineParseQ)
}()

最后这仍然会死锁,因为你在同一个 goroutine 中有一个rangeover 一个 channel 和after it; close因为range一直持续到通道关闭,并且在range完成后关闭,所以你仍然有一个僵局。你需要把关门放在合适的地方;也就是说,在发送例程中,或者WaitGroup如果给定通道有多个发送者,则在监视发送例程时被阻塞。

// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
    wg.Add(1)
    go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}

// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
    go WorkerProcessStruct(i, jobProcessQ, doneQ)
}

countSend := 0

go func() {
    for _, line := range lines {
        countSend++
        lineParseQ <- line
    }
    close(lineParseQ)
}()

go func() {
    wg.Wait()
    close(jobProcessQ)
}()

for a := range doneQ {
    fmt.Printf("Received %v.\n", a)
}

// ...

func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
    for j := range lineQ {
        strQ <- lineToStruct(j) // just parses the csv to a struct...
    }
    wg.Done()
}

func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
    for a := range strQ {
        time.Sleep(time.Millisecond * 500) // fake long operation...
        done <- a
    }
    close(done)
}

完整的工作示例: https: //play.golang.org/p/XsnewSZeb2X


推荐阅读