首页 > 解决方案 > 处理多个 websocket 连接

问题描述

我正在尝试创建一个程序,该程序将通过 gorilla web-sockets 连接到多个服务器。我目前有一个程序,它将遍历服务器地址列表并创建一个新的 goroutine,它将创建自己的 Websocket.conn 并处理读取和写入。

问题是每次创建一个新的 goroutine 时,之前的 goroutine 都会被阻塞,只有最后一个可以继续。我相信这是因为 gorilla websocket 库阻塞了每个 gorotutine,但我可能弄错了。

我尝试在服务器列表迭代器中放置一个计时器,并且每个 goroutine 都可以正常工作,但是当使用另一个地址创建一个新的 goroutine 时,前一个 goroutine 就会被阻塞。

我的代码的相关位:

在我的main.go

for _, server := range servers {
  go control(ctx, server, port)
}

control()


func control(ctx context.Context, server, port string) { 
  url := url.URL{
    Scheme: "ws",
    Host: server + ":" + port,
    Path: "",
  }
  conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  go sendHandler(ctx, conn)
  go readHandler(ctx, conn)
}

readHandler(ctx context.Context, conn *websocket.Con) {
  for {
    _, p, err := conn.ReadMessage(); if err != nil {
      panic(err)
    }
    select {
      case <-ctx.Done():
        goto TERM
      default:
        // do nothing
    }
  }
  TERM:
  // do termination  
}

sendHandler(ctx context.Context, conn *websocket.Con) {
  for _, msg := range msges {
    err = conn.WriteMessage(websocket.TextMessage, msg)
    if err != nil {
      panic(err)
    }
  }
  <-ctx.Done()
}

我删除了添加等待组和其他不必要的代码的部分。

所以我期望有 3n 个 goroutine 运行(其中 n 是服务器的数量)而不会阻塞,但现在我看到只有 3 个 goroutine 在运行,它们是服务器列表的最后一次迭代调用的那些。

谢谢!

编辑 14/06/2019:

我花了一些时间制作了一个小的工作示例,并且在示例中没有发生错误 - 没有一个线程相互阻塞。我仍然不确定是什么原因造成的,但这是我的小型工作示例:

main.go

package main

import (
    "context"
    "fmt"
    "os"
    "time"
    "os/signal"
    "syscall"
    "sync"
    "net/url"
    "github.com/gorilla/websocket"
    )

func main() {
    servers := []string{"5555","5556", "5557"}
    comms := make(chan os.Signal, 1)
    signal.Notify(comms, os.Interrupt, syscall.SIGTERM)

    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    var wg sync.WaitGroup

    for _, server := range servers {
        wg.Add(1)
        go control(server,
                   ctx,
                   &wg)
    }

    <-comms
    cancel()
    wg.Wait()
}

func control(server string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Started control for %s\n", server)
    url := url.URL {
        Scheme: "ws",
        Host: "0.0.0.0" + ":" + server,
        Path: "",
    }
    conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    var localwg sync.WaitGroup

    localwg.Add(1)
    go sendHandler(ctx, conn, &localwg, server)
    localwg.Add(1)
    go readHandler(ctx, conn, &localwg, server)

    <- ctx.Done()
    localwg.Wait()
    wg.Done()
    return
}

func sendHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
    for i := 0; i < 50; i++ {
        err := conn.WriteMessage(websocket.TextMessage, []byte("ping"))
        if err != nil {
            panic(err)
        }
        fmt.Printf("sent msg to %s\n", server)
        time.Sleep(1 * time.Second)
    }
    <- ctx.Done()
    wg.Done()
}

func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
    for {

        select {

            case <- ctx.Done():
                wg.Done()
                return
            default:
                _, p, err :=  conn.ReadMessage()
                if err != nil {
                    wg.Done()
                    fmt.Println("done")
                }
                fmt.Printf("Got [%s] from %s\n", string(p), server)
        }
    }
}

我分别在 5555、5556和 5557 上的服务器上使用 dpallot 的simple-websocket-server对其进行了测试。

标签: gowebsocketgorilla

解决方案


这部分代码导致了问题:

conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
    panic(err)
}
defer conn.Close()
go sendHandler(ctx, conn)
go readHandler(ctx, conn)

您创建连接,延迟关闭,启动另外两个 goroutine,然后结束函数。由于您的延迟,函数 end 关闭了套接字。


推荐阅读