go - 节目与频道挂起
问题描述
我想使用 goroutine 来批处理来自不同客户的不同日期的请求。
我的意思是 50 个消费者 goroutine 来消费来自 db 的所有客户,以及 2 个日期消费者 goroutine 来消费日期切片。
主要代码如下,但它挂起并且没有按预期退出。
为什么它没有按预期退出?
func Run(){
var syncWg sync.WaitGroup
syncWg.Add(1)
go SyncCustomerMetricsHistory(&syncWg)
syncWg.Wait()
}
func SyncCustomerMetricsHistory(wg *sync.WaitGroup){
defer wg.Done()
odb := orm.NewOrm()
start := time.Now()
logs.Info("start sync customer metrics, time:[%v]", start)
qs := odb.QueryTable("gg_customer")
var customers []*db.GgCustomer
if num, err := qs.All(&customers); err != nil || num == 0 {
logs.Error("Get customer error, rows:[%v], err:[%v]", num, err)
}
customersChan := make(chan *db.GgCustomer, 50)
var wgC sync.WaitGroup
wgC.Add(50)
for i := 0; i < 50; i++ {
go syncCustomerMetricsHistory(customersChan, &wgC)
}
go func() {
for _, customer := range customers {
customersChan <- customer
}
close(customersChan)
}()
wgC.Wait()
}
func syncCustomerMetricsHistory(customerChan <- chan *db.GgCustomer, wg *sync.WaitGroup){
defer wg.Done()
for customer := range customerChan{
dateChan := make(chan string, 2)
var wgD sync.WaitGroup
wgD.Add(2)
for i := 1; i < 2; i++{
go test(dateChan, customer, &wgD)
}
go func(){
for _, date := range GetAllYearDate(){
dateChan <- date
}
close(dateChan)
}()
wgD.Wait()
}
}
}
func test(dateChan <- chan string, customer *db.GgCustomer, wg *sync.WaitGroup){
defer wg.Done()
for date := range dateChan{
fmt.Println(date, customer)
}
}
func GetAllYearDate() []string{
return []string{"2019-10-01", "2019-10-02"}
}
解决方案
我没有尝试运行它(因为它需要额外的代码)但相信你的问题是:
wgD.Add(2)
for i := 1; i < 2; i++{
go test(dateChan, customer, &wgD)
}
那个 for 循环只会迭代一次,但你调用了 wgD.Add(2) (我想你可能意味着循环迭代两次;试试i <= 2
)。
另一点反馈;您使用等待组的方式会起作用,但很难遵循(可能导致您没有发现问题);怎么样:
func Run(){
SyncCustomerMetricsHistory() // No wait group needed as this will not return before done
}
func SyncCustomerMetricsHistory(){
odb := orm.NewOrm()
start := time.Now()
logs.Info("start sync customer metrics, time:[%v]", start)
qs := odb.QueryTable("gg_customer")
var customers []*db.GgCustomer
if num, err := qs.All(&customers); err != nil || num == 0 {
logs.Error("Get customer error, rows:[%v], err:[%v]", num, err)
}
customersChan := make(chan *db.GgCustomer, 50)
var wgC sync.WaitGroup
wgC.Add(50)
for i := 0; i < 50; i++ {
go func() {
syncCustomerMetricsHistory(customersChan)
wgC.Done()
}()
}
go func() {
for _, customer := range customers {
customersChan <- customer
}
close(customersChan)
}()
wgC.Wait()
}
func syncCustomerMetricsHistory(customerChan <- chan *db.GgCustomer){
for customer := range customerChan{
dateChan := make(chan string, 2)
var wgD sync.WaitGroup
wgD.Add(2)
for i := 1; i < 2; i++{
go func() {
test(dateChan, customer)
wgD.Done()
}()
}
go func(){
for _, date := range GetAllYearDate(){
dateChan <- date
}
close(dateChan)
}()
wgD.Wait()
}
}
}
我认为这更容易理解,因为您可以看到 wg.Done() 被调用的位置。在任一侧粘贴一些 fmt.Println 命令也很容易,这使得调试此类问题变得更加简单。
推荐阅读
- postgresql - 如何通过检查列的时间来创建 CASE WHEN
- android - Ionic 3 cordova 遇到设备/模拟器错误 [Android]
- entity-framework - 未使用的表列需要在实体框架对象类中声明为属性?
- node.js - 将 reactjs 与 nodejs 连接起来
- laravel - 从数据库中检索分层数据
- javascript - 避免 TypeScript 中接口的初始 I 不会导致命名冲突吗?
- microsoft-graph-api - 如何识别 OneDrive DriveItem 权限标识是组还是用户
- c# - 如何使用 Web Api 在 Dynamics 365 CRM(在线)中发现我的组织的 URL
- django - 在 drf viewset 检索 url 中将 pk 验证为 int
- openlayers - 在 openlayers 中更改几何图形后如何选择特征?