首页 > 解决方案 > 无法连接时停止频道

问题描述

我有以下代码可以正常工作,问题是当连接socket.Connect() 失败时我想停止进程,我尝试使用以下代码但它不起作用,即如果套接字连接失败,程序仍然运行. 我想要发生的是,如果connect失败,过程停止并且通道......我在这里错过了什么?

func run (appName string) (err error) {


        done = make(chan bool)
        defer close(done)


        serviceURL, e := GetContext().getServiceURL(appName)

        if e != nil {
            err = errors.New("process failed" + err.Error())
            LogDebug("Exiting %v func[err =%v]", methodName, err)
            return err
        }

        url := "wss://" + serviceURL + route


        socket := gowebsocket.New(url)
        addPass(&socket, user, pass)


        socket.OnConnectError = OnConnectErrorHandler
        socket.OnConnected = OnConnectedHandler
        socket.OnTextMessage = socketTextMessageHandler
        socket.OnDisconnected = OnDisconnectedHandler

        LogDebug("In %v func connecting to URL  %v", methodName, url)
        socket.Connect()

        jsonBytes, e := json.Marshal(payload)
        if e != nil {
            err = errors.New("build process failed" + e.Error())
            LogDebug("Exiting %v func[err =%v]", methodName, err)
            return err
        }

        jsonStr := string(jsonBytes)

        LogDebug("In %v Connecting to payload JSON is  %v", methodName, jsonStr)
        socket.SendText(jsonStr)

        <-done
        LogDebug("Exiting %v func[err =%v]", methodName, err)
        return err

    }


    func OnConnectErrorHandler(err error, socket gowebsocket.Socket) {
        methodName := "OnConnectErrorHandler"
        LogDebug("Starting %v parameters [err = %v , socket = %v]", methodName, err, socket)
        LogInfo("Disconnected from server ")
        done <- true
    }

该进程应该为运行大约60-90sec 的进程打开一个 ws 连接(例如执行 npm install),并通过web socket 获取进程的日志以及何时完成,当然还要处理可能发生的问题,例如网络问题或运行时出现的一些错误过程

标签: multithreadinggowebsocketchannel

解决方案


所以,@Slabgorb 是正确的——如果你看这里(https://github.com/sacOO7/GoWebsocket/blob/master/gowebsocket.go#L87),你会看到OnConnectErrorHandler在你调用Connect(). 在连接完全建立并且回调完成之前,该Connect()函数不会启动单独的 goroutine 来处理 websocket 。OnConnected因此,当您尝试写入无缓冲的通道done时,您将阻塞与调用该函数的同一个goroutine run(),并且您自己会死锁,因为没有 goroutine 能够从通道中读取来解除阻塞。

因此,您可以采用他的解决方案并将其转换为缓冲通道,这将起作用,但我的建议是不要为这种一次性标志行为写入通道,而是使用close信号。为您要终止的每个条件定义一个通道run(),并在相应的 websocket 处理程序函数中定义close该条件发生时的通道。在底部run(),您可以select打开所有频道,并在第一个频道关闭时退出。它看起来像这样:

package main

import "errors"

func run(appName string) (err error) {

    // first, define one channel per socket-closing-reason (DO NOT defer close these channels.)
    connectErrorChan := make(chan struct{})
    successDoneChan := make(chan struct{})
    surpriseDisconnectChan := make(chan struct{})

    // next, wrap calls to your handlers in a closure `https://gobyexample.com/closures`
    // that captures a reference to the channel you care about
    OnConnectErrorHandler := func(err error, socket gowebsocket.Socket) {
        MyOnConnectErrorHandler(connectErrorChan, err, socket)
    }
    OnDisconnectedHandler := func(err error, socket gowebsocket.Socket) {
        MyOnDisconectedHandler(surpriseDisconnectChan, err, socket)
    }
    // ... declare any other handlers that might close the connection here

    // Do your setup logic here
    // serviceURL, e := GetContext().getServiceURL(appName)
    // . . .
    // socket := gowebsocket.New(url)

    socket.OnConnectError = OnConnectErrorHandler
    socket.OnConnected = OnConnectedHandler
    socket.OnTextMessage = socketTextMessageHandler
    socket.OnDisconnected = OnDisconnectedHandler

    // Prepare and send your message here...
    // LogDebug("In %v func connecting to URL  %v", methodName, url)
    // . . .
    // socket.SendText(jsonStr)

    // now wait for one of your signalling channels to close.
    select { // this will block until one of the handlers signals an exit
    case <-connectError:
        err = errors.New("never connected  :( ")
    case <-successDone:
        socket.Close()
        LogDebug("mission accomplished! :) ")
    case <-surpriseDisconnect:
        err = errors.New("somebody cut the wires!  :O ")
    }

    if err != nil {
        LogDebug(err)
    }
    return err
}

// *Your* connect error handler will take an extra channel as a parameter
func MyOnConnectErrorHandler(done chan struct{}, err error, socket gowebsocket.Socket) {
    methodName := "OnConnectErrorHandler"
    LogDebug("Starting %v parameters [err = %v , socket = %v]", methodName, err, socket)
    LogInfo("Disconnected from server ")
    close(done) // signal we are done.
}

这有几个优点:

1)您无需猜测哪些回调发生在进程中,哪些发生在后台 goroutines 中(并且您不必让所有通道缓冲“以防万一”)

2) 在多个通道上进行选择可以让您找出退出的原因,并可能以不同的方式处理清理或记录。

注意 1:如果您选择使用close信号,则必须为每个源使用不同的通道,以避免可能导致通道从不同的 goroutine 关闭两次的竞争条件(例如,当您返回响应时会发生超时,并且两个处理程序都会触发;关闭同一通道的第二个处理程序会导致panic.) 这也是为什么您不希望将defer close所有通道都放在函数顶部的原因。

注意2:与您的问题没有直接关系,但是 - 您不需要关闭每个通道 - 一旦它的所有句柄超出范围,无论它是否已关闭,通道都会被垃圾收集。


推荐阅读