go - Golang:在“case”语句中为多个工作人员消费项目
问题描述
我的消费者(从 运行)支持上下文取消和通过语句main
从通道读取。case
我可以使用上下文关闭消费者,效果很好。但是,当我在一个案例语句中生成多个工作人员时,每个工作人员都会从 获得相同的工作(消息)jobsChan
,这不是我想要的:
func (app *App) consumer() {
for {
select {
case <-app.ctx.Done():
app.infoLog.Print("Caught SIGINT, stopping.")
app.wg.Wait()
app.doneChan <- struct{}{} # main uses this channel to block itself until all goroutines are stopped
app.infoLog.Print("Shutting down the consumer...")
return
case job := <-app.jobsChan:
// PROBLEM here: wrong, each worker is given the same job
for workerNumber := 0; workerNumber < app.config.workers; workerNumber++ {
app.wg.Add(1)
go app.workerFunc(workerNumber, job)
}
}
}
}
func (app *App) workerFunc(id int, job Job) {
defer app.wg.Done()
... actual worker code here ...
}
如何重写此代码以便我可以保留select
频道app.ctx.Done
并同时生成工人,以便每个工人从频道中选择下一条消息作为作业?我需要继续for/select
监听ctx
取消,但同时我需要生成 X 工作人员来读取来自jobsChan
消费者的消息。这可能吗?
想到的唯一选择是将 channel 直接传递给 spawnedworkerFunc
并for job := range app.jobsChan
在workerFunc
. 但随后case job := <-app.jobsChan:
消费者的整体变得毫无意义,我不知道如何重写它。
澄清一下:当我运行应用程序时,我希望每个工作人员都有一个新的工作 id 从jobsChan
- 但他们都处理相同的,例如 1,然后他们都处理下一个,例如 2
#wrong
Worker 0: start processing item 1
Worker 2: start processing item 1
Worker 1: start processing item 1
解决方案
您现有的代码明确地将相同的工作分配给所有工作人员。如果您有固定数量的工作人员,请为他们创建 goroutine(在初始化期间),并让他们收听频道:
for workerNumber:0;workerNumber<app.config.workers;workerNumber++ {
go app.workerFunc(ctx,workerNumber,app.jobsChan)
}
在每个工作人员中,只需检查 jobQueue 和上下文取消。
换句话说,您不需要consumer
, 将工作直接传递给工人。
推荐阅读
- python-3.x - 如何自适应地将图像分割成多个区域并为每个区域设置不同的文本方向?
- python - keras rl - dqn 模型更新
- uml - UML中的三个表的多对多关系
- node.js - MongoDB - 哪个更有效 - 文档上的嵌套数组或对另一个集合的引用
- bash - Plex 如何使用 CLI 获取声明令牌
- python - 有什么可以影响 time.sleep() 我需要更早发生的事情吗?
- html - 有什么方法可以防止在不使用 javascript/jquery 的情况下双击 HTML 中的按钮?
- java - 如何在 Java 中使用扫描仪获取多个输入?
- python - 使用 PyInstaller 后 FigureCanvas 未解释为 QtWidget
- scala - Gatling:对隐藏在 HTML 响应中的一些 JSON 执行检查