c# - 我需要在 StreamReader ReadLine() 上迭代并行 for 循环,但卡在对对象的多线程访问中
问题描述
sr
我尝试在并行 for 循环中创建流读取器对象 ( )。它正在工作,但需要 1.3 分钟才能获取本应在 4 秒内获取的数据。我希望这个问题与这个StreamReader
对象有关。当尝试使用下面的代码时,我遇到了一个错误,我尝试了很多方法来解决,但目前非常卡在它上面。甚至使用并发包volatile
,ThreadStatic
和lock
。
static void Main(string[] args)
{
Task.Run(() =>
{
Thread th0 = new Thread(() => ReadAllLinesAsync(
@"C:\Users\Administrator\Desktop\Fnale mail\LineDataBackHigh.csv"));
th0.Start();
th0.Join();
watch.Stop();
Debug.Log("time=" + watch.Elapsed);
Debug.Log("Finished Task + ");
});
Debug.Log("Free Executed, Task Independent");
}
public static string[] ReadAllLinesAsync(string path)
{
ConcurrentBag<string> lines = new ConcurrentBag<string>();
// Open the FileStream with the same FileMode, FileAccess
// and FileShare as a call to File.OpenText would've done.
using (StreamReader sr = File.OpenText(path))
{
string line = String.Empty;
int k = 0;
sr8 = sr;
Thread th0 = new Thread(Fetch);
th0.Start();
th0.Join();
Debug.Log("Finished Reading2" + lines.Count);
int item = 1;
void Fetch()
{
Parallel.For(k, File.ReadLines(path).Count(), z =>
{
sr8 = sr;
Debug.Log("Executing");
lines.Add(sr8.ReadLine());
// sr.Dispose();
});
}
}
return lines.ToArray();
}
错误:
解决方案
实际上,有一个很好的解决方案可以将此构建到使用消费者模式的 .net 框架中
首先,你创建一个像这样的生产者
static void Produce(ITargetBlock<string> target, Stream stream)
{
using var reader = new StreamReader(stream);
string? line = null;
while ((line = reader.ReadLine()) is not null)
{
target.Post(line);//tells the comsumet there is something to read
}
target.Complete();//tells consumer done filling
}
当“写入数据”完成后,您可以生成将用于读取缓冲区的任务,所以让我们创建消费者
static async Task ConsumeAsync(ISourceBlock<string> source)
{
//reads the data in a non-blocking way
while (await source.OutputAvailableAsync())
{
string line = await source.ReceiveAsync();
//do your magic here
}
}
现在让我们连接这两个方法并执行消费者-生产者模式
using var stream = Assembly.GetExecutingAssembly().GetManifestResourceStream("MyApp.Resources.LargeFile.csv");
if (stream is null)
throw new ResourceNotFoundException("LargeFile.csv", Assembly.GetExecutingAssembly());
var buffer = new BufferBlock<string>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer, stream);
await consumerTask;
想象一下,您需要处理一个大于可用内存的文件...这将根据您读取流的速度与您处理读取字符串的速度来填充...您可能会用完内存..不如阅读快全部并进行并行处理,但您最终会/可能会遇到问题。
至于时间,我在 6 秒内使用此模式将 CSV 文件解析为 .Net 类,仅通过改进锁定,使用该模式需要几分钟。锁定开销越多,性能就越好。
推荐阅读
- android - 在模块之间共享 Android 仪器测试
- vba - 为同一行中的特定范围的单元格着色
- sharepoint - Sharepoint 替代想法以获取来自多个用户的报告
- python - 通过引用传递可变对象?
- android - Android Studio,Gradle 构建时基于风味不同的代码
- machine-learning - 使用 CrossValidator 和 ParamGridBuilder 找到最佳管道模型
- css - Bootstrap SASS 自定义按钮未显示在表格或卡片中?
- python - 熊猫:不一致的连接
- java - 使用 Java 在 Raspberry Pi 3 上读取和写入 USB 设备
- python - 如何使用 Multiindex 聚合到 Pandas 中的列表中?