首页 > 解决方案 > 尝试使用通道暂停和恢复 goroutine,但最终它们挂断了

问题描述

我需要创建多个对 GitHub 进行 API 调用的 goroutine,以批量获取对一个组织的所有 repos 的 PR 的评论。例如

{RepoA: #1, #2, ..........#20}
{RepoA: #21, #22, ..........#40}
.
.
{RepoZ: #1, #2, ..........#20}

这是我的调用过程

// get the heartbeats channel
heartbeats := signal.Heartbeats()
// get the run chasennel
run := signal.Run()
rem, _ := c.ghCollector.GetRemainingRequests(ctx)
// fill Signal.run chan with as many empty structs as there are github api requests left
signal.SetRun(rem)

// run all jobs
for _, batch := range batches {
        signal.RegisterTask() // this just increments a wait group counter
        c.ghCollector.GetReviewsWorker(ctx, batch, signal, heartbeats, run)
}

go func() {
            logging.Debug("infinite goroutine")
            for {
                select {
                case <-quit:
                    logging.Debug("all goroutines have exited, quit")
                    return

                case <-heartbeats:
                    logging.Debug("goroutines are running, skip the API Rate limit poll in default case")
                    continue

                default:
                 // get the number of requests remaining
                    if rem, _ := c.ghCollector.GetRemainingRequests(ctx); rem > 0 {
                        // fill the run channel with the remaining number of requests with empty structs
                        signal.SetRun(rem)
                    } else {
                        logging.Debug("none left")
                        // if requests are not available then loop again
                        heartbeats <- struct{}{}
                    }
                }
            }
        }()
       // wait for all the goroutines to be done and decrement the waitgroup counter
       signal.Wait()
       quit <- struct{}{}

这是工人功能

GetReviewsWorker(ctx context.Context, prs *types.PRBatch, signal *orchestrator.Signal, heartbeats chan struct{}, run <-chan struct{}) {
    go func() {
        // decrement wg
        defer signal.Done()
        for _, pr := prs.Batch {
           // run only if there are requests left 
           <-run
           // if running then send a heartbeat to indicate the worker is not idle/waiting
           heartbeats <- struct{}{}
           // do some work, make some API calls
        }
    }()
}

这是信号结构

type Signal struct {
    run        chan struct{}
    heartbeats chan struct{}
    wg         sync.WaitGroup
}

func NewSignal(length int) *Signal {
    return &Signal{
        heartbeats: make(chan struct{}),
        // because send and receives from a nil channel block forever
        run: make(chan struct{}, length),
    }
}

func (s *Signal) SetRun(runs int) {
    // only send runs as many as the running tasks
    for i := 0; i < runs; i++ {
        logging.Debug("RUN RUN RUN ", i)
        s.run <- struct{}{}
    }
}

// other methods
//

我为一个组织运行此程序,该组织启动了大约 1000 个 goroutine,并且在一段时间内运行良好,但最终它挂断了,并且没有任何内容打印到日志中。

我究竟做错了什么?我的计划是每次我们有可用的请求时通过用剩余请求数填充 run chan 来进行 API 调用。如果通道中有东西,则执行将继续,否则它将阻塞,直到通道再次被填满

如果它继续,则会向心跳通道发送一个信号(该通道是为了避免向 Github 发出速率限制查询请求)。如果有心跳,我们知道 goroutine 正在做一些工作并且没有被阻塞,所以我们只是像往常一样循环选择而不检查剩余的请求

一旦所有的 goroutine 都完成了,我们发送退出信号来停止轮询 Github API 以获取速率限制的过程,我们就完成了。我认为这可以正常工作,但它只在我的脑海中正常工作。

编辑:添加有关该问题的更多详细信息

所以我想做的是使用 graphql API 获取组织的每个 PR 的评论数量。我有小批量的每个 repo 的 PR 数量,比如顶部描述的 20 个。

对于每个批次,我都想运行我的评论工作者,它将通过循环调用来获取该批次 PR 的评论数量。这个审查工作人员在自己的 goroutine 中为每个批次运行

由于 Github 的速率限制为 5000 req/hr,我希望能够启动我所有的 goroutine,但是一旦达到速率限制,我希望它们都暂停执行并等待我们有更多可用的请求我正在使用具有空结构的通道来指示我们是否有可用的请求。这是运行通道,当它被填充时,我们继续我们的业务,当它为空时它会阻塞正在运行的 goroutine。

在开始审查工作人员之前,代码会调用 github 以查看有多少请求可用,根据该数字,我们用尽可能多的空结构填充 run chan,以便我们可以在 goroutine 中进行尽可能多的 api 调用,然后阻塞。

标签: go

解决方案


推荐阅读