go - 处理多个 websocket 连接
问题描述
我正在尝试创建一个程序,该程序将通过 gorilla web-sockets 连接到多个服务器。我目前有一个程序,它将遍历服务器地址列表并创建一个新的 goroutine,它将创建自己的 Websocket.conn 并处理读取和写入。
问题是每次创建一个新的 goroutine 时,之前的 goroutine 都会被阻塞,只有最后一个可以继续。我相信这是因为 gorilla websocket 库阻塞了每个 gorotutine,但我可能弄错了。
我尝试在服务器列表迭代器中放置一个计时器,并且每个 goroutine 都可以正常工作,但是当使用另一个地址创建一个新的 goroutine 时,前一个 goroutine 就会被阻塞。
我的代码的相关位:
在我的main.go
for _, server := range servers {
go control(ctx, server, port)
}
在control()
func control(ctx context.Context, server, port string) {
url := url.URL{
Scheme: "ws",
Host: server + ":" + port,
Path: "",
}
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
defer conn.Close()
go sendHandler(ctx, conn)
go readHandler(ctx, conn)
}
readHandler(ctx context.Context, conn *websocket.Con) {
for {
_, p, err := conn.ReadMessage(); if err != nil {
panic(err)
}
select {
case <-ctx.Done():
goto TERM
default:
// do nothing
}
}
TERM:
// do termination
}
sendHandler(ctx context.Context, conn *websocket.Con) {
for _, msg := range msges {
err = conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
panic(err)
}
}
<-ctx.Done()
}
我删除了添加等待组和其他不必要的代码的部分。
所以我期望有 3n 个 goroutine 运行(其中 n 是服务器的数量)而不会阻塞,但现在我看到只有 3 个 goroutine 在运行,它们是服务器列表的最后一次迭代调用的那些。
谢谢!
编辑 14/06/2019:
我花了一些时间制作了一个小的工作示例,并且在示例中没有发生错误 - 没有一个线程相互阻塞。我仍然不确定是什么原因造成的,但这是我的小型工作示例:
main.go
package main
import (
"context"
"fmt"
"os"
"time"
"os/signal"
"syscall"
"sync"
"net/url"
"github.com/gorilla/websocket"
)
func main() {
servers := []string{"5555","5556", "5557"}
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for _, server := range servers {
wg.Add(1)
go control(server,
ctx,
&wg)
}
<-comms
cancel()
wg.Wait()
}
func control(server string, ctx context.Context, wg *sync.WaitGroup) {
fmt.Printf("Started control for %s\n", server)
url := url.URL {
Scheme: "ws",
Host: "0.0.0.0" + ":" + server,
Path: "",
}
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
defer conn.Close()
var localwg sync.WaitGroup
localwg.Add(1)
go sendHandler(ctx, conn, &localwg, server)
localwg.Add(1)
go readHandler(ctx, conn, &localwg, server)
<- ctx.Done()
localwg.Wait()
wg.Done()
return
}
func sendHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
for i := 0; i < 50; i++ {
err := conn.WriteMessage(websocket.TextMessage, []byte("ping"))
if err != nil {
panic(err)
}
fmt.Printf("sent msg to %s\n", server)
time.Sleep(1 * time.Second)
}
<- ctx.Done()
wg.Done()
}
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
for {
select {
case <- ctx.Done():
wg.Done()
return
default:
_, p, err := conn.ReadMessage()
if err != nil {
wg.Done()
fmt.Println("done")
}
fmt.Printf("Got [%s] from %s\n", string(p), server)
}
}
}
我分别在 5555、5556和 5557 上的服务器上使用 dpallot 的simple-websocket-server对其进行了测试。
解决方案
这部分代码导致了问题:
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
defer conn.Close()
go sendHandler(ctx, conn)
go readHandler(ctx, conn)
您创建连接,延迟关闭,启动另外两个 goroutine,然后结束函数。由于您的延迟,函数 end 关闭了套接字。
推荐阅读
- python - 为什么 Visual Studio 代码在安装后找不到 pandas
- r - 将 xyz 数据帧转换为基础 R 中的矩阵
- c# - 如何将 int 值添加到 ac# 数组
- html - 如何在 django 项目中使用下载的字体而不是 google fonts api
- data-science - 数据验证/验证、数据探索/利用、数据采集/收集/收集之间的区别?
- angular - 为什么使用primeng p-radiobutton会导致Angular 9中的本机元素错误?
- python - Python 按钮算法
- javascript - 在带有辅助函数的 ejs 中使用 .then 返回 [ object promise ]
- python - 如何在神经网络训练的损失函数中执行快速傅立叶变换
- php - mysqli 插入 php 变量未按预期工作