c# - C# 中的异步文件 I/O 开销
问题描述
我遇到了一个问题,我必须处理大量大型 jsonl 文件(读取、反序列化、进行一些转换数据库查找等,然后将转换后的结果写入 .net 核心控制台应用程序。
通过将输出分批放在单独的线程上,我获得了更好的吞吐量,并试图通过添加一些并行性来改善处理方面,但开销最终是自我挫败。
我一直在做:
using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
for (;;)
{
var l = reader.ReadLine();
if (l == null)
break;
// Deserialize
// Do some database lookups
// Do some transforms
// Pass result to output thread
}
}
一些诊断时间告诉我,这个ReadLine()
调用比反序列化多,等等。为了在上面加上一些数字,一个大文件大约有:
- 在 ReadLine 上花费了 11 秒
- 序列化花费 7.8 秒
- 数据库查找花费了 10 秒
我想将 11 秒的文件 i/o 与其他工作重叠,所以我尝试了
using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
var nextLine = reader.ReadLineAsync();
for (;;)
{
var l = nextLine.Result;
if (l == null)
break;
nextLine = reader.ReadLineAsync();
// Deserialize
// Do some database lookups
// Do some transforms
// Pass result to output thread
}
}
在我进行转换的同时进行下一个 I/O。只是最终花费的时间比常规同步的时间要长得多(比如两倍)。
我的要求是他们希望对整体结果具有可预测性(即,必须按名称顺序处理相同的文件集,并且必须按相同的顺序可预测地输出行)所以我不能每次只抛出一个文件线程,让他们打出来。
我只是试图引入足够的并行性来平滑大量输入的吞吐量,我很惊讶上面的结果会适得其反。
我在这里错过了什么吗?
解决方案
Theodor 的建议看起来是一个非常强大且有用的库,值得一试,但如果您正在寻找一个较小的 DIY 解决方案,我将采用以下方法:
using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;
namespace Parallelism
{
class Program
{
private static Queue<string> _queue = new Queue<string>();
private static Task _lastProcessTask;
static async Task Main(string[] args)
{
string path = "???";
await ReadAndProcessAsync(path);
}
private static async Task ReadAndProcessAsync(string path)
{
using (var str = File.OpenRead(path))
using (var sr = new StreamReader(str))
{
string line = null;
while (true)
{
line = await sr.ReadLineAsync();
if (line == null)
break;
lock (_queue)
{
_queue.Enqueue(line);
if (_queue.Count == 1)
// There was nothing in the queue before
// so initiate a new processing loop. Save
// but DON'T await the Task yet.
_lastProcessTask = ProcessQueueAsync();
}
}
}
// Now that file reading is completed, await
// _lastProcessTask to ensure we don't return
// before it's finished.
await _lastProcessTask;
}
// This will continue processing as long as lines are in the queue,
// including new lines entering the queue while processing earlier ones.
private static Task ProcessQueueAsync()
{
return Task.Run(async () =>
{
while (true)
{
string line;
lock (_queue)
{
// Only peak at first so the read loop doesn't think
// the queue is empty and initiate a second processing
// loop while we're processing this line.
if (!_queue.TryPeek(out line))
return;
}
await ProcessLineAsync(line);
lock (_queue)
{
// Dequeues the item we just processed. If it's the last
// one, this loop is done.
_queue.Dequeue();
if (_queue.Count == 0)
return;
}
}
});
}
private static async Task ProcessLineAsync(string line)
{
// do something
}
}
}
请注意,这种方法有一个处理循环,当队列中没有任何内容时终止,并在新项目准备就绪时重新启动。另一种方法是有一个连续的处理循环,在队列为空时重复重新检查并执行Task.Delay()
一小段时间。我更喜欢我的方法,因为它不会因定期和不必要的检查而使工作线程陷入困境,但性能可能会有不明显的不同。
也只是为了评论 Blindy 的回答,我不同意在这里不鼓励使用并行性。首先,如今大多数 CPU 都是多核的,因此巧妙地使用 .NET 线程池实际上会在多核 CPU 上运行时最大限度地提高应用程序的效率,并且在单核场景中的缺点非常小。
不过,更重要的是,异步不等于多线程。异步编程早在多线程之前就已经存在,I/O 是最显着的例子。I/O 操作在很大程度上由CPU以外的硬件处理 - NIC、SATA 控制器等。它们使用一个古老的概念,称为硬件中断,今天大多数编码人员可能从未听说过,并且比多线程早了几十年。它基本上只是一种在非 CPU 操作完成时给 CPU 一个回调以执行的方法。因此,当您使用表现良好的异步 API 时(尽管 .NETFileStream
存在 Theodore 提到的问题),您的 CPU 根本不应该做那么多工作。当你await
对于这样的 API,CPU 基本上处于空闲状态,直到机器中的其他硬件将请求的数据写入 RAM。
我同意 Blindy 的观点,如果计算机科学程序能更好地教人们计算机硬件是如何工作的,那就更好了。用柯克船长的话来说,利用 CPU 在等待从磁盘读取数据、从网络读取数据等时可以做其他事情的事实,就是“军官思维”。
推荐阅读
- typescript - 如何使用“typeof”从泛型类中获取类型?
- go - 生成对 dkim 验证有效的 rsa 密钥
- polymer-2.x - 组件未在网页上显示
- angular - 无法确定“@angular/cli”本地安装的版本已损坏
- html - 如何在特定标题下的 rmarkdown html 中放置脚注?
- java - 如何使 DatePickerDialog 的 setMinDate 与“setCurrentDate”不同?
- markdown - Hugo 页面上的图像路径错误
- otrs - 如何使用 OTRS REST 接口生成 OTRS 票证
- ansible - 为什么 Ansible 无法解析我的 azure 动态库存配置文件?
- javascript - Angular 7:如何从测试用例仅访问/发送 HTTP 响应的正文部分