go - 如何处理共享同一通道的多个 goroutine
问题描述
我一直在寻找很多,但还没有找到我的问题的答案。
我需要多次调用外部 API,但同时使用不同的参数。然后对于每个调用,我需要为每个数据集初始化一个结构并处理我从 API 调用接收到的数据。请记住,我阅读了传入请求的每一行并立即开始将其发送到通道。
由于我接收的数据量很大,我遇到的第一个问题一开始并不明显,是每个 goroutine 没有接收到通过通道的所有数据。(我通过我所做的研究了解到)。所以我需要一种将数据重新排队/重定向到正确的 goroutine 的方法。
从单个数据集发送流式响应的函数。(我已经删除了脱离上下文的无用代码部分)
func (api *API) RequestData(ctx context.Context, c chan DWeatherResponse, dataset string, wg *sync.WaitGroup) error {
for {
line, err := reader.ReadBytes('\n')
s := string(line)
if err != nil {
log.Println("End of %s", dataset)
return err
}
data, err := extractDataFromStreamLine(s, dataset)
if err != nil {
continue
}
c <- *data
}
}
处理传入数据的函数
func (s *StrikeStruct) Process(ch, requeue chan dweather.DWeatherResponse) {
for {
data, more := <-ch
if !more {
break
}
// data contains {dataset string, value float64, date time.Time}
// The s.Parameter needs to match the dataset
// IMPORTANT PART, checks if the received data is part of this struct dataset
// If not I want to send it to another go routine until it gets to the correct
one. There will be a max of 4 datasets but still this could not be the best approach to have
if !api.GetDataset(s.Parameter, data.Dataset) {
requeue <- data
continue
}
// Do stuff with the data from this point
}
}
现在在我自己的 API 端点上,我有以下内容:
ch := make(chan dweather.DWeatherResponse, 2)
requeue := make(chan dweather.DWeatherResponse)
final := make(chan strike.StrikePerYearResponse)
var wg sync.WaitGroup
for _, s := range args.Parameters.Strikes {
strike := strike.StrikePerYear{
Parameter: strike.Parameter(s.Dataset),
StrikeValue: s.Value,
}
// I receive and process the data in here
go strike.ProcessStrikePerYear(ch, requeue, final, string(s.Dataset))
}
go func() {
for {
data, _ := <-requeue
ch <- data
}
}()
// Creates a goroutine for each dataset
for _, dataset := range api.Params.Dataset {
wg.Add(1)
go api.RequestData(ctx, ch, dataset, &wg)
}
wg.Wait()
close(ch)
//Once the data is all processed it is all appended
var strikes []strike.StrikePerYearResponse
for range args.Fetch.Datasets {
strikes = append(strikes, <-final)
}
return strikes
这段代码的问题是,一旦我开始从多个端点接收数据,requeue
就会阻塞,不会再发生任何事情。如果我删除该requeue
逻辑数据,如果它没有落在正确的 goroutine 上,它将丢失。
我的两个问题是:
- 如果它有一个总是准备好接收的 goroutine,为什么 requeue 会阻塞?
- 我应该对如何处理传入数据采取不同的方法吗?
解决方案
这不是解决问题的好方法。你应该改变你的解决方案。我建议如下实现:
import (
"fmt"
"sync"
)
// answer for https://stackoverflow.com/questions/68454226/how-to-handle-multiple-goroutines-that-share-the-same-channel
var (
finalResult = make(chan string)
)
// IData use for message dispatcher that all struct must implement its method
type IData interface {
IsThisForMe() bool
Process(*sync.WaitGroup)
}
//MainData can be your main struct like StrikePerYear
type MainData struct {
// add any props
Id int
Name string
}
type DataTyp1 struct {
MainData *MainData
}
func (d DataTyp1) IsThisForMe() bool {
// you can check your condition here to checking incoming data
if d.MainData.Id == 2 {
return true
}
return false
}
func (d DataTyp1) Process(wg *sync.WaitGroup) {
d.MainData.Name = "processed by DataTyp1"
// send result to final channel, you can change it as you want
finalResult <- d.MainData.Name
wg.Done()
}
type DataTyp2 struct {
MainData *MainData
}
func (d DataTyp2) IsThisForMe() bool {
// you can check your condition here to checking incoming data
if d.MainData.Id == 3 {
return true
}
return false
}
func (d DataTyp2) Process(wg *sync.WaitGroup) {
d.MainData.Name = "processed by DataTyp2"
// send result to final channel, you can change it as you want
finalResult <- d.MainData.Name
wg.Done()
}
//dispatcher will run new go routine for each request.
//you can implement a worker pool to preventing running too many go routines.
func dispatcher(incomingData *MainData, wg *sync.WaitGroup) {
// based on your requirements you can remove this go routing or not
go func() {
var p IData
p = DataTyp1{incomingData}
if p.IsThisForMe() {
go p.Process(wg)
return
}
p = DataTyp2{incomingData}
if p.IsThisForMe() {
go p.Process(wg)
return
}
}()
}
func main() {
dummyDataArray := []MainData{
MainData{Id: 2, Name: "this data #2"},
MainData{Id: 3, Name: "this data #3"},
}
wg := sync.WaitGroup{}
for i := range dummyDataArray {
wg.Add(1)
dispatcher(&dummyDataArray[i], &wg)
}
result := make([]string, 0)
done := make(chan struct{})
// data collector
go func() {
loop:for {
select {
case <-done:
break loop
case r := <-finalResult:
result = append(result, r)
}
}
}()
wg.Wait()
done<- struct{}{}
for _, s := range result {
fmt.Println(s)
}
}
注意:这只是为了让您敞开心扉寻找更好的解决方案,并且确保这不是生产就绪的代码。
推荐阅读
- c# - 选择不同的行,但始终选择最后一行
- python - 从嵌套的字母列表中生成所有单词组合
- android - 如何更新从另一个类调用的方法中的视图?
- python - Flask SQLAlchemy 仅更新 2 个模型之间的链接
- python - 将 SAS 数据文件读取到 Spark Dataframe 并加载到 S3 时出现问题
- javascript - 烧瓶登录重定向生成 302 但浏览器不会像其他功能一样更新
- linux - 我可以创建 udev 规则来跟踪 edac 错误吗?
- sql - PL/SQL Case When - 将输入映射到其他列值
- python-3.x - 使用枕头生成可搜索的 PDF
- linux - Jenkins 不能使用 aws-cli 密钥。无法找到 AWS CLI 的凭证