go - 在带有互斥锁的 goroutine 之间修改的切片未显示正确的同步
问题描述
我是新手,但以前曾使用过并发。我在多个 goroutine 之间共享一个切片时遇到问题,这些切片在所有 goroutine 之间不包含相同的数据。当我修改切片时,我也使用互斥锁来锁定结构,但它似乎没有帮助。我附上了我的代码,想知道我做错了什么,感谢您的帮助!
type State struct {
waiting int32
processing int32
completed int32
}
type Scheduler struct {
sync.Mutex
items chan interface{}
backPressure []interface{}
capacity int
canceler context.CancelFunc
state State
}
func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) Scheduler {
ctx, cancel := context.WithCancel(context.Background())
state := State{}
atomic.StoreInt32(&state.waiting, 0)
atomic.StoreInt32(&state.processing, 0)
atomic.StoreInt32(&state.completed, 0)
scheduler := Scheduler{
items: make(chan interface{}, capacity),
backPressure: make([]interface{}, 0),
capacity: capacity,
canceler: cancel,
state: state,
}
scheduler.initializeWorkers(ctx, handler)
return scheduler
}
func (s *Scheduler) initializeWorkers(ctx context.Context, handler func(interface {}) (interface{}, error)) {
for i := 0; i < 5; i++ {
go s.newWorker(ctx, handler)
}
}
func (s *Scheduler) newWorker(ctx context.Context, handler func(interface {}) (interface{}, error)) {
backoff := 0
for {
select {
case <-ctx.Done():
return
case job := <- s.items:
atomic.AddInt32(&s.state.waiting, -1)
atomic.AddInt32(&s.state.processing, 1)
job, _ = handler(job)
backoff = 0
atomic.AddInt32(&s.state.processing, -1)
atomic.AddInt32(&s.state.completed, 1)
default:
backoff += 1
s.CheckBackPressure()
time.Sleep(time.Duration(backoff * 10) * time.Millisecond)
}
}
}
func (s *Scheduler) AddItem(item interface{}) {
atomic.AddInt32(&s.state.waiting, 1)
if len(s.items) < s.capacity {
select {
case s.items <- item:
return
}
}
s.Lock()
defer s.Unlock()
s.backPressure = append(s.backPressure, item)
fmt.Printf("new backpressure len %v \n", len(s.backPressure))
return
}
func (s *Scheduler) Process() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
if atomic.LoadInt32(&s.state.waiting) == 0 && atomic.LoadInt32(&s.state.processing) == 0 {
return
}
runtime.Gosched()
}
}()
wg.Wait()
}
func (s *Scheduler) CheckBackPressure() {
s.Lock()
defer s.Unlock()
if len(s.backPressure) == 0 || s.capacity <= len(s.items) {
fmt.Printf("backpressure = %d :: len = %d cap = %d \n", len(s.backPressure), len(s.items), s.capacity)
return
}
fmt.Printf("releasing backpressure \n")
job, tmp := s.backPressure[0], s.backPressure[1:]
s.backPressure = tmp
s.items <- job
return
}
func (s *Scheduler) Stop() {
s.canceler()
}
这是我用来测试功能的代码:
type Job struct {
Value int
}
func TestSchedulerExceedingCapacity(t *testing.T) {
handler := func (ptr interface{}) (interface{}, error) {
job, ok := (ptr).(*Job)
if ok != true {
return nil, errors.New("failed to convert job")
}
// simulate work
time.Sleep(50 * time.Millisecond)
return job, nil
}
scheduler := NewScheduler(5, handler)
for i := 0; i < 25; i++ {
scheduler.AddItem(&(Job { Value: i }))
}
fmt.Printf("PROCESSING\n")
scheduler.Process()
fmt.Printf("FINISHED\n")
}
当我更新保持背压的切片时,它似乎表明它已通过打印new backpressure len 1
1-16 正确更新。
但是,当我检查工人的背压时,它表明背压片是空的。backpressure = 0 :: len = 0 cap = 5
.
此外,“释放背压”也永远不会打印到标准输出。
这是一些额外的输出......
=== RUN TestSchedulerExceedingCapacity
new backpressure len 1
new backpressure len 2
new backpressure len 3
new backpressure len 4
new backpressure len 5
new backpressure len 6
new backpressure len 7
new backpressure len 8
backpressure = 0 :: len = 0 cap = 5
new backpressure len 9
new backpressure len 10
new backpressure len 11
new backpressure len 12
new backpressure len 13
new backpressure len 14
new backpressure len 15
new backpressure len 16
PROCESSING
backpressure = 0 :: len = 0 cap = 5
backpressure = 0 :: len = 0 cap = 5
backpressure = 0 :: len = 0 cap = 5
...
如果我不终止测试,它将无限期打印backpressure = 0 :: len = 0 cap = 5
我假设我没有正确同步更改,我真的很感激任何见解,谢谢!
解决方案
好的,一旦我发布了这个问题,我当然能够解决这个问题......
我在某处看到建议使用-race
启用数据竞争检测器的选项运行测试。我立即得到了错误,这有助于使问题更容易调试。
事实证明,问题与返回的值有关,NewScheduler
而不是与 new 调度程序的指针有关。我将该函数更改为以下代码,从而解决了该问题。
func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
state := State{}
atomic.StoreInt32(&state.waiting, 0)
atomic.StoreInt32(&state.processing, 0)
atomic.StoreInt32(&state.completed, 0)
atomic.StoreInt32(&state.errors, 0)
scheduler := Scheduler{
items: make(chan interface{}, capacity),
backPressure: make([]interface{}, 0),
capacity: capacity,
canceler: cancel,
state: state,
}
scheduler.initializeWorkers(ctx, handler)
return &scheduler
}
推荐阅读
- java - 如何使用 JFrame 在新窗口中打印 Java 控制台?
- lambda - 为什么 java 8 引入了 iterable.forEach() 循环,即使它有每个循环?
- python - 如何将列中的值更改为二进制?
- python - 直接在 cmd 中使用 celery 可以得到正确的结果,但是 nssm 包装的 celery 服务不会这样做,为什么?
- go - Go 仍然显示旧版本
- python - 我想向 ASP 站点发送 Python 请求,但站点显示访问被拒绝
- c# - 如何使用 Linq 语句设置属性
- git - 在我的机器上我没有 Visual Studio,是否可以将 GIT 用于 PL-SQL 文件?
- google-app-maker - 如何允许当前用户拥有与其直接下属和经理关系相关的权限/视图?
- r - R如何检查输入是否是特定功能