首页 > 解决方案 > 扇出,多个广播,未知数量的接收器

问题描述

我需要一种方法来多次从一个主 goroutine 发出信号,未知数量的其他 goroutines。我还需要其他 goroutines 来select处理多个项目,所以忙等待(可能)不是一种选择。我想出了以下解决方案:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type signal struct {
    data     []int
    channels []chan struct{}
}

func newSignal() *signal {
    s := &signal{
        data:     make([]int, 0),
        channels: make([]chan struct{}, 1),
    }
    s.channels[0] = make(chan struct{})
    return s
}

func (s *signal) Broadcast(d int) {
    s.data = append(s.data, d)
    s.channels = append(s.channels, make(chan struct{}))
    close(s.channels[len(s.data)-1])
}

func test(s *signal, wg *sync.WaitGroup, id int, ctx context.Context) {
    for i := 0; ; i += 1 {
        select {
        case <-s.channels[i]:
            if id >= s.data[i] {
                fmt.Println("Goroutine completed:", id)
                wg.Done()
                return
            }
        case <-ctx.Done():
            fmt.Println("Goroutine completed:", id)
            wg.Done()
            return
        }
    }
}

func main() {
    s := newSignal()

    ctx, cancel := context.WithCancel(context.Background())
    wg := sync.WaitGroup{}
    wg.Add(3)
    go test(s, &wg, 3, ctx)
    go test(s, &wg, 2, ctx)
    go test(s, &wg, 1, ctx)

    s.Broadcast(3)
    time.Sleep(1 * time.Second)

    // multiple broadcasts is mandatory
    s.Broadcast(2)
    time.Sleep(1 * time.Second)

    // last goroutine
    cancel()

    wg.Wait()
}

游乐场: https: //play.golang.org/p/dGmlkTuj7Ty

有没有更优雅的方法来做到这一点?仅使用内置库的一种。如果不是,这是一个安全/可以使用的解决方案吗?我相信它至少是安全的,因为它适用于大量的 goroutines(我已经用它做了一些测试)。

简而言之,这正是我想要的:

在我的解决方案中,通道切片并不代表 goroutine:它们实际上代表正在广播的信号。这意味着如果我广播两次,然后一个 goroutine 启动,它将在select块中休眠之前检查两个信号。

标签: goparallel-processinggoroutine

解决方案


在我看来,您可能想要类似扇出模式的东西。这是描述扇入和扇出以及其他并发模式的一个来源。这是golang.org 上的一篇关于此的博客文章。我认为它本质上是一种使用通道的观察者模式。

基本上,你想要一些东西,比如说Broadcaster,保留一个频道列表。当您调用 时Broadcaster.send(data),它会遍历data在每个频道上发送的频道列表。Broadcaster还必须有一种方式让 goroutinessubscribeBroadcaster。Goroutines 必须有一种方法来接受来自 的通道Broadcaster或将通道提供给Broadcaster. 该通道是通信链路。

如果要在“观察者”goroutines 中执行的工作需要很长时间,请考虑使用缓冲通道,以免在 goroutinesBroadcaster期间阻塞send和等待。如果您不在乎 goroutine 是否错过了 a data,您可以使用非阻塞发送(见下文)。

当一个 goroutine “死亡”时,它可以unsubscribeBroadcaster它的列表中删除适当的通道。或者通道可以保持满,并且Broadcaster必须使用非阻塞发送来跳过满通道到死的 goroutine。

我不能说我所描述的内容是全面的或 100% 正确的。这只是对我根据您的问题陈述尝试的第一件事的快速描述。


推荐阅读