首页 > 解决方案 > 是否可以在处理时将项目添加到 Go 通道?

问题描述

我正在尝试找到一种方法来使用 goroutine 递归地完成 Go 中的任务。该程序的目标是将输入元素放入通道并添加到输出通道输入 -1 直到达到 0。处理的工人数量应该是可适应的。我遵循的过程是这样的:

创建一个输入输出通道。将起始编号添加到输入通道。初始化workers以运行worker函数。循环并打印输出通道中的输出。

func main() {
    inputChannel := make(chan int, 1)
    outputChannel := make(chan int)
    inputChannel <- 100
    numWorkers := 4
    for i := 0; i < numWorkers; i++ {
        go worker(inputChannel, outputChannel)
    }
    for elem := range outputChannel {
        fmt.Println("Output: ", elem)
    }
}

接下来,在 order 函数中,我们遍历输入通道中的元素,每次检查是否有更多元素要接收。如果有更多元素要接收,我们打印输入元素,从元素中减去 1,如果元素大于 0,则发送到输入通道以供另一个工作人员拾取。如果输入通道中没有任何内容,那么我们返回。

func worker(input chan int, output chan<- int) {
    defer close(input)
    defer close(output)
    for {
        element, more := <-input
        if more {
            fmt.Println("Input: ", element)
            element--
            if element != 0 {
                input <- element
            }
        } else {
            fmt.Println("All Jobs Processed")
            return
        }
    }
}

我看到的输出是:

Input:  100
Input:  99
Input:  98
Input:  97
Input:  96
Input:  95
Input:  94
Input:  93
Input:  92
Input:  91
Input:  90
Input:  89
Input:  88
Input:  87
Input:  86
Input:  85
Input:  84
Input:  83
Input:  82
Input:  81
Input:  80
Input:  79
Input:  78
Input:  77
Input:  76
Input:  75
Input:  74
Input:  73
Input:  72
Input:  71
Input:  70
Input:  69
Input:  68
Input:  67
Input:  66
Input:  65
Input:  64
Input:  63
Input:  62
Input:  61
Input:  60
Input:  59
Input:  58
Input:  57
Input:  56
Input:  55
Input:  54
Input:  53
Input:  52
Input:  51
Input:  50
Input:  49
Input:  48
Input:  47
Input:  46
Input:  45
Input:  44
Input:  43
Input:  42
Input:  41
Input:  40
Input:  39
Input:  38
Input:  37
Input:  36
Input:  35
Input:  34
Input:  33
Input:  32
Input:  31
Input:  30
Input:  29
Input:  28
Input:  27
Input:  26
Input:  25
Input:  24
Input:  23
Input:  22
Input:  21
Input:  20
Input:  19
Input:  18
Input:  17
Input:  16
Input:  15
Input:  14
Input:  13
Input:  12
Input:  11
Input:  10
Input:  9
Input:  8
Input:  7
Input:  6
Input:  5
Input:  4
Input:  3
Input:  2
Input:  1
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:31 +0x179

goroutine 6 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4

goroutine 7 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4

goroutine 8 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4

goroutine 9 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4
exit status 2 

我已经以多种方式尝试了这一点,依赖于这样的渠道并使用等待组,但我似乎无法让流程处理所有项目并发出输出。

标签: go

解决方案


好的,我们开始吧。首先,请注意您的代码中存在一些问题。然后修复它们。

  • 正如Adrian所说,从已经关闭或没有元素的通道中读取。在您的工作人员功能中,您正在这样做。当您在另一个工作人员关闭输入通道后从输入通道读取元素时,就会发生这种情况。

    func worker(input chan int, output chan<- int) {
        defer close(input)
        ...
        for {
            element, more := <-input
            ...
        }
    }
    

    那么,为什么在所有工作人员完成后不关闭输入通道?

  • 在解决了输入通道的问题后,当您尝试从输出通道读取时,会出现另一个问题。此外,您不会在输出通道上发送任何内容。如果您不需要该频道,那么为什么要使用该频道。而且这个输出通道是无缓冲的(大小为0的通道和发送接收应该同时,否则会出现死锁情况)。看,从这里这里缓冲与无缓冲。也许网络上有更多有用的文档。感谢我的朋友Nightfury1204从他的这篇文章中获得了关于缓冲与非缓冲频道的第一个链接。

    outputChannel := make(chan int) // unbuffered, no size is defined
    ...
    for elem := range outputChannel {
        fmt.Println("Output: ", elem)
    }
    

    所以,如果你想向输出通道发送一些东西,那么逻辑是你自己的。例如,您可以在工作人员中完成输入通道处理后发送一些内容。在这种情况下,将您的输出通道声明为长度为 4 的缓冲通道(因为您正在运行 4 个工作人员)。完成所有工作人员后,关闭您的输出通道,然后阅读。

    outputChannel := make(chan int, 4) // buffered
    ...
    // after finishing all your workers
    close(outputChannel)
    for elem := range outputChannel {
        fmt.Println("Output: ", elem)
    }
    

需要注意的是,使用sync.WaitGroupfrom "sync"package 来等待一组 goroutine 完成。

请参见下面的示例:https: //play.golang.org/p/WAqwyR0ggNN

import "fmt"
import "sync"

func main() {
    inputChannel := make(chan int, 1)
    outputChannel := make(chan int, 4)

    var wg sync.WaitGroup
    wg.Add(4)

    inputChannel <- 100
    numWorkers := 4
    for i := 0; i < numWorkers; i++ {
        go func() {
            defer wg.Done()
            for {
                select {
                case element := <-inputChannel:
                    fmt.Println("Input: ", element)
                    element--
                    if element != 0 {
                        inputChannel <- element
                    }
                default:
                    outputChannel<-0
                    fmt.Println("All Jobs Processed", len(outputChannel))
                    return
                }
            }
        }()
    }
    wg.Wait()
    close(inputChannel)
    close(outputChannel)
    for elem := range outputChannel {
        fmt.Println("Output: ", elem)
    }
}

推荐阅读