首页 > 解决方案 > C# 中的异步文件 I/O 开销

问题描述

我遇到了一个问题,我必须处理大量大型 jsonl 文件(读取、反序列化、进行一些转换数据库查找等,然后将转换后的结果写入 .net 核心控制台应用程序。

通过将输出分批放在单独的线程上,我获得了更好的吞吐量,并试图通过添加一些并行性来改善处理方面,但开销最终是自我挫败。

我一直在做:

using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
    for (;;)
    {
        var l = reader.ReadLine();
        if (l == null)
            break;
        // Deserialize
        // Do some database lookups
        // Do some transforms
        // Pass result to output thread
    }
}

一些诊断时间告诉我,这个ReadLine()调用比反序列化多,等等。为了在上面加上一些数字,一个大文件大约有:

我想将 11 秒的文件 i/o 与其他工作重叠,所以我尝试了

using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
    var nextLine = reader.ReadLineAsync();
    for (;;)
    {
        var l = nextLine.Result;
        if (l == null)
            break;
        nextLine = reader.ReadLineAsync();
        // Deserialize
        // Do some database lookups
        // Do some transforms
        // Pass result to output thread
    }
}

在我进行转换的同时进行下一个 I/O。只是最终花费的时间比常规同步的时间要长得多(比如两倍)。

我的要求是他们希望对整体结果具有可预测性(即,必须按名称顺序处理相同的文件集,并且必须按相同的顺序可预测地输出行)所以我不能每次只抛出一个文件线程,让他们打出来。

我只是试图引入足够的并行性来平滑大量输入的吞吐量,我很惊讶上面的结果会适得其反。

我在这里错过了什么吗?

标签: c#asynchronousparallel-processing

解决方案


Theodor 的建议看起来是一个非常强大且有用的库,值得一试,但如果您正在寻找一个较小的 DIY 解决方案,我将采用以下方法:

using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;

namespace Parallelism
{
    class Program
    {
        private static Queue<string> _queue = new Queue<string>();
        private static Task _lastProcessTask;
        
        static async Task Main(string[] args)
        {
            string path = "???";
            await ReadAndProcessAsync(path);
        }

        private static async Task ReadAndProcessAsync(string path)
        {
            using (var str = File.OpenRead(path))
            using (var sr = new StreamReader(str))
            {
                string line = null;
                while (true)
                {
                    line = await sr.ReadLineAsync();
                    if (line == null)
                        break;

                    lock (_queue)
                    {
                        _queue.Enqueue(line);
                        if (_queue.Count == 1)
                            // There was nothing in the queue before
                            // so initiate a new processing loop. Save 
                            // but DON'T await the Task yet.
                            _lastProcessTask = ProcessQueueAsync();
                    }
                }                
            }

            // Now that file reading is completed, await 
            // _lastProcessTask to ensure we don't return
            // before it's finished.
            await _lastProcessTask;
        }

        // This will continue processing as long as lines are in the queue,
        // including new lines entering the queue while processing earlier ones.
        private static Task ProcessQueueAsync()
        {
            return Task.Run(async () =>
            {
                while (true)
                {
                    string line;
                    lock (_queue)
                    {              
                        // Only peak at first so the read loop doesn't think
                        // the queue is empty and initiate a second processing
                        // loop while we're processing this line.
                        if (!_queue.TryPeek(out line))
                            return;
                    }
                    await ProcessLineAsync(line);
                    lock (_queue)
                    {
                        // Dequeues the item we just processed. If it's the last
                        // one, this loop is done.
                        _queue.Dequeue();
                        if (_queue.Count == 0)
                            return;
                    }
                }
            });
        }

        private static async Task ProcessLineAsync(string line)
        {
            // do something
        }
    }
}

请注意,这种方法有一个处理循环,当队列中没有任何内容时终止,并在新项目准备就绪时重新启动。另一种方法是有一个连续的处理循环,在队列为空时重复重新检查并执行Task.Delay()一小段时间。我更喜欢我的方法,因为它不会因定期和不必要的检查而使工作线程陷入困境,但性能可能会有不明显的不同。

也只是为了评论 Blindy 的回答,我不同意在这里不鼓励使用并行性。首先,如今大多数 CPU 都是多核的,因此巧妙地使用 .NET 线程池实际上会在多核 CPU 上运行时最大限度地提高应用程序的效率,并且在单核场景中的缺点非常小。

不过,更重要的是,异步不等于多线程。异步编程早在多线程之前就已经存在,I/O 是最显着的例子。I/O 操作在很大程度上由CPU以外的硬件处理 - NIC、SATA 控制器等。它们使用一个古老的概念,称为硬件中断,今天大多数编码人员可能从未听说过,并且比多线程早了几十年。它基本上只是一种在非 CPU 操作完成时给 CPU 一个回调以执行的方法。因此,当您使用表现良好的异步 API 时(尽管 .NETFileStream存在 Theodore 提到的问题),您的 CPU 根本不应该做那么多工作。当你await对于这样的 API,CPU 基本上处于空闲状态,直到机器中的其他硬件将请求的数据写入 RAM。

我同意 Blindy 的观点,如果计算机科学程序能更好地教人们计算机硬件是如何工作的,那就更好了。用柯克船长的话来说,利用 CPU 在等待从磁盘读取数据、从网络读取数据等时可以做其他事情的事实,就是“军官思维”。


推荐阅读