首页 > 解决方案 > sync.WaitGroup 没有按预期等待

问题描述

我注意到许多 goroutines 仍在运行,即使程序应该等待它们全部完成。我的理解是添加一个等待组可以解决这个问题,但它没有。

// RunIntradayScanner starts one IntradayStratify goroutine per symbol returned
// by sources.GetSymbols, sorts the notifications it receives into per-timeframe
// slices (5, 15, 30, 60) and hands every non-empty slice to
// SplitUpAndSendEmbedToDiscord.
//
// NOTE(review): the loop receives exactly one value per symbol, whereas each
// IntradayStratify worker sends at least one value per entry in timeframes on
// the same unbuffered channel. Any send that has not been received when the
// loop ends blocks forever, so that worker never reaches its deferred wg.Done.
// This function itself never calls wg.Wait (only the helper goroutine does),
// so it returns without waiting for the workers.
func RunIntradayScanner() {
    // waitgroup for channels
    var wg sync.WaitGroup

    logrus.Info("Clearing out pattern slices...")
    var tf5 []request.StratNotification
    var tf15 []request.StratNotification
    var tf30 []request.StratNotification
    var tf60 []request.StratNotification

    // make the channel for comms to functions
    // (unbuffered: every send blocks until a matching receive happens)
    var intradayChannel = make(chan request.StratNotification)

    // range through DB table
    symbols := sources.GetSymbols()
    // One wg.Done is expected from each worker goroutine started below.
    wg.Add(len(symbols))

    // Helper goroutine: closes the channel once every worker has called Done.
    // Nothing in this function blocks on it, so it does not delay the return.
    go func() {
        logrus.Info("------Waiting for workers to finish")
        wg.Wait()
        logrus.Info("------Closing intraday channel")
        close(intradayChannel)
    }()

    for _, s := range symbols {
        // wg.Add(1)
        go IntradayStratify(strings.TrimSpace(s.Symbol), intradayChannel, &wg)
        // Blocks until some worker sends a value; the value is not necessarily
        // from the goroutine started on the line above.
        match := <-intradayChannel

        // Bucket by timeframe; any other value (including the zero-value
        // placeholder notification) is dropped.
        switch match.TimeFrame {
        case 5:
            tf5 = append(tf5, match)
        case 15:
            tf15 = append(tf15, match)
        case 30:
            tf30 = append(tf30, match)
        case 60:
            tf60 = append(tf60, match)
        default:
        }
    }

    // Forward each non-empty bucket together with its timeframe.
if len(tf5) > 0 {
        SplitUpAndSendEmbedToDiscord(5, tf5)
    }

    if len(tf15) > 0 {
        SplitUpAndSendEmbedToDiscord(15, tf15)
    }

    if len(tf30) > 0 {
        SplitUpAndSendEmbedToDiscord(30, tf30)
    }

    if len(tf60) > 0 {
        SplitUpAndSendEmbedToDiscord(60, tf60)
    }
}

// IntradayStratify is the per-ticker worker that RunIntradayScanner starts as a
// goroutine. It fetches the ticker's intraday candles once and then, for every
// entry in timeframes, sends a detected pattern (if any) on c followed by an
// empty StratNotification.
//
// wg.Done is deferred, so it only runs after every send on c has completed.
// NOTE(review): with an unbuffered c each send needs its own receive; if the
// receiver stops reading early this goroutine stays blocked and Done never
// runs.
func IntradayStratify(ticker string, c chan request.StratNotification, wg *sync.WaitGroup) {
    defer wg.Done()

    candles := request.GetIntraday(ticker)
    for _, tf := range timeframes {
        chunkedCandles := request.DetermineTimeframes(tf, ticker, candles)
        if len(chunkedCandles) > 1 {
            highLows := request.CalculateIntraDayHighLow(chunkedCandles)
            // logrus.Infof("%s Highlows calculated: %d", ticker, len(highLows))
            // Should have more than 2 candles to start detecting patterns now
            if len(highLows) > 2 {
                bl, stratPattern := request.DetermineStratPattern(ticker, tf, highLows)
                if bl {
                    c <- stratPattern
                }
            }
        }

        // Not an else branch: an empty notification is sent for every
        // timeframe, even when a pattern was already sent above.
        c <- request.StratNotification{}
    }

}

// main runs a single intraday scan and exits as soon as RunIntradayScanner
// returns.
func main() {
  RunIntradayScanner()
}

我期望程序在循环遍历完所有符号之后重新变回单线程。然而,stdout 如下所示,看起来 goroutines 仍在陆续返回结果。预期的结果是:每一行“Pattern XX found for timeframe”输出,都应该有一行对应的“Sending to discord”输出。

...
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for CRM"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="SNAP Pattern 3-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for EBAY"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for MRVL"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Sending to discord: \n**SPY** :green_circle: $467.16  :red_circle: $466.92"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for CVS"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for QCOM"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for ZM"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="X Pattern 3-2D found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="SQ Pattern 2D-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="MSFT Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="NVDA Pattern 2D-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="PTON Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="MARA Pattern 2U-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="COIN Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="ROKU Pattern 1-2D found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="SHOP Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="PFE Pattern 3-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="RBLX Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="AFRM Pattern 2D-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Sending to discord: \n**SNAP** :green_circle: $54.71  :red_circle: $54.59"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Done with Intraday scanner"

标签: go

解决方案


原始代码每启动一个 goroutine 之后就会阻塞,等待从无缓冲通道接收一个值;此外,当 WaitGroup 计数归零时通道会被关闭,而这个关闭操作是在接收方一侧执行的。

恕我直言,一般规则是:

不要从接收方关闭通道,如果通道有多个并发发送方,也不要关闭通道。

package main

import (
    "fmt"
    "strings"
)

// StratNotification is the message type passed over the channel in this
// example. It carries only the ticker symbol.
type StratNotification struct {
    Symbol string
}

// GetSymbols returns the fixed sample data for the example: one
// StratNotification for each of the symbols "a", "b", "c" and "d", in that
// order.
func GetSymbols() []StratNotification {
    names := [...]string{"a", "b", "c", "d"}

    out := make([]StratNotification, 0, len(names))
    for _, name := range names {
        out = append(out, StratNotification{Symbol: name})
    }
    return out
}

// RunIntradayScanner fans out one IntradayStratify goroutine per symbol and
// then collects exactly one result per symbol from a shared unbuffered
// channel, printing each result as it arrives. It returns once every worker
// has reported, so no WaitGroup and no close of the channel are needed.
func RunIntradayScanner() {
    symbols := GetSymbols()
    results := make(chan StratNotification)

    // Fan out: start all workers before receiving anything.
    for i := range symbols {
        go IntradayStratify(strings.TrimSpace(symbols[i].Symbol), results)
    }

    // Fan in: receive as many values as workers were started.
    for pending := len(symbols); pending > 0; pending-- {
        fmt.Println(<-results)
    }
}

// IntradayStratify is the example worker: it prints the ticker it was given
// (a placeholder for the real work) and then sends a single empty
// StratNotification on c to report that it has finished.
func IntradayStratify(ticker string, c chan StratNotification) {
    // Placeholder for the expensive per-ticker work.
    fmt.Println(ticker)

    var finished StratNotification
    c <- finished
}

// main runs the example scanner once; the program exits when
// RunIntradayScanner returns.
func main() {
    RunIntradayScanner()
}

推荐阅读