首页 > 解决方案 > 这是在工人循环中使用多 wg 的正确方法吗?

问题描述

在这种情况下,我想用 goroutine worker 保存数据,我想等待保存数据完成,然后我执行新函数来操作数据,这是这种情况

var s struct {
  jobs chan Data
}

func allocateJob([] Data) {
    for _, d := range data {
        s.jobs <- d
    }
    close(s.jobs)
}

func Foo() (err error) {
 resultData = GetData()

 s.jobs = make(chan Data, NumOfWorkers)
 go allocateJob(resultData)

 var wg sync.WaitGroup
 for i := 1; i <= NumOfWorkers; i++ {
    wg.Add(1)
    go func() {
        for job := range jobs {
            err = s.saveData(ctx, job) // i want to wait thise till finish  save all data
            wg.Done()
            err = s.ManipulateDataSomething(ctx, job)
            wg.Done()
        }
        wg.Done()
    }()
 }
 wg.Wait()
 return err
}

有可能这样做并且正确吗?我对并发和 goroutine 很陌生,我希望我问的问题是有道理的

标签: gochannelgoroutine

解决方案


这是一个非常简单的例子:

package main

import (
    "fmt"
    "sync"
)

type job struct {
    do func()
}

func (j job) Do() {
    if j.do != nil {
        j.do()
    }
}

type workerPool struct {
    workers []worker
    stop chan struct{}
    jobs chan job
}

func newWorkerPool(numWorkers int) *workerPool {
    if numWorkers < 1 {
        numWorkers = 1
    }
    
    // stop denotes a channel to reclaim goroutine spawned by each workers.
    stop := make(chan struct{}, 1)
    
    // jobs denotes a job queue which able to queue at most 100 jobs.
    jobs := make(chan job, 100)
    
    // workers denotes a worker thread for concurrent processing jobs.
    workers := make([]worker, numWorkers)
    for i := range workers {
        workers[i] = worker {
            stop: stop,
            jobs: jobs,
        }
    }
    
    return &workerPool {
        workers: workers,
        stop: stop,
        jobs: jobs,
    }
}

// Start spawns multiple worker routines.
func (wp *workerPool) Start() {
    for i := range wp.workers {
        wp.workers[i].Start()
    }
}

// Stop reclaim goroutine spawned each worker.
func (wp *workerPool) Stop() {
    close(wp.stop)
}

// Do create a job and queue it to a job queue.
func (wp *workerPool) Do(fn func()) {
    wp.jobs <- job{do:fn}
}

type worker struct {
    stop  chan struct{}
    jobs  chan job
}

func (w *worker) Start() {
    go w.start()
}

func (w *worker) start() {
    for {
        select {
        case <-w.stop:
            return
        case job := <-w.jobs:
            job.Do()
        }
    }
}

func main() {

    // Create a worker pool with 4 workers inside.
    wp := newWorkerPool(4)
    
    // Start the workerpool to tell workers prepare to work.
    wp.Start()
    defer wp.Stop()
    
    // Using this wait group to wait until all of say hello jobs are processed.
    var helloWg sync.WaitGroup
    
    // Using this wait group to wait until all of say hi jobs are processed.
    var hiWg sync.WaitGroup
    
    // Define function of saying hello.
    sayHello := func() { 
        defer helloWg.Done()
        fmt.Println("Hello")
    }
    
    // Define function of saying hi.
    sayHi := func() {
        defer hiWg.Done()
        fmt.Println("Hi")
    }
    
    // Let's say hello 5 times.
    for i := 0 ; i < 5 ; i++ {
        helloWg.Add(1)
        wp.Do(sayHello)
    }
    
    // Let's say hi 3 times.
    go func() {
        for i := 0 ; i < 3 ; i++ {
            hiWg.Add(1)
            wp.Do(sayHi)
        }
    }()
    
    // Wait for all say hello jobs.
    helloWg.Wait()
    
    // Wait for all say hi jobs.
    hiWg.Wait()
}

游乐场


推荐阅读