首页 > 解决方案 > 为什么客户端和服务器会导致数据竞争?

问题描述

我正在尝试使用通道作为基本 PubSub 创建 TCP 中继。我的目标是将一个 TCP 流中继到多个客户端(一对多)。我无法修复客户端和服务器连接之间的数据竞争。我将不胜感激有关为什么客户端和服务器连接之间发生数据竞争的任何见解?

我认为pubsub部分还可以。它改编自以下博客:

https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/

根据数据竞争警告,竞争发生在下面的主功能代码块中。我对导致数据竞争的行发表了一些评论。我以为可以同时运行服务器和客户端,我弄错了吗?

package main

import (
    "flag"
    "net"
    "os"
    "sync"
)

var (
    laddr = flag.String("l", "", "listen address (:port)")
    raddr = flag.String("r", "", "remote address (host:port)")
)

type Sub struct {
    topic string
    id    int64
}

type Pubsub struct {
    mu      sync.RWMutex
    subs    map[Sub]chan []byte
    closed  bool
    counter int64
}

func NewPubsub() *Pubsub {
    ps := &Pubsub{}
    ps.subs = make(map[Sub]chan []byte)
    ps.closed = false
    return ps
}

func (ps *Pubsub) Close() {
    ps.mu.Lock()
    defer ps.mu.Unlock()

    if !ps.closed {
        ps.closed = true
        for _, sub := range ps.subs {
            close(sub)
        }
    }
}

func (ps *Pubsub) Subscribe(topic string) (<-chan []byte, Sub) {
    ps.mu.Lock()
    defer ps.mu.Unlock()

    // initialze the subscription
    sub := Sub{topic: topic, id: ps.counter}

    // Add the subscription to the map
    ch := make(chan []byte, 1)
    ps.subs[sub] = ch

    // Increment the counter
    ps.counter++

    return ch, sub
}

func (ps *Pubsub) Unsubscribe(s Sub) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    delete(ps.subs, s)
}

func (ps *Pubsub) Publish(topic string, msg []byte) {
    ps.mu.RLock()
    defer ps.mu.RUnlock()

    for sub, ch := range ps.subs {
        if sub.topic == topic {
            ch <- msg
        }
    }
}

func main() {
    flag.Parse()
    if *laddr == "" || *raddr == "" {
        flag.PrintDefaults()
        os.Exit(1)
    }

    ps := NewPubsub()

    publisher := func(topic string) {
        remote, err := net.Dial("tcp", *raddr)
        if err != nil {
            return
        }
        buf := make([]byte, 2048)
        for {
            n, _ := remote.Read(buf)  // *** RACE HERE ***
            ps.Publish(topic, buf[:n])
        }
    }

    go publisher("relay")

    subscriber := func(conn net.Conn, ch <-chan []byte) {
        for i := range ch {
            conn.Write([]byte(i))  // *** RACE HERE ***
        }
    }

    ln, err := net.Listen("tcp", *laddr)
    if err != nil {
        return
    }

    for {
        conn, err := ln.Accept()
        if err != nil {
            continue
        }
        ch, _ := ps.Subscribe("relay")
        go subscriber(conn, ch)

    }

}

使用“go run -race pubsub.go”时的数据竞争输出如下所示。

在第一个客户端连接到侦听服务器的端口之前,不会出现数据争用警告。

在这个程序运行时,我没有看到任何其他类型的数据竞争。但是当我中继二进制数据时,字节很少被损坏或丢失,这表明它们可能是我幼稚实现的其他问题。

==================
WARNING: DATA RACE
Write at 0x00c0000f8000 by goroutine 7:
  internal/race.WriteRange()
      /usr/local/go/src/internal/race/race.go:49 +0xaa
  syscall.Read()
      /usr/local/go/src/syscall/syscall_unix.go:190 +0x89
  internal/poll.ignoringEINTRIO()
      /usr/local/go/src/internal/poll/fd_unix.go:581 +0x1c8
  internal/poll.(*FD).Read()
      /usr/local/go/src/internal/poll/fd_unix.go:162 +0x17c
  net.(*netFD).Read()
      /usr/local/go/src/net/fd_posix.go:55 +0x68
  net.(*conn).Read()
      /usr/local/go/src/net/net.go:183 +0xeb
  net.(*TCPConn).Read()
      <autogenerated>:1 +0x69
  main.main.func1()
      /pubsub/pubsub.go:101 +0x154

Previous read at 0x00c0000f8000 by goroutine 9:
  internal/race.ReadRange()
      /usr/local/go/src/internal/race/race.go:45 +0xb0
  syscall.Write()
      /usr/local/go/src/syscall/syscall_unix.go:215 +0x94
  internal/poll.ignoringEINTRIO()
      /usr/local/go/src/internal/poll/fd_unix.go:581 +0x16e
  internal/poll.(*FD).Write()
      /usr/local/go/src/internal/poll/fd_unix.go:274 +0x294
  net.(*netFD).Write()
      /usr/local/go/src/net/fd_posix.go:73 +0x68
  net.(*conn).Write()
      /usr/local/go/src/net/net.go:195 +0xeb
  net.(*TCPConn).Write()
      <autogenerated>:1 +0x69
  main.main.func2()
      /pubsub/pubsub.go:110 +0x84

Goroutine 7 (running) created at:
  main.main()
      /pubsub/pubsub.go:106 +0x288

Goroutine 9 (running) created at:
  main.main()
      /pubsub/pubsub.go:125 +0x38f
==================

标签: go

解决方案


快速解决:

    // buf := make([]byte, 2048) // <- move this ...
    for {
        buf := make([]byte, 2048) // <- ... to here
        n, _ := remote.Read(buf)
        ps.Publish(topic, buf[:n])
    }

为什么这个坏了?由于单个(常量)buf通过通道传递给多个订阅者(读者) - 当下一次for迭代发生时 - 这些读者将获得损坏的 racy 数据。

每次迭代创建一个唯一的缓冲区将确保没有新的写入会破坏已发送且仍在由订阅者处理的旧消息。


推荐阅读