首页 > 解决方案 > 无法使频道地图正常工作

问题描述

这可能是菜鸟的错误。我有一个带有字符串值和通道图的切片。对于切片中的每个字符串,都会创建一个通道并为其创建一个映射条目,其中字符串作为键。

我观看频道并将值传递给其中一个,但从未找到。

package main

import (
    "fmt"
    "time"
)

type TestStruct struct {
    Test string
}

var channelsMap map[string](chan *TestStruct)

func main() {
    stringsSlice := []string{"value1"}
    channelsMap := make(map[string](chan *TestStruct))

    for _, value := range stringsSlice {
        channelsMap[value] = make(chan *TestStruct, 1)

        go watchChannel(value)
    }

    <-time.After(3 * time.Second)

    testStruct := new(TestStruct)
    testStruct.Test = "Hello!"
    channelsMap["value1"] <- testStruct

    <-time.After(3 * time.Second)
    fmt.Println("Program ended")
}

func watchChannel(channelMapKey string) {
    fmt.Println("Watching channel: " + channelMapKey)

    for channelValue := range channelsMap[channelMapKey] {
        fmt.Printf("Channel '%s' used. Passed value: '%s'\n", channelMapKey, channelValue.Test)
    }

}

游乐场链接: https: //play.golang.org/p/IbucTqMjdGO

输出:

Watching channel: value1
Program ended

当消息被送入频道时,我如何执行某些操作?

标签: goslicechannel

解决方案


你的方法有很多问题。

第一个是您channelsMapmain函数中重新声明(“隐藏”)全局变量。(如果你至少完成了一些 最基本的 Go 介绍,你应该不会遇到这样的问题。)

这意味着您的watchChannel(实际上,执行该函数的所有 goroutines)channelsMap在您的main函数写入其 local时读取全局channelsMap

接下来发生的事情如下:

  1. range语句watchChannel有一个简单的地图查找表达式作为其源——<code>channelsMap[channelMapKey]。

    在 Go 中,这种形式的map 查找 永远不会失败,但是如果 map 没有这样的 key(或者如果 map 没有初始化,即是nil),则返回相应类型的所谓 “零值”

  2. 由于全局channelsMap始终为空,因此任何调用都会watchChannel执行映射查找,该查找始终返回 type 的零值chan *TestStruct。任何通道的零值为nil

  3. 在通道 上执行的range语句产生零次迭代。换句话说,循环总是执行零次。nilforwatchChannel

更复杂的问题仍然不是全局变量的阴影,而是 goroutine 之间完全没有同步。您正在使用“睡眠”作为一种创可贴,试图在 goroutine 之间执行隐式同步,但是虽然从所谓的“常识”来看,这似乎是可以的,但实际上对于两个人来说是行不通的原因:

  • 休眠始终是一种幼稚的同步方法,因为它仅取决于所有 goroutine 将相对自由且无竞争地运行这一事实。这在许多(如果不是大多数)生产环境中远非如此,因此始终是细微错误的原因。请不要再这样做了。
  • Go 内存模型中没有任何 内容表明运行时将等待挂钟时间视为确定不同 goroutine 的执行如何相互关联的顺序。

有多种方法可以在 goroutine 之间同步执行。基本上,它们相当于通过通道发送和接收,并使用sync包提供的类型。在您的特定情况下,最简单的方法可能是使用该sync.WaitGroup类型。

以下是解决上述问题后我们会遇到的情况: - 在 map 变量的定义处初始化它,不要在main. - 用于sync.WaitGroup正确main等待它生成的所有 goroutines 以表明它们已完成:

package main

import (
    "fmt"
    "sync"
)

type TestStruct struct {
    Test string
}

var channelsMap = make(map[string](chan *TestStruct))

func main() {
    stringsSlice := []string{"value1"}

    var wg sync.WaitGroup

    wg.Add(len(stringsSlice))
    for _, value := range stringsSlice {
        channelsMap[value] = make(chan *TestStruct, 1)

        go watchChannel(value, &wg)
    }

    testStruct := new(TestStruct)
    testStruct.Test = "Hello!"
    channelsMap["value1"] <- testStruct

    wg.Wait()
    fmt.Println("Program ended")
}

func watchChannel(channelMapKey string, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Watching channel: " + channelMapKey)

    for channelValue := range channelsMap[channelMapKey] {
        fmt.Printf("Channel '%s' used. Passed value: '%s'\n", channelMapKey, channelValue.Test)
    }

}

一旦我们修复了前两个问题,接下来的两个代码问题就变得很明显了——在你让“watcher”goroutines 使用与 goroutine running 相同的 map 变量main,并让后者正确等待 watchers 之后:

  • 在生成 watcher goroutine 的循环结束后更新 map 的代码和在所有 watcher goroutine 中访问该变量的代码之间存在 map 变量的数据竞争。for

  • 观察者 goroutine 和等待它们完成的主 goroutine 之间存在死锁。

    死锁的原因是 watcher goroutines 从来没有收到任何他们必须退出处理的信号,因此永远卡在试图从它们各自的通道中读取。

解决这两个新问题的方法很简单,但它们实际上可能会“打破”你最初构建代码的想法。

首先,我将通过简单地让观察者 不访问地图变量来消除数据竞争。如您所见,每次调用 watchChannel都会接收一个值,用作从共享映射中读取值的键,因此每个观察者在其运行时总是只读取一次值。如果我们完全删除这个额外的地图访问,代码会变得更加清晰,而是直接将适当的通道值传递给每个观察者。这样做的一个很好的副产品是我们不再需要全局映射变量。

是我们将得到的:

package main

import (
    "fmt"
    "sync"
)

type TestStruct struct {
    Test string
}

func main() {
    stringsSlice := []string{"value1"}
    channelsMap := make(map[string](chan *TestStruct))

    var wg sync.WaitGroup

    wg.Add(len(stringsSlice))
    for _, value := range stringsSlice {
        channelsMap[value] = make(chan *TestStruct, 1)

        go watchChannel(value, channelsMap[value], &wg)
    }

    testStruct := new(TestStruct)
    testStruct.Test = "Hello!"
    channelsMap["value1"] <- testStruct

    wg.Wait()
    fmt.Println("Program ended")
}

func watchChannel(channelMapKey string, ch <-chan *TestStruct, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Watching channel: " + channelMapKey)

    for channelValue := range ch {
        fmt.Printf("Channel '%s' used. Passed value: '%s'\n", channelMapKey, channelValue.Test)
    }

}

好吧,我们仍然有僵局。

有多种方法可以解决这个问题,但它们取决于实际情况,并且对于这个玩具示例,任何迭代至少其中一个子集的尝试只会使情况变得混乱。相反,让我们为这种情况使用最简单的方法:关闭通道会立即解除对它的任何挂起接收操作的阻塞,并为通道的类型生成零值。对于使用该语句迭代的通道,range它仅意味着语句终止,而不会从通道产生任何值。

换句话说,让我们关闭所有通道以解除阻塞range正在由 watcher goroutines 运行的语句,然后等待这些 goroutines 通过等待组报告它们的完成。

为了不让答案过长,我还添加了字符串切片的编程初始化,通过让多个观察者(而不仅仅是一个观察者)实际上做了有用的工作,从而使示例更有趣:

package main

import (
    "fmt"
    "sync"
)

type TestStruct struct {
    Test string
}

func main() {
    var stringsSlice []string
    channelsMap := make(map[string](chan *TestStruct))

    for i := 1; i <= 10; i++ {
        stringsSlice = append(stringsSlice, fmt.Sprintf("value%d", i))
    }

    var wg sync.WaitGroup

    wg.Add(len(stringsSlice))
    for _, value := range stringsSlice {
        channelsMap[value] = make(chan *TestStruct, 1)

        go watchChannel(value, channelsMap[value], &wg)
    }

    for _, value := range stringsSlice {
        testStruct := new(TestStruct)
        testStruct.Test = fmt.Sprint("Hello! ", value)
        channelsMap[value] <- testStruct
    }

    for _, ch := range channelsMap {
        close(ch)
    }

    wg.Wait()
    fmt.Println("Program ended")
}

func watchChannel(channelMapKey string, ch <-chan *TestStruct, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Watching channel: " + channelMapKey)

    for channelValue := range ch {
        fmt.Printf("Channel '%s' used. Passed value: '%s'\n", channelMapKey, channelValue.Test)
    }

}

游乐场链接


正如您所看到的,在开始使用并发之前,您实际上应该以更详细的方式了解一些事情。

我建议按以下顺序进行:

  1. Go 之旅会让你习惯于并发的基本原理。
  2. Go 编程语言有两章专门为读者提供使用通道和包中的类型处理并发的温和介绍sync
  3. Go 中的并发性继续展示更多关于如何处理 Go 中的并发性的核心细节,包括处理并发程序在生产中面临的现实问题的高级主题——例如限制传入请求的速率的方法。

推荐阅读