首页 > 解决方案 > 在带有互斥锁的 goroutine 之间修改的切片未显示正确的同步

问题描述

我是新手,但以前曾使用过并发。我在多个 goroutine 之间共享一个切片时遇到问题,这些切片在所有 goroutine 之间不包含相同的数据。当我修改切片时,我也使用互斥锁来锁定结构,但它似乎没有帮助。我附上了我的代码,想知道我做错了什么,感谢您的帮助!

type State struct {
    waiting int32
    processing int32
    completed int32
}

type Scheduler struct {
    sync.Mutex
    items chan interface{}
    backPressure []interface{}
    capacity int
    canceler context.CancelFunc
    state State
}

func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) Scheduler {
    ctx, cancel := context.WithCancel(context.Background())

    state := State{}

    atomic.StoreInt32(&state.waiting, 0)
    atomic.StoreInt32(&state.processing, 0)
    atomic.StoreInt32(&state.completed, 0)

    scheduler := Scheduler{
        items: make(chan interface{}, capacity),
        backPressure: make([]interface{}, 0),
        capacity: capacity,
        canceler: cancel,
        state: state,
    }

    scheduler.initializeWorkers(ctx, handler)

    return scheduler
}

func (s *Scheduler) initializeWorkers(ctx context.Context, handler func(interface {}) (interface{}, error)) {
    for i := 0; i < 5; i++ {
        go s.newWorker(ctx, handler)
    }
}

func (s *Scheduler) newWorker(ctx context.Context, handler func(interface {}) (interface{}, error)) {
    backoff := 0

    for {
        select {
        case <-ctx.Done():
            return
        case job := <- s.items:
            atomic.AddInt32(&s.state.waiting, -1)
            atomic.AddInt32(&s.state.processing, 1)
            job, _ = handler(job)
            backoff = 0
            atomic.AddInt32(&s.state.processing, -1)
            atomic.AddInt32(&s.state.completed, 1)
        default:
            backoff += 1
            s.CheckBackPressure()
            time.Sleep(time.Duration(backoff * 10) * time.Millisecond)
        }
    }
}

func (s *Scheduler) AddItem(item interface{}) {
    atomic.AddInt32(&s.state.waiting, 1)

    if len(s.items) < s.capacity {
        select {
        case s.items <- item:
            return
        }
    }

    s.Lock()
    defer s.Unlock()

    s.backPressure = append(s.backPressure, item)

    fmt.Printf("new backpressure len %v \n", len(s.backPressure))

    return
}

func (s *Scheduler) Process() {
    var wg sync.WaitGroup

    wg.Add(1)


    go func() {
        defer wg.Done()

        for {
            if atomic.LoadInt32(&s.state.waiting) == 0 && atomic.LoadInt32(&s.state.processing) == 0 {
                return
            }
            runtime.Gosched()
        }
    }()

    wg.Wait()
}

func (s *Scheduler) CheckBackPressure() {
    s.Lock()
    defer s.Unlock()

    if len(s.backPressure) == 0 || s.capacity <= len(s.items) {
        fmt.Printf("backpressure = %d  :: len = %d cap = %d \n", len(s.backPressure), len(s.items), s.capacity)
        return
    }

    fmt.Printf("releasing backpressure \n")

    job, tmp := s.backPressure[0], s.backPressure[1:]

    s.backPressure = tmp

    s.items <- job
    return
}

func (s *Scheduler) Stop() {
    s.canceler()
}

这是我用来测试功能的代码:

type Job struct {
    Value int
}

func TestSchedulerExceedingCapacity(t *testing.T) {


    handler := func (ptr interface{}) (interface{}, error) {
        job, ok := (ptr).(*Job)

        if ok != true {
            return nil, errors.New("failed to convert job")
        }

        // simulate work
        time.Sleep(50 * time.Millisecond)

        return job, nil
    }

    scheduler := NewScheduler(5, handler)

    for i := 0; i < 25; i++ {
        scheduler.AddItem(&(Job { Value: i }))
    }

    fmt.Printf("PROCESSING\n")
    scheduler.Process()
    fmt.Printf("FINISHED\n")
}

当我更新保持背压的切片时,它似乎表明它已通过打印new backpressure len 11-16 正确更新。

但是,当我检查工人的背压时,它表明背压片是空的。backpressure = 0 :: len = 0 cap = 5.

此外,“释放背压”也永远不会打印到标准输出。

这是一些额外的输出......

=== RUN   TestSchedulerExceedingCapacity
new backpressure len 1 
new backpressure len 2 
new backpressure len 3 
new backpressure len 4 
new backpressure len 5 
new backpressure len 6 
new backpressure len 7 
new backpressure len 8 
backpressure = 0  :: len = 0 cap = 5 
new backpressure len 9 
new backpressure len 10 
new backpressure len 11 
new backpressure len 12 
new backpressure len 13 
new backpressure len 14 
new backpressure len 15 
new backpressure len 16 
PROCESSING
backpressure = 0  :: len = 0 cap = 5 
backpressure = 0  :: len = 0 cap = 5 
backpressure = 0  :: len = 0 cap = 5 
...

如果我不终止测试,它将无限期打印backpressure = 0 :: len = 0 cap = 5

我假设我没有正确同步更改,我真的很感激任何见解,谢谢!

标签: goconcurrency

解决方案


好的,一旦我发布了这个问题,我当然能够解决这个问题......

我在某处看到建议使用-race启用数据竞争检测器的选项运行测试。我立即得到了错误,这有助于使问题更容易调试。

事实证明,问题与返回的值有关,NewScheduler而不是与 new 调度程序的指针有关。我将该函数更改为以下代码,从而解决了该问题。

func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) *Scheduler {
    ctx, cancel := context.WithCancel(context.Background())

    state := State{}

    atomic.StoreInt32(&state.waiting, 0)
    atomic.StoreInt32(&state.processing, 0)
    atomic.StoreInt32(&state.completed, 0)
    atomic.StoreInt32(&state.errors, 0)

    scheduler := Scheduler{
        items: make(chan interface{}, capacity),
        backPressure: make([]interface{}, 0),
        capacity: capacity,
        canceler: cancel,
        state: state,
    }

    scheduler.initializeWorkers(ctx, handler)

    return &scheduler
}

推荐阅读