How to handle multiple goroutines that share the same channel

Problem description

I have been searching a lot, but I have not yet found an answer to my problem.

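I need to call an external API several times, concurrently, with different parameters. Then, for each call, I need to initialize a struct for each dataset and process the data I receive from the API call. Keep in mind that I read each line of the incoming response and immediately start sending it to the channel.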

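Because of the large amount of data I receive, the first problem I ran into, which was not obvious at first, is that each goroutine does not receive all of the data that goes through the channel (as I found out through my research). So I need a way to requeue/redirect the data to the correct goroutine.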
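This behaviour is simply how Go channels work. Every value sent on a channel is received by exactly one receiver. Several goroutines reading the same channel therefore split the values between them, and no goroutine sees all of them.

The small sketch below makes this visible. The `fanOut` helper is hypothetical and not part of the question's code.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut sends n values on one channel shared by several receivers and
// reports how many values each receiver got.
func fanOut(n, receivers int) []int {
	ch := make(chan int)
	counts := make([]int, receivers)
	var wg sync.WaitGroup
	for r := 0; r < receivers; r++ {
		wg.Add(1)
		go func(r int) {
			defer wg.Done()
			for range ch {
				counts[r]++ // each receiver only writes its own slot
			}
		}(r)
	}
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch) // lets every receiver's range loop end
	wg.Wait()
	return counts
}

func main() {
	counts := fanOut(100, 4)
	total := 0
	for _, c := range counts {
		total += c
	}
	// The per-receiver counts vary between runs; the total is always 100,
	// because each value is delivered to exactly one receiver.
	fmt.Println(counts, total)
}
```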

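The function that sends the streaming response for a single dataset (I have removed parts of the code that are not useful out of context):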

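func (api *API) RequestData(ctx context.Context, c chan DWeatherResponse, dataset string, wg *sync.WaitGroup) error {
    // Mark this producer as finished on every return path; the caller's wg.Wait() depends on it.
    defer wg.Done()

    // reader wraps the API response body (it is set up in the omitted code).
    for {
        line, err := reader.ReadBytes('\n')
        if err != nil {
            log.Printf("End of %s", dataset)
            return err
        }

        data, err := extractDataFromStreamLine(string(line), dataset)
        if err != nil {
            continue
        }

        c <- *data
    }
}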

The function that processes the incoming data:

func (s *StrikeStruct) Process(ch, requeue chan dweather.DWeatherResponse) {
    // range ends once ch is closed and drained
    for data := range ch {
        // data contains {dataset string, value float64, date time.Time}
        // s.Parameter needs to match the dataset

        // IMPORTANT PART: check whether the received data belongs to this struct's dataset.
        // If not, I want to send it to another goroutine until it reaches the correct one.
        // There will be at most 4 datasets, but this may still not be the best approach.
        if !api.GetDataset(s.Parameter, data.Dataset) {
            requeue <- data
            continue
        }
        // Do stuff with the data from this point
    }
}

Now, on my own API endpoint, I have the following:

ch := make(chan dweather.DWeatherResponse, 2)
requeue := make(chan dweather.DWeatherResponse)
final := make(chan strike.StrikePerYearResponse)

var wg sync.WaitGroup

for _, s := range args.Parameters.Strikes {
    strike := strike.StrikePerYear{
        Parameter:   strike.Parameter(s.Dataset),
        StrikeValue: s.Value,
    }

    // I receive and process the data in here
    go strike.ProcessStrikePerYear(ch, requeue, final, string(s.Dataset))
}

go func() {
    for {
        data, _ := <-requeue
        ch <- data
    }
}()

// Creates a goroutine for each dataset
for _, dataset := range api.Params.Dataset {
    wg.Add(1)
    go api.RequestData(ctx, ch, dataset, &wg)
}

wg.Wait()
close(ch)

// Once all the data is processed, it is all appended
var strikes []strike.StrikePerYearResponse
for range args.Fetch.Datasets {
    strikes = append(strikes, <-final)
}

return strikes

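The problem with this code is that once I start receiving data from multiple endpoints, requeue blocks and nothing else happens. If I remove the requeue logic, any data that does not land on the correct goroutine is lost.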

My two questions are:

  1. Why does requeue block if it has a goroutine that is always ready to receive?
  2. Should I take a different approach to handling the incoming data?

Tags: go, concurrency, channel

Solution


This is not a good way to solve the problem; you should change your approach. I suggest an implementation like the following:

package main

import (
    "fmt"
    "sync"
)

// answer for https://stackoverflow.com/questions/68454226/how-to-handle-multiple-goroutines-that-share-the-same-channel

var (
    finalResult = make(chan string)
)

// IData is the interface that every data type handled by the message dispatcher must implement.
type IData interface {
    IsThisForMe() bool
    Process(*sync.WaitGroup)
}

// MainData can be your main struct, such as StrikePerYear.
type MainData struct {
    // add any fields you need
    Id   int
    Name string
}

type DataTyp1 struct {
    MainData *MainData
}

func (d DataTyp1) IsThisForMe() bool {
    // check your own condition on the incoming data here
    return d.MainData.Id == 2
}

func (d DataTyp1) Process(wg *sync.WaitGroup) {
    defer wg.Done()
    d.MainData.Name = "processed by DataTyp1"
    // send the result to the final channel; adapt this as needed
    finalResult <- d.MainData.Name
}

type DataTyp2 struct {
    MainData *MainData
}

func (d DataTyp2) IsThisForMe() bool {
    // check your own condition on the incoming data here
    return d.MainData.Id == 3
}

func (d DataTyp2) Process(wg *sync.WaitGroup) {
    defer wg.Done()
    d.MainData.Name = "processed by DataTyp2"
    // send the result to the final channel; adapt this as needed
    finalResult <- d.MainData.Name
}

// dispatcher starts a new goroutine for each request.
// You can implement a worker pool to avoid running too many goroutines.
func dispatcher(incomingData *MainData, wg *sync.WaitGroup) {
    // depending on your requirements, you may not need this goroutine
    go func() {
        var p IData
        p = DataTyp1{incomingData}
        if p.IsThisForMe() {
            go p.Process(wg)
            return
        }
        p = DataTyp2{incomingData}
        if p.IsThisForMe() {
            go p.Process(wg)
            return
        }
        // no handler matched: release the WaitGroup slot so wg.Wait() cannot block forever
        wg.Done()
    }()
}

func main() {
    dummyDataArray := []MainData{
        {Id: 2, Name: "this data #2"},
        {Id: 3, Name: "this data #3"},
    }
    wg := sync.WaitGroup{}
    for i := range dummyDataArray {
        wg.Add(1)
        dispatcher(&dummyDataArray[i], &wg)
    }
    result := make([]string, 0)
    done := make(chan struct{})
    // data collector
    go func() {
    loop:
        for {
            select {
            case <-done:
                break loop
            case r := <-finalResult:
                result = append(result, r)
            }
        }
    }()
    wg.Wait()
    // every result has been received by now, so tell the collector to stop
    done <- struct{}{}
    for _, s := range result {
        fmt.Println(s)
    }
}

Note: this is only meant to open your mind to better solutions, and it is certainly not production-ready code.
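A different technique also avoids requeueing: demultiplexing by dataset. Each dataset gets its own channel, and a single router goroutine reads the shared channel and forwards every value to the channel for its dataset. Each worker then only ever sees its own data.

This is a sketch under stated assumptions, not the answer's dispatcher:

- The `Dataset` and `Value` fields mirror the field comment in the question.
- The type `response` and the helpers `route` and `sumPerDataset` are hypothetical.
- Summing the values is just a placeholder for "do stuff with the data".

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// response is a hypothetical stand-in for dweather.DWeatherResponse.
type response struct {
	Dataset string
	Value   float64
}

// route forwards every value from in to the channel of its dataset and drops
// values for unknown datasets. It is the only sender on the per-dataset
// channels, so it closes them once in is closed.
func route(in <-chan response, outs map[string]chan response) {
	for r := range in {
		if out, ok := outs[r.Dataset]; ok {
			out <- r
		}
	}
	for _, out := range outs {
		close(out)
	}
}

// sumPerDataset starts one worker per dataset; no requeueing is needed
// because each worker only receives values for its own dataset.
func sumPerDataset(datasets []string, inputs []response) map[string]float64 {
	in := make(chan response)
	outs := make(map[string]chan response, len(datasets))
	for _, d := range datasets {
		outs[d] = make(chan response)
	}

	var mu sync.Mutex
	sums := make(map[string]float64)
	var wg sync.WaitGroup
	for d, ch := range outs {
		wg.Add(1)
		go func(d string, ch <-chan response) {
			defer wg.Done()
			total := 0.0
			for r := range ch {
				total += r.Value
			}
			mu.Lock()
			sums[d] = total
			mu.Unlock()
		}(d, ch)
	}

	go route(in, outs)
	for _, r := range inputs {
		in <- r
	}
	close(in) // lets route finish and close the per-dataset channels
	wg.Wait()
	return sums
}

func main() {
	inputs := []response{
		{"temp", 1.5}, {"rain", 2}, {"temp", 2.5}, {"wind", 7}, {"rain", 3},
	}
	sums := sumPerDataset([]string{"temp", "rain"}, inputs)
	keys := make([]string, 0, len(sums))
	for k := range sums {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, sums[k]) // "rain 5", then "temp 4"; "wind" is dropped
	}
}
```

The router is the only goroutine that sends on the per-dataset channels. That makes it the natural place to close them, which plays the role of the `wg.Wait()` / `close(ch)` step in the question.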

