go - 为什么客户端和服务器会导致数据竞争?
问题描述
我正在尝试使用通道作为基本 PubSub 创建 TCP 中继。我的目标是将一个 TCP 流中继到多个客户端(一对多)。我无法修复客户端和服务器连接之间的数据竞争。我将不胜感激有关为什么客户端和服务器连接之间发生数据竞争的任何见解?
我认为pubsub部分还可以。它改编自以下博客:
https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/
根据数据竞争警告,竞争发生在下面的主功能代码块中。我对导致数据竞争的行发表了一些评论。我以为可以同时运行服务器和客户端,我弄错了吗?
package main
import (
"flag"
"net"
"os"
"sync"
)
var (
laddr = flag.String("l", "", "listen address (:port)")
raddr = flag.String("r", "", "remote address (host:port)")
)
type Sub struct {
topic string
id int64
}
type Pubsub struct {
mu sync.RWMutex
subs map[Sub]chan []byte
closed bool
counter int64
}
func NewPubsub() *Pubsub {
ps := &Pubsub{}
ps.subs = make(map[Sub]chan []byte)
ps.closed = false
return ps
}
func (ps *Pubsub) Close() {
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.closed {
ps.closed = true
for _, sub := range ps.subs {
close(sub)
}
}
}
func (ps *Pubsub) Subscribe(topic string) (<-chan []byte, Sub) {
ps.mu.Lock()
defer ps.mu.Unlock()
// initialze the subscription
sub := Sub{topic: topic, id: ps.counter}
// Add the subscription to the map
ch := make(chan []byte, 1)
ps.subs[sub] = ch
// Increment the counter
ps.counter++
return ch, sub
}
func (ps *Pubsub) Unsubscribe(s Sub) {
ps.mu.Lock()
defer ps.mu.Unlock()
delete(ps.subs, s)
}
func (ps *Pubsub) Publish(topic string, msg []byte) {
ps.mu.RLock()
defer ps.mu.RUnlock()
for sub, ch := range ps.subs {
if sub.topic == topic {
ch <- msg
}
}
}
func main() {
flag.Parse()
if *laddr == "" || *raddr == "" {
flag.PrintDefaults()
os.Exit(1)
}
ps := NewPubsub()
publisher := func(topic string) {
remote, err := net.Dial("tcp", *raddr)
if err != nil {
return
}
buf := make([]byte, 2048)
for {
n, _ := remote.Read(buf) // *** RACE HERE ***
ps.Publish(topic, buf[:n])
}
}
go publisher("relay")
subscriber := func(conn net.Conn, ch <-chan []byte) {
for i := range ch {
conn.Write([]byte(i)) // *** RACE HERE ***
}
}
ln, err := net.Listen("tcp", *laddr)
if err != nil {
return
}
for {
conn, err := ln.Accept()
if err != nil {
continue
}
ch, _ := ps.Subscribe("relay")
go subscriber(conn, ch)
}
}
使用“go run -race pubsub.go”时的数据竞争输出如下所示。
在第一个客户端连接到侦听服务器的端口之前,不会出现数据争用警告。
在这个程序运行时,我没有看到任何其他类型的数据竞争。但是当我中继二进制数据时,字节很少被损坏或丢失,这表明它们可能是我幼稚实现的其他问题。
==================
WARNING: DATA RACE
Write at 0x00c0000f8000 by goroutine 7:
internal/race.WriteRange()
/usr/local/go/src/internal/race/race.go:49 +0xaa
syscall.Read()
/usr/local/go/src/syscall/syscall_unix.go:190 +0x89
internal/poll.ignoringEINTRIO()
/usr/local/go/src/internal/poll/fd_unix.go:581 +0x1c8
internal/poll.(*FD).Read()
/usr/local/go/src/internal/poll/fd_unix.go:162 +0x17c
net.(*netFD).Read()
/usr/local/go/src/net/fd_posix.go:55 +0x68
net.(*conn).Read()
/usr/local/go/src/net/net.go:183 +0xeb
net.(*TCPConn).Read()
<autogenerated>:1 +0x69
main.main.func1()
/pubsub/pubsub.go:101 +0x154
Previous read at 0x00c0000f8000 by goroutine 9:
internal/race.ReadRange()
/usr/local/go/src/internal/race/race.go:45 +0xb0
syscall.Write()
/usr/local/go/src/syscall/syscall_unix.go:215 +0x94
internal/poll.ignoringEINTRIO()
/usr/local/go/src/internal/poll/fd_unix.go:581 +0x16e
internal/poll.(*FD).Write()
/usr/local/go/src/internal/poll/fd_unix.go:274 +0x294
net.(*netFD).Write()
/usr/local/go/src/net/fd_posix.go:73 +0x68
net.(*conn).Write()
/usr/local/go/src/net/net.go:195 +0xeb
net.(*TCPConn).Write()
<autogenerated>:1 +0x69
main.main.func2()
/pubsub/pubsub.go:110 +0x84
Goroutine 7 (running) created at:
main.main()
/pubsub/pubsub.go:106 +0x288
Goroutine 9 (running) created at:
main.main()
/pubsub/pubsub.go:125 +0x38f
==================
解决方案
快速解决:
// buf := make([]byte, 2048) // <- move this ...
for {
buf := make([]byte, 2048) // <- ... to here
n, _ := remote.Read(buf)
ps.Publish(topic, buf[:n])
}
为什么这个坏了?由于单个(常量)buf
通过通道传递给多个订阅者(读者) - 当下一次for
迭代发生时 - 这些读者将获得损坏的 racy 数据。
每次迭代创建一个唯一的缓冲区将确保没有新的写入会破坏已发送且仍在由订阅者处理的旧消息。
推荐阅读
- escaping - 在 Blazor 中转义 @
- javascript - 获取可观察的笔记本元数据
- jvisualvm - 如何通过 Java VisualVM 监控 thorntail 资源
- javascript - Problems while using partial views in Express js
- python - 使用变量时的 Pandas 日期时间列过滤问题
- reactjs - 描述到打字稿反应组件包装器
- python - 将 Elif 语句更改为“类 Python”代码
- windows - 在 azure vm (windows server 2016) 上配置 MSMQ 群集故障转移
- terraform - 获取现有工作区失败:查询云存储失败:存储:存储桶不存在
- javascript - 将 NodeJS 应用程序连接到 SignalR(使用 .NET Core 3)