multithreading - 从不同线程中的繁重操作同步写入文件
问题描述
我需要一次一个块地详细说明一个文件(可能是一个大文件)并将结果写入一个新文件。简单地说,我有一个基本功能来阐述一个块:
func elaborateBlock(block []byte) []byte { ... }
每个块都需要详细说明,然后按顺序写入输出文件(保持原始顺序)。
单线程实现很简单:
for {
buffer := make([]byte, BlockSize)
_, err := inputFile.Read(buffer)
if err == io.EOF {
break
}
processedData := elaborateBlock(buffer)
outputFile.Write(processedData)
}
但是细化可能很繁重,并且每个块都可以单独处理,因此多线程实现是自然演变。
我想出的解决方案是创建一个通道数组,在不同的线程中计算每个块,并通过循环通道数组来同步最终写入:
实用功能:
func blockThread(channel chan []byte, block []byte) {
channel <- elaborateBlock(block)
}
在主程序中:
chans = []chan []byte {}
for {
buffer := make([]byte, BlockSize)
_, err := inputFile.Read(buffer)
if err == io.EOF {
break
}
channel := make(chan []byte)
chans = append(chans, channel)
go blockThread(channel, buffer)
}
for i := range chans {
data := <- chans[i]
outputFile.Write(data)
}
这种方法有效,但对于大文件可能会出现问题,因为它需要在开始写入输出之前将整个文件加载到内存中。
您是否认为有更好的解决方案,整体性能也更好?
解决方案
如果确实需要按顺序写出块
如果你想同时处理多个块,显然你需要同时在内存中保存多个块。
您可以决定要同时处理多少块,同时将尽可能多的块读入内存就足够了。例如,您可能会说您想同时处理 5 个块。这将限制内存使用,并且仍然可能最大限度地利用您的 CPU 资源。建议根据您可用的 CPU 内核选择一个数字(如果处理一个块尚未使用多核)。这可以使用查询runtime.GOMAXPROCS(0)
。
您应该有一个单独的 goroutine 来顺序读取输入文件,并生成包装在 Jobs 中的块(其中也包含块索引)。
您应该有多个工作 goroutine,最好与您拥有的内核一样多(但也可以尝试使用更小和更高的值)。每个 worker goroutine 只是接收作业,调用elaborateBlock()
数据,然后将其传递到结果通道上。
应该有一个指定的消费者接收已完成的作业,并将它们按顺序写入输出文件。由于 goroutines 并发运行并且我们无法控制块的完成顺序,因此使用者应该跟踪要写入输出的下一个块的索引。乱序到达的块只应被存储,并且只有在后续块到达时才继续写入。
这是一个(不完整的)示例,如何执行所有这些操作:
const BlockSize = 1 << 20 // 1 MB
func elaborateBlock(in []byte) []byte { return in }
type Job struct {
Index int
Block []byte
}
func producer(jobsCh chan<- *Job) {
// Init input file:
var inputFile *os.File
for index := 0; ; index++ {
job := &Job{
Index: index,
Block: make([]byte, BlockSize),
}
_, err := inputFile.Read(job.Block)
if err != nil {
break
}
jobsCh <- job
}
}
func worker(jobsCh <-chan *Job, resultCh chan<- *Job) {
for job := range jobsCh {
job.Block = elaborateBlock(job.Block)
resultCh <- job
}
}
func consumer(resultCh <-chan *Job) {
// Init output file:
var outputFile *os.File
nextIdx := 0
jobMap := map[int]*Job{}
for job := range resultCh {
jobMap[job.Index] = job
// Write out all blocks we have in contiguous index range:
for {
j := jobMap[nextIdx]
if j == nil {
break
}
if _, err := outputFile.Write(j.Block); err != nil {
// handle error, maybe terminate?
}
delete(nextIdx) // This job is written out
nextIdx++
}
}
}
func main() {
jobsCh := make(chan *Job)
resultCh := make(chan *Job)
for i := 0; i < 5; i++ {
go worker(jobsCh, resultCh)
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
consumer(resultCh)
}()
// Start producing jobs:
producer(jobsCh)
// No more jobs:
close(jobsCh)
// Wait for consumer to complete:
wg.Wait()
}
这里要注意的一件事:仅此一项并不能保证限制使用的内存。想象一下这样一种情况,第一个块需要大量时间来计算,而后续块则不需要。会发生什么?第一个块将占用一个工人,其他工人将“快速”完成后续块。消费者会将所有内容存储在内存中,等待第一个块完成(因为必须先写出)。这可能会增加内存使用量。
我们怎样才能避免这种情况?
通过引入工作池。新工作不能随意创造,而是从一个池子中获取。如果池为空,则生产者必须等待。所以当生产者需要一个新的时Job
,从池中取一个。当消费者写出 aJob
时,将其放回池中。就那么简单。这也将减少垃圾收集器的压力,因为作业(和大[]byte
缓冲区)不会被创建和丢弃,它们可以被重新使用。
对于简单的Job
池实现,您可以使用缓冲通道。有关详细信息,请参阅如何在 Golang 中实现内存池。
如果块可以按任何顺序写入
另一种选择可能是提前分配输出文件。如果输出块的大小也是确定的,您可以这样做(例如outsize := (insize / blocksize) * outblockSize
)。
达到什么目的?
如果您预先分配了输出文件,则使用者无需按顺序等待输入块。一旦计算了输入块,您就可以计算它将在输出中的位置,寻找该位置并编写它。为此,您可以使用File.Seek()
.
该方案仍然需要将块索引从生产者发送给消费者,但消费者不需要存储乱序到达的块,因此消费者可以更简单,不需要存储完成的块,直到随后一个到达以继续写入输出文件。
请注意,此解决方案自然不会造成内存威胁,因为已完成的作业永远不会累积/缓存,它们会按完成顺序写出。
有关更多详细信息和技术,请参阅相关问题:
推荐阅读
- powershell - 在 jams powershell 中创建 Precheck 作业
- azure-data-factory - 从 Azure 数据工厂中的第二次查找中的值筛选查找结果
- hp-uft - UFT:从excel文件中提取数据并在应用程序中动态输入的问题
- visualvm - VisualVM - 堆转储灰显
- sql-server - 如何向底层视图添加新字段?
- authentication - 使用 Azure 应用服务身份验证的 HttpContext 用户为空
- c# - 使用 Microsoft.CodeAnalysis/Roslyn 获取 XML 文档
- javascript - 如何使用 XPath 解析 JavaScript 内容
- mongodb - $查找对象数组中的id
- c# - 如何在 C# 6.0 的泛型类中包含特定于类型的代码?