go - 扇出,多个广播,未知数量的接收器
问题描述
我需要一种方法来多次从一个主 goroutine 发出信号,未知数量的其他 goroutines。我还需要其他 goroutines 来select
处理多个项目,所以忙等待(可能)不是一种选择。我想出了以下解决方案:
package main
import (
"context"
"fmt"
"sync"
"time"
)
type signal struct {
data []int
channels []chan struct{}
}
func newSignal() *signal {
s := &signal{
data: make([]int, 0),
channels: make([]chan struct{}, 1),
}
s.channels[0] = make(chan struct{})
return s
}
func (s *signal) Broadcast(d int) {
s.data = append(s.data, d)
s.channels = append(s.channels, make(chan struct{}))
close(s.channels[len(s.data)-1])
}
func test(s *signal, wg *sync.WaitGroup, id int, ctx context.Context) {
for i := 0; ; i += 1 {
select {
case <-s.channels[i]:
if id >= s.data[i] {
fmt.Println("Goroutine completed:", id)
wg.Done()
return
}
case <-ctx.Done():
fmt.Println("Goroutine completed:", id)
wg.Done()
return
}
}
}
func main() {
s := newSignal()
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
wg.Add(3)
go test(s, &wg, 3, ctx)
go test(s, &wg, 2, ctx)
go test(s, &wg, 1, ctx)
s.Broadcast(3)
time.Sleep(1 * time.Second)
// multiple broadcasts is mandatory
s.Broadcast(2)
time.Sleep(1 * time.Second)
// last goroutine
cancel()
wg.Wait()
}
游乐场: https: //play.golang.org/p/dGmlkTuj7Ty
有没有更优雅的方法来做到这一点?仅使用内置库的一种。如果不是,这是一个安全/可以使用的解决方案吗?我相信它至少是安全的,因为它适用于大量的 goroutines(我已经用它做了一些测试)。
简而言之,这正是我想要的:
- 主goroutine(调用它
M
)必须能够多次发送一些数据(调用它d
)一些未知数量的其他goroutine(调用它们)n
,0...n
每个goroutine根据每次执行一个d
动作 M
必须能够多次使用某些(数字)数据向所有其他goroutine 发出信号n
- 每个 goroutine
n
要么自行终止(基于上下文),要么在执行某些操作d
并决定其命运之后终止。它将执行此检查的次数与发出的信号一样多,直到它死亡。 - 我不允许以任何方式跟踪
n
goroutines(例如,拥有 goroutines 的通道图并进行迭代)
在我的解决方案中,通道切片并不代表 goroutine:它们实际上代表正在广播的信号。这意味着如果我广播两次,然后一个 goroutine 启动,它将在select
块中休眠之前检查两个信号。
解决方案
在我看来,您可能想要类似扇出模式的东西。这是描述扇入和扇出以及其他并发模式的一个来源。这是golang.org 上的一篇关于此的博客文章。我认为它本质上是一种使用通道的观察者模式。
基本上,你想要一些东西,比如说Broadcaster
,保留一个频道列表。当您调用 时Broadcaster.send(data)
,它会遍历data
在每个频道上发送的频道列表。Broadcaster
还必须有一种方式让 goroutinessubscribe
去Broadcaster
。Goroutines 必须有一种方法来接受来自 的通道Broadcaster
或将通道提供给Broadcaster
. 该通道是通信链路。
如果要在“观察者”goroutines 中执行的工作需要很长时间,请考虑使用缓冲通道,以免在 goroutinesBroadcaster
期间阻塞send
和等待。如果您不在乎 goroutine 是否错过了 a data
,您可以使用非阻塞发送(见下文)。
当一个 goroutine “死亡”时,它可以unsubscribe
从Broadcaster
它的列表中删除适当的通道。或者通道可以保持满,并且Broadcaster
必须使用非阻塞发送来跳过满通道到死的 goroutine。
我不能说我所描述的内容是全面的或 100% 正确的。这只是对我根据您的问题陈述尝试的第一件事的快速描述。
推荐阅读
- c# - 在 Microsoft BOT Emulator 中输入时如何获取输入字符?
- ansible - Ansible 条件检查失败
- r - 如何在ggplot2中使用geom_errorbar和facet_wrap
- sql - 运行 .sql 文件以提示输入用户名以在 oracle 数据库中解锁的可执行文件。最后需要关闭
- java - 从文件中读取 JSON 数据以在 StringEntity 中使用
- angular - SSRS ReportServer 返回 404,但仅在加载 Angular 站点之后
- c# - Azure AD Graph API - 将用户添加到应用程序获取 PlatformNotSupportedException
- python - Python:Json.load 给出列表并且无法从中解析数据
- node.js - 重建项目时使用 nanogen 的 EPERM 错误
- python - Python“dict”作为 ParDo 函数的状态/上下文