首页 > 解决方案 > 我需要在 StreamReader ReadLine() 上迭代并行 for 循环,但卡在对对象的多线程访问中

问题描述

sr我尝试在并行 for 循环中创建流读取器对象 ( )。它正在工作,但需要 1.3 分钟才能获取本应在 4 秒内获取的数据。我希望这个问题与这个StreamReader对象有关。当尝试使用下面的代码时,我遇到了一个错误,我尝试了很多方法来解决,但目前非常卡在它上面。甚至使用并发包volatileThreadStaticlock

static void Main(string[] args)
{
    Task.Run(() =>
    {
        Thread th0 = new Thread(() => ReadAllLinesAsync(
            @"C:\Users\Administrator\Desktop\Fnale mail\LineDataBackHigh.csv"));
        th0.Start();
        th0.Join();
        watch.Stop();
        Debug.Log("time=" + watch.Elapsed);
        Debug.Log("Finished Task + ");
    });

    Debug.Log("Free Executed, Task Independent");
}

public static string[] ReadAllLinesAsync(string path)
{
    ConcurrentBag<string> lines = new ConcurrentBag<string>();

    // Open the FileStream with the same FileMode, FileAccess
    // and FileShare as a call to File.OpenText would've done.
    using (StreamReader sr = File.OpenText(path))
    {
        string line = String.Empty;
        int k = 0;
        sr8 = sr;

        Thread th0 = new Thread(Fetch);

        th0.Start();
        th0.Join();
        Debug.Log("Finished Reading2" + lines.Count);
        int item = 1;

        void Fetch()
        {
            Parallel.For(k, File.ReadLines(path).Count(), z =>
            {
                sr8 = sr;
                Debug.Log("Executing");
                lines.Add(sr8.ReadLine());
                // sr.Dispose();
            });
        }
    }
    return lines.ToArray();
}

错误:

在此处输入图像描述 在此处输入图像描述

标签: c#multithreadingunity3dparallel-processingparallel-for

解决方案


实际上,有一个很好的解决方案可以将此构建到使用消费者模式的 .net 框架中

首先,你创建一个像这样的生产者

static void Produce(ITargetBlock<string> target, Stream stream)
{
    using var reader = new StreamReader(stream);
    string? line = null;
    while ((line = reader.ReadLine()) is not null)
    {
        target.Post(line);//tells the comsumet there is something to read
    }

    target.Complete();//tells consumer done filling
}

当“写入数据”完成后,您可以生成将用于读取缓冲区的任务,所以让我们创建消费者

static async Task ConsumeAsync(ISourceBlock<string> source)
{
    //reads the data in a non-blocking way
    while (await source.OutputAvailableAsync())
    {
        string line = await source.ReceiveAsync();

        //do your magic here                
    }   
            
}

现在让我们连接这两个方法并执行消费者-生产者模式

using var stream = Assembly.GetExecutingAssembly().GetManifestResourceStream("MyApp.Resources.LargeFile.csv");
if (stream is null)
    throw new ResourceNotFoundException("LargeFile.csv", Assembly.GetExecutingAssembly());


var buffer = new BufferBlock<string>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer, stream);
await consumerTask;

想象一下,您需要处理一个大于可用内存的文件...这将根据您读取流的速度与您处理读取字符串的速度来填充...您可能会用完内存..不如阅读快全部并进行并行处理,但您最终会/可能会遇到问题。

至于时间,我在 6 秒内使用此模式将 CSV 文件解析为 .Net 类,仅通过改进锁定,使用该模式需要几分钟。锁定开销越多,性能就越好。


推荐阅读