multithreading - 无法连接时停止频道
问题描述
我有以下代码可以正常工作,问题是当连接socket.Connect()
失败时我想停止进程,我尝试使用以下代码但它不起作用,即如果套接字连接失败,程序仍然运行. 我想要发生的是,如果connect
失败,过程停止并且通道......我在这里错过了什么?
func run (appName string) (err error) {
done = make(chan bool)
defer close(done)
serviceURL, e := GetContext().getServiceURL(appName)
if e != nil {
err = errors.New("process failed" + err.Error())
LogDebug("Exiting %v func[err =%v]", methodName, err)
return err
}
url := "wss://" + serviceURL + route
socket := gowebsocket.New(url)
addPass(&socket, user, pass)
socket.OnConnectError = OnConnectErrorHandler
socket.OnConnected = OnConnectedHandler
socket.OnTextMessage = socketTextMessageHandler
socket.OnDisconnected = OnDisconnectedHandler
LogDebug("In %v func connecting to URL %v", methodName, url)
socket.Connect()
jsonBytes, e := json.Marshal(payload)
if e != nil {
err = errors.New("build process failed" + e.Error())
LogDebug("Exiting %v func[err =%v]", methodName, err)
return err
}
jsonStr := string(jsonBytes)
LogDebug("In %v Connecting to payload JSON is %v", methodName, jsonStr)
socket.SendText(jsonStr)
<-done
LogDebug("Exiting %v func[err =%v]", methodName, err)
return err
}
func OnConnectErrorHandler(err error, socket gowebsocket.Socket) {
methodName := "OnConnectErrorHandler"
LogDebug("Starting %v parameters [err = %v , socket = %v]", methodName, err, socket)
LogInfo("Disconnected from server ")
done <- true
}
该进程应该为运行大约60-90
sec 的进程打开一个 ws 连接(例如执行 npm install),并通过web socke
t 获取进程的日志以及何时完成,当然还要处理可能发生的问题,例如网络问题或运行时出现的一些错误过程
解决方案
所以,@Slabgorb 是正确的——如果你看这里(https://github.com/sacOO7/GoWebsocket/blob/master/gowebsocket.go#L87),你会看到OnConnectErrorHandler
在你调用Connect()
. 在连接完全建立并且回调完成之前,该Connect()
函数不会启动单独的 goroutine 来处理 websocket 。OnConnected
因此,当您尝试写入无缓冲的通道done
时,您将阻塞与调用该函数的同一个goroutine run()
,并且您自己会死锁,因为没有 goroutine 能够从通道中读取来解除阻塞。
因此,您可以采用他的解决方案并将其转换为缓冲通道,这将起作用,但我的建议是不要为这种一次性标志行为写入通道,而是使用close
信号。为您要终止的每个条件定义一个通道run()
,并在相应的 websocket 处理程序函数中定义close
该条件发生时的通道。在底部run()
,您可以select
打开所有频道,并在第一个频道关闭时退出。它看起来像这样:
package main
import "errors"
func run(appName string) (err error) {
// first, define one channel per socket-closing-reason (DO NOT defer close these channels.)
connectErrorChan := make(chan struct{})
successDoneChan := make(chan struct{})
surpriseDisconnectChan := make(chan struct{})
// next, wrap calls to your handlers in a closure `https://gobyexample.com/closures`
// that captures a reference to the channel you care about
OnConnectErrorHandler := func(err error, socket gowebsocket.Socket) {
MyOnConnectErrorHandler(connectErrorChan, err, socket)
}
OnDisconnectedHandler := func(err error, socket gowebsocket.Socket) {
MyOnDisconectedHandler(surpriseDisconnectChan, err, socket)
}
// ... declare any other handlers that might close the connection here
// Do your setup logic here
// serviceURL, e := GetContext().getServiceURL(appName)
// . . .
// socket := gowebsocket.New(url)
socket.OnConnectError = OnConnectErrorHandler
socket.OnConnected = OnConnectedHandler
socket.OnTextMessage = socketTextMessageHandler
socket.OnDisconnected = OnDisconnectedHandler
// Prepare and send your message here...
// LogDebug("In %v func connecting to URL %v", methodName, url)
// . . .
// socket.SendText(jsonStr)
// now wait for one of your signalling channels to close.
select { // this will block until one of the handlers signals an exit
case <-connectError:
err = errors.New("never connected :( ")
case <-successDone:
socket.Close()
LogDebug("mission accomplished! :) ")
case <-surpriseDisconnect:
err = errors.New("somebody cut the wires! :O ")
}
if err != nil {
LogDebug(err)
}
return err
}
// *Your* connect error handler will take an extra channel as a parameter
func MyOnConnectErrorHandler(done chan struct{}, err error, socket gowebsocket.Socket) {
methodName := "OnConnectErrorHandler"
LogDebug("Starting %v parameters [err = %v , socket = %v]", methodName, err, socket)
LogInfo("Disconnected from server ")
close(done) // signal we are done.
}
这有几个优点:
1)您无需猜测哪些回调发生在进程中,哪些发生在后台 goroutines 中(并且您不必让所有通道缓冲“以防万一”)
2) 在多个通道上进行选择可以让您找出退出的原因,并可能以不同的方式处理清理或记录。
注意 1:如果您选择使用close
信号,则必须为每个源使用不同的通道,以避免可能导致通道从不同的 goroutine 关闭两次的竞争条件(例如,当您返回响应时会发生超时,并且两个处理程序都会触发;关闭同一通道的第二个处理程序会导致panic
.) 这也是为什么您不希望将defer close
所有通道都放在函数顶部的原因。
注意2:与您的问题没有直接关系,但是 - 您不需要关闭每个通道 - 一旦它的所有句柄超出范围,无论它是否已关闭,通道都会被垃圾收集。
推荐阅读
- mysql - 合并两个对 customer_id 使用 Auto Increment 值的相同 MySQL 数据库表
- javascript - '不能在准备好的语句中插入多个命令' postgreSQL REST API
- ibm-watson - 在处理程序中调用 webhook
- kubernetes - 使用 kube-prometheus-stack 配置 AlertManager 和 Prometheus ServiceMonitors 的正确方法
- node.js - Heroku 部署无法使用 Nodejs“找不到模块”
- php - 如何将 ID 更改为 url slug
- oauth-2.0 - 在访问令牌中包含其他声明
- ios - iOS AVVideoCompositionCoreAnimationTool:如何使用 videoCompositionCoreAnimationToolWithAdditionalLayer
- salesforce - 无法使用 jsforce 将项目添加到 Salesforce 中的自定义选项列表
- selenium-webdriver - ag-Grid <> Selenium:获取网格数据