go - 如何在此 Go 频道为空之前取消订阅?
问题描述
我不明白为什么这段代码不起作用:
游乐场 REPL: https: //play.golang.org/p/4PrKFnaTeKp
2009/11/10 23:00:01 published message to 0 subscribers
2009/11/10 23:00:01 published message to 0 subscribers
2009/11/10 23:00:02 client 1 connected
2009/11/10 23:00:02 published message to 1 subscribers
2009/11/10 23:00:02 message received: a message for 1
2009/11/10 23:00:02 receivedMsgs: 1
2009/11/10 23:00:02 message received: a message for all
2009/11/10 23:00:02 receivedMsgs: 2
2009/11/10 23:00:02 published message to 1 subscribers
2009/11/10 23:00:03 published message to 1 subscribers
2009/11/10 23:00:03 message received: a message for 1
2009/11/10 23:00:03 receivedMsgs: 3
2009/11/10 23:00:03 message received: a message for all
2009/11/10 23:00:03 receivedMsgs: 4
2009/11/10 23:00:03 published message to 1 subscribers
2009/11/10 23:00:04 published message to 1 subscribers
2009/11/10 23:00:04 message received: a message for 1
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
main.main()
/tmp/sandbox948233627/prog.go:70 +0xfb
goroutine 6 [chan send]:
main.(*Broker).Publish(0xc000010240, 0x4c9815, 0x11, 0x0, 0x0)
/tmp/sandbox948233627/prog.go:121 +0x2af
main.main.func1(0xc000010240)
/tmp/sandbox948233627/prog.go:34 +0x91
created by main.main
/tmp/sandbox948233627/prog.go:30 +0xc7
goroutine 7 [semacquire]:
sync.runtime_SemacquireMutex(0xc000018054, 0x0, 0x1)
/usr/local/go-faketime/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc000018050)
/usr/local/go-faketime/src/sync/mutex.go:138 +0x105
sync.(*Mutex).Lock(...)
/usr/local/go-faketime/src/sync/mutex.go:81
main.(*Broker).Unsubscribe(0xc000010240, 0xc000062060)
/tmp/sandbox948233627/prog.go:93 +0x1c5
main.main.func2(0xc000010240)
/tmp/sandbox948233627/prog.go:61 +0x1a5
created by main.main
/tmp/sandbox948233627/prog.go:40 +0xf6
package main
import (
"fmt"
"log"
"sync"
"time"
)
type Event struct {
Message string
Consumer string
}
func NewEvent(msg string, consumer string) Event {
return Event{
Message: msg,
Consumer: consumer,
}
}
type Broker struct {
consumers map[chan Event]string
mtx *sync.Mutex
}
func main() {
broker := NewBroker()
go func() {
for {
time.Sleep(time.Second * 1)
broker.Publish(NewEvent("a message for 1", "1"))
broker.Publish(NewEvent("a message for all", ""))
}
}()
time.Sleep(2 * time.Second)
go func() {
ch := broker.Subscribe("1")
receivedMsgs := 0
for {
msg := <-ch
//---> Here I'm sending message to client's browser
//if _, err := w.Write([]byte(fmt.Sprintf("data: %s\n\n", msg))); err != nil {
// log.Println(err)
// return
//}
//---> Here I unsubscribe if error in w.Flush() AKA browser was closed
//if err := w.Flush(); err != nil {
//log.Println("browser closed")
//broker.Unsubscribe(ch)
//return
//}
log.Println("message received:", msg.Message)
if receivedMsgs > 3 {
broker.Unsubscribe(ch)
break
}
receivedMsgs++
log.Println("receivedMsgs:", receivedMsgs)
}
}()
select {}
}
func NewBroker() *Broker {
return &Broker{
consumers: make(map[chan Event]string),
mtx: new(sync.Mutex),
}
}
func (b *Broker) Subscribe(id string) chan Event {
b.mtx.Lock()
defer b.mtx.Unlock()
c := make(chan Event)
b.consumers[c] = id
log.Println(fmt.Sprintf("client %s connected", id))
return c
}
func (b *Broker) Unsubscribe(c chan Event) {
b.mtx.Lock()
defer b.mtx.Unlock()
id := b.consumers[c]
close(c)
delete(b.consumers, c)
log.Printf("client %s killed, %d remaining\n", id, len(b.consumers))
}
func (b *Broker) Publish(e Event) {
b.mtx.Lock()
defer b.mtx.Unlock()
pubMsg := 0
for s, id := range b.consumers {
if e.Consumer != "" {
// Push to specific consumer
if id == e.Consumer {
s <- e
pubMsg++
break
}
} else {
// Push to every consumer
e.Consumer = id
s <- e
// Reset unused consumer
e.Consumer = ""
pubMsg++
}
}
log.Printf("published message to %d subscribers\n", pubMsg)
}
启动时:
- 它等待 2 秒
- 订阅经纪人和
- 开始接收消息
- 在第三条消息之后,我想打破无限
for
,因此go func
但我得到一个错误:
如果我改变这一行:
if receivedMsgs > 2 {
至
if receivedMsgs > 3 {
有用。
我认为问题是因为当我打电话时break
,频道中仍然有一条消息ch
。
我对吗?
如何解决这个问题?
我受到https://gist.github.com/maestre3d/4a42e8fa552694f7c97c4811ce913e23 的启发。
解决方案
问题是空选择。根据这个网站在此处输入链接描述,当使用空选择时,选择语句将永远阻塞,因为没有 goroutine 可用于提供任何数据。
为了解决这个问题,我添加了一个带有取消的上下文,这将允许我在取消订阅时停止底部选择并停止第一个 goroutine https://play.golang.org/p/tZklz7iiwGd,我也删除了不必要的互斥锁正在使用通道,这应该是线程安全的。而且我还让消息一个接一个地执行,就好像我将两者都发送到同一个 goroutine 上,这将导致并发,我可以将消息发送到关闭的通道。