首页 > 解决方案 > 从不同线程中的繁重操作同步写入文件

问题描述

我需要一次一个块地详细说明一个文件(可能是一个大文件)并将结果写入一个新文件。简单地说,我有一个基本功能来阐述一个块:

func elaborateBlock(block []byte) []byte { ... }

每个块都需要详细说明,然后按顺序写入输出文件(保持原始顺序)。

单线程实现很简单:

for {
        buffer := make([]byte, BlockSize)
        _, err := inputFile.Read(buffer)

        if err == io.EOF {
            break
        }
        processedData := elaborateBlock(buffer)
        outputFile.Write(processedData)
}

但是细化可能很繁重,并且每个块都可以单独处理,因此多线程实现是自然演变。

我想出的解决方案是创建一个通道数组,在不同的线程中计算每个块,并通过循环通道数组来同步最终写入:

实用功能:

func blockThread(channel chan []byte, block []byte) {
    channel <- elaborateBlock(block)
}

在主程序中:

chans = []chan []byte {}

for {
    buffer := make([]byte, BlockSize)
    _, err := inputFile.Read(buffer)

    if err == io.EOF {
        break
    }

    channel := make(chan []byte)
    chans = append(chans, channel)

    go blockThread(channel, buffer)
}

for i := range chans {
    data := <- chans[i]
    outputFile.Write(data)
}

这种方法有效,但对于大文件可能会出现问题,因为它需要在开始写入输出之前将整个文件加载到内存中。

您是否认为有更好的解决方案,整体性能也更好?

标签: multithreadingfilegosynchronization

解决方案


如果确实需要按顺序写出块

如果你想同时处理多个块,显然你需要同时在内存中保存多个块。

您可以决定要同时处理多少块,同时将尽可能多的块读入内存就足够了。例如,您可能会说您想同时处理 5 个块。这将限制内存使用,并且仍然可能最大限度地利用您的 CPU 资源。建议根据您可用的 CPU 内核选择一个数字(如果处理一个块尚未使用多核)。这可以使用查询runtime.GOMAXPROCS(0)

您应该有一个单独的 goroutine 来顺序读取输入文件,并生成包装在 Jobs 中的块(其中也包含块索引)。

您应该有多个工作 goroutine,最好与您拥有的内核一样多(但也可以尝试使用更小和更高的值)。每个 worker goroutine 只是接收作业,调用elaborateBlock()数据,然后将其传递到结果通道上。

应该有一个指定的消费者接收已完成的作业,并将它们按顺序写入输出文件。由于 goroutines 并发运行并且我们无法控制块的完成顺序,因此使用者应该跟踪要写入输出的下一个块的索引。乱序到达的块只应被存储,并且只有在后续块到达时才继续写入。

这是一个(不完整的)示例,如何执行所有这些操作:

const BlockSize = 1 << 20 // 1 MB

func elaborateBlock(in []byte) []byte { return in }

type Job struct {
    Index int
    Block []byte
}

func producer(jobsCh chan<- *Job) {
    // Init input file:
    var inputFile *os.File

    for index := 0; ; index++ {
        job := &Job{
            Index: index,
            Block: make([]byte, BlockSize),
        }

        _, err := inputFile.Read(job.Block)
        if err != nil {
            break
        }

        jobsCh <- job
    }
}

func worker(jobsCh <-chan *Job, resultCh chan<- *Job) {
    for job := range jobsCh {
        job.Block = elaborateBlock(job.Block)
        resultCh <- job
    }
}

func consumer(resultCh <-chan *Job) {
    // Init output file:
    var outputFile *os.File

    nextIdx := 0
    jobMap := map[int]*Job{}

    for job := range resultCh {
        jobMap[job.Index] = job

        // Write out all blocks we have in contiguous index range:
        for {
            j := jobMap[nextIdx]
            if j == nil {
                break
            }
            if _, err := outputFile.Write(j.Block); err != nil {
                // handle error, maybe terminate?
            }
            delete(nextIdx) // This job is written out
            nextIdx++
        }
    }
}

func main() {
    jobsCh := make(chan *Job)
    resultCh := make(chan *Job)

    for i := 0; i < 5; i++ {
        go worker(jobsCh, resultCh)
    }

    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        consumer(resultCh)
    }()

    // Start producing jobs:
    producer(jobsCh)
    // No more jobs:
    close(jobsCh)

    // Wait for consumer to complete:
    wg.Wait()
}

这里要注意的一件事:仅此一项并不能保证限制使用的内存。想象一下这样一种情况,第一个块需要大量时间来计算,而后续块则不需要。会发生什么?第一个块将占用一个工人,其他工人将“快速”完成后续块。消费者会将所有内容存储在内存中,等待第一个块完成(因为必须先写出)。这可能会增加内存使用量。

我们怎样才能避免这种情况?

通过引入工作池。新工作不能随意创造,而是从一个池子中获取。如果池为空,则生产者必须等待。所以当生产者需要一个新的时Job,从池中取一个。当消费者写出 aJob时,将其放回池中。就那么简单。这也将减少垃圾收集器的压力,因为作业(和大[]byte缓冲区)不会被创建和丢弃,它们可以被重新使用。

对于简单的Job池实现,您可以使用缓冲通道。有关详细信息,请参阅如何在 Golang 中实现内存池

如果块可以按任何顺序写入

另一种选择可能是提前分配输出文件。如果输出块的大小也是确定的,您可以这样做(例如outsize := (insize / blocksize) * outblockSize)。

达到什么目的?

如果您预先分配了输出文件,则使用者无需按顺序等待输入块。一旦计算了输入块,您就可以计算它将在输出中的位置,寻找该位置并编写它。为此,您可以使用File.Seek().

该方案仍然需要将块索引从生产者发送给消费者,但消费者不需要存储乱序到达的块,因此消费者可以更简单,不需要存储完成的块,直到随后一个到达以继续写入输出文件。

请注意,此解决方案自然不会造成内存威胁,因为已完成的作业永远不会累积/缓存,它们会按完成顺序写出。


有关更多详细信息和技术,请参阅相关问题:

这是 Go 中惯用的工作线程池吗?

如何从按特定顺序执行的 N 个 goroutine 中收集值?


推荐阅读