Multithreaded text processing

Problem description

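I wrote some code that processes files. The main goal is to replace every match in .log files (for example `ABC: 123`, where `123` can be any value). To avoid an OutOfMemoryException, I split each file into chunks. I also use the TPL to improve the application's performance, but I suspect the code still has some shortcomings.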
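
The chunk-and-replace approach described above can be sketched as follows. This is a minimal sketch, not the code from the GitHub repository: the pattern `ABC: \d+` and the replacement `ABC: ***` are hypothetical stand-ins for the question's `Pattern` and `Replacement` fields (which are not shown), and `ChunkedReplace` is an illustrative name. It uses `Parallel.For` over an array so each chunk's result is written back to its own slot.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

public static class ChunkedReplace
{
    // Hypothetical pattern standing in for the question's Pattern field:
    // "ABC: " followed by one or more digits. Regex instances are thread-safe.
    private static readonly Regex Pattern = new Regex(@"ABC: \d+", RegexOptions.Compiled);

    // Hypothetical replacement standing in for the question's Replacement field.
    private const string Replacement = "ABC: ***";

    // Splits text into consecutive pieces of at most chunkSize characters.
    public static string[] SplitFixed(string text, int chunkSize)
    {
        var count = (text.Length + chunkSize - 1) / chunkSize;
        return Enumerable.Range(0, count)
            .Select(i => text.Substring(i * chunkSize, Math.Min(chunkSize, text.Length - i * chunkSize)))
            .ToArray();
    }

    // Runs the replacement on every chunk in parallel, storing each result at the
    // chunk's original index so the order is preserved, then re-joins the chunks.
    public static string ReplaceChunked(string text, int chunkSize)
    {
        var chunks = SplitFixed(text, chunkSize);
        Parallel.For(0, chunks.Length, i => chunks[i] = Pattern.Replace(chunks[i], Replacement));
        return string.Concat(chunks);
    }

    // Reference result: a single replacement over the whole text.
    public static string ReplaceWhole(string text) => Pattern.Replace(text, Replacement);
}
```

A match that straddles a chunk boundary is either missed or only partly replaced. For example, with a chunk size of 6 the text `ABC: 123 ABC: 456` is cut into `ABC: 1`, `23 ABC` and `: 456`, so `ReplaceChunked` returns `ABC: ***23 ABC: 456` instead of `ABC: *** ABC: ***`. Assuming a match never spans a line break, splitting on line boundaries instead of fixed offsets avoids this.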

Could someone review the code and suggest improvements? The code is also available on GitHub.

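    // Requires: System, System.Collections.Concurrent, System.Collections.Generic,
    // System.IO, System.Linq, System.Text, System.Threading.Tasks.
    // FilePaths, Pattern (presumably a Regex), Replacement and Logger are class members not shown here.
    private static void ProcessFiles()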
    {
        var tasks = new BlockingCollection<Task>();

        Parallel.ForEach(FilePaths, path =>
        {
            tasks.Add(Task.Run(() =>
            {
                ProcessFile(path);
            }));
        });

        Task.WaitAll(tasks.ToArray());
    }

    private static void ProcessFile(string path)
    {
        if (!File.Exists(path)) return;

        try
        {
            string text;
            using (var fs = File.Open(path, FileMode.Open, FileAccess.Read))
            using (var bs = new BufferedStream(fs))
            using (var sr = new StreamReader(bs))
            {
                text = sr.ReadToEnd();
            }

            const int chunkSize = 10 * 1024;
            var limit = (text.Length + chunkSize - 1) / chunkSize;
            var chuncks = Enumerable.Range(0, limit).Select(i =>
            {
                var startIndex = i * chunkSize;
                var length = text.Length - startIndex >= chunkSize ? chunkSize : text.Length - startIndex;

                return text.Substring(startIndex, length);
            }).ToList();

            Parallel.ForEach(chuncks, (row, _, index) =>
            {
                var i = Convert.ToInt32(index);
                chuncks[i] = ProcessText(row);
            });
            SaveProcessedFile(path, chuncks);
        }
        catch (Exception ex)
        {
            Logger.Error(ex, ex.Message);
        }
    }

    private static string ProcessText(string oldText)
    {
        var processedText = Pattern.Replace(oldText, Replacement);

        return processedText;
    }

    private static void SaveProcessedFile(string path, List<string> text)
    {
        using (var fs = File.Open(path, FileMode.Create))
        using (var wr = new StreamWriter(fs, Encoding.Default))
        {
            foreach (var chunk in text)
            {
                wr.Write(chunk);
            }
        }
    }

Tags: c#, .net, task-parallel-library

Solution


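You need to call `bc.CompleteAdding()` to signal that the blocking collection will receive no more items; otherwise enumerating `bc.GetConsumingEnumerable()` blocks forever waiting for new items after the existing ones have been consumed.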

Here is what the call looks like:

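    using (var bc = new BlockingCollection<string>(new ConcurrentQueue<string>(chuncks)))
    {
        // Signal that no more items will be added, so that enumerating
        // GetConsumingEnumerable() ends once the queue is empty instead of blocking.
        bc.CompleteAdding();

        // ProcessText returns a new string, so keep each result at its chunk's position.
        var processed = new string[chuncks.Count];

        Parallel.ForEach(bc.GetConsumingEnumerable(), (row, _, index) =>
        {
            processed[index] = ProcessText(row);
        });

        // The consuming enumerable has been drained at this point,
        // so join the processed chunks rather than enumerating it again.
        return String.Join(String.Empty, processed);
    }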
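
The effect of `CompleteAdding` can be checked with a small self-contained sketch. It assumes a hypothetical `CompleteAddingDemo.Run` helper that fills a `BlockingCollection<string>` up front, as in the answer, uppercases each item in parallel in place of `ProcessText`, and uses a `CancellationTokenSource` timeout only so that the non-completed case returns instead of hanging.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CompleteAddingDemo
{
    // Processes pre-filled items through a BlockingCollection and returns the results
    // in their original order, or null if the consuming loop did not finish in time.
    public static string[] Run(string[] items, bool callCompleteAdding, TimeSpan timeout)
    {
        using (var bc = new BlockingCollection<string>(new ConcurrentQueue<string>(items)))
        using (var cts = new CancellationTokenSource(timeout))
        {
            if (callCompleteAdding)
            {
                // No more items will be added: the consuming enumerable ends once the queue is empty.
                bc.CompleteAdding();
            }

            var results = new string[items.Length];
            try
            {
                // Without CompleteAdding this loop waits for more items until the token is cancelled.
                Parallel.ForEach(bc.GetConsumingEnumerable(cts.Token), (item, _, index) =>
                {
                    results[index] = item.ToUpperInvariant();
                });
                return results;
            }
            catch (Exception ex) when (cts.IsCancellationRequested &&
                                       (ex is OperationCanceledException || ex is AggregateException))
            {
                // The timeout fired while the loop was still waiting for items.
                return null;
            }
        }
    }
}
```

Called with `callCompleteAdding: true`, `Run` returns the uppercased items in their original order. With `false`, the consuming enumerable keeps waiting for items that never arrive, and the call returns `null` only because the token is cancelled after the timeout. Writing to `results[index]` keeps the order in this sketch because `Parallel.ForEach` numbers items in the order it pulls them from the enumerable, which for a `ConcurrentQueue` is FIFO.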
