首页 > 解决方案 > 基本 goroutine 和通道模式:一个通道的多个 goroutine

问题描述

我是 Go 新手,想知道一些我无法弄清楚的非常基本的问题。

只是为了锻炼(真正需要的抽象),我需要:

下面是一个有效的代码。

毫不奇怪,程序的总时间总是或多或少等于 const MAX_SEC_SLEEP 的值,因为所有处理 goroutine 都是并行工作的。

但是关于:

  1. 接收部分:

我真的需要将我的 select 语句包装在一个 for 循环中,迭代 ITERATIONS 的确切数量,以获得与将结束通道的 goroutine 数量完全相同的接收器数量吗?这是避免死锁的唯一方法吗?如果由于某种原因,其中一个 goroutine 失败了怎么办?

我找不到一种方法来让一个简单的 for (ever) 循环包装选择,有两种情况(一种从结果通道接收,另一种case <-done从函数返回)。会是更好的模式吗?

还是迭代通道并检测它是否从某个地方关闭会更好?

  1. 发送部分

在所有迭代之后,我应该在某个地方关闭频道吗?但我肯定会在至少一个 gouroutine 完成之前关闭它,以恐慌错误结束(试图发送到关闭的频道)

如果我要插入一个done <- true模式,它会在这里吗?

  1. 等待组

我并没有真正尝试过等待组,我需要一种方法来从 goroutine 中捕获所有“返回”值并将它们附加到最后一个切片;除了使用通道之外,我没有找到从 goroutine 返回的正确方法。

  1. 杂项

我应该在 func 参数中传递通道还是让它们按原样全局传递给程序?

  1. (坏)代码
package main

import (
    "fmt"
    "log"
    "math/rand"
    "time"
)

const ITERATIONS = 200

var (
    results   chan string
    initial   []string
    formatted []string
)

func main() {
    defer timeTrack(time.Now(), "program")

    format()  //run format goroutines
    receive() //receive formatted strings

    log.Printf("final slice contains %d/%d elements", len(formatted), len(initial))
}

//gets all results from channel and appends them to formatted slice
func receive() {
    for i := 0; i < ITERATIONS; i++ {
        select {
        case result := <-results:
            formatted = append(formatted, result)
        }
    }
}

//loops over initial slice and runs a goroutine per element
//that does some formatting operation and then pushes result to channel
func format() {
    for i := 0; i < ITERATIONS; i++ {
        go func(i int) {
            //simulate some formatting code that can take a while
            sleep := time.Duration(rand.Intn(10)) * time.Second
            time.Sleep(sleep)
            //append formatted string to result chan
            results <- fmt.Sprintf("%s formatted", initial[i])
        }(i)
    }

}

//initialize chans and inital slice
func init() {
    results = make(chan string, ITERATIONS)
    for i := 0; i < ITERATIONS; i++ {
        initial = append(initial, fmt.Sprintf("string #%d", i))
    }
}

func timeTrack(start time.Time, name string) {
    elapsed := time.Since(start)
    log.Printf("%s took %s", name, elapsed)
}


标签: goconcurrencygoroutinechannels

解决方案


我找不到一种方法来让一个简单的 for (ever) 循环包装选择,有两种情况(一种从结果通道接收,另一种类似于 case <-done 将从函数返回)。会是更好的模式吗?

如果通道在所有编写器完成后关闭,您可以使用一个简单的for ... range循环:

for result := range ch {
    ... do something with the result ...
}

为了使这个简单的变体起作用,通道必须关闭,否则for循环将不会终止。

在所有迭代之后,我应该在某个地方关闭频道吗?

如果可能的话,是的。

我并没有真正尝试过等待组...

Async.WaitGroup或非常相似的东西,几乎可以肯定是去这里的方式。最初应该考虑每个可以写入通道的 goroutine,例如

var wg Sync.WaitGroup
wg.Add(ITERATIONS)

然后,您可以生成所有编写并让它们运行的​​ goroutine。每次运行时,它都会调用wg.Done()以指示它已完成。

那么你——某处;where部分有点棘手——调用以wg.Wait()等待所有编写器完成。当所有作者都表示完成时,您可以close()频道。

请注意,如果您wg.Wait()从正在读取通道的同一个goroutine 调用 - 即将运行for result := range ...循环的 goroutine - 您会遇到问题:您不能同时从通道读取等待写入者写入通道。所以你要么必须wg.Wait()在循环结束后调用,这为时已晚;或者在循环开始之前,这还为时过早。

这使问题及其解决方案变得清晰:您必须从一个 goroutine 中的通道读取,并在另一个 goroutine 中执行等待然后关闭。这些 goroutine 最多可以有一个是main最初进入函数的主要 goroutine。

将 wait-and-then-close goroutine 设为自己的私有 goroutine 往往非常简单:

go func() {
    wg.Wait()
    close(results)
}()

例如。

如果由于某种原因,其中一个 goroutine 失败了怎么办?

您需要在这里准确定义失败的含义,但是如果您的意思是:如果被调用的 goroutine 本身调用了,例如,panic因此没有wg.Done()调用它,您可以使用它defer来确保wg.Done()即使在恐慌时也会发生这种情况:

func(args) {
    defer wg.Done()
    ... do the work ...
}

wg.Add(1)
go func(args) // `func` will definitely call `wg.Done`, even if `func` panics

我应该在 func 参数中传递通道还是让它们按原样全局传递给程序?

从风格上讲,全局变量总是有点混乱。这并不意味着您不能使用它们;这取决于你,只要记住所有的权衡。闭包变量没有那么混乱,但请记住要小心for循环迭代变量:

for i := 0; i < 10; i++ {
    go func() {
        time.Sleep(50 * time.Millisecond)
        fmt.Println(i)  // BEWARE BUG: this prints 10, not 0-9
    }()
}

行为不端。 在 Go 操场上试试这个;请注意,go vet现在抱怨i这里的使用不当。

我将您的原始示例代码带到了 Go Playground,并如上所述对其进行了最小的更改。结果就在这里。(为了让它不那么慢,我让睡眠等待 n 秒而不是 n 秒。)


推荐阅读