c# - 多线程文本处理
问题描述
我写了一段处理文件的代码。主要目标是替换 .log 文件中的所有匹配项(如ABC: 123,其中123可以是任何匹配项)。因此,我在块上分隔文件以防止 OutOfMemory 异常。此外,我使用 TPL 来提高应用程序的性能,但我猜代码包含一些不足。
有人可以查看代码或查看代码并提出一些建议吗?此外,代码可在github 上找到
private static void ProcessFiles()
{
var tasks = new BlockingCollection<Task>();
Parallel.ForEach(FilePaths, path =>
{
tasks.Add(Task.Run(() =>
{
ProcessFile(path);
}));
});
Task.WaitAll(tasks.ToArray());
}
private static void ProcessFile(string path)
{
if (!File.Exists(path)) return;
try
{
string text;
using (var fs = File.Open(path, FileMode.Open, FileAccess.Read))
using (var bs = new BufferedStream(fs))
using (var sr = new StreamReader(bs))
{
text = sr.ReadToEnd();
}
const int chunkSize = 10 * 1024;
var limit = (text.Length + chunkSize - 1) / chunkSize;
var chuncks = Enumerable.Range(0, limit).Select(i =>
{
var startIndex = i * chunkSize;
var length = text.Length - startIndex >= chunkSize ? chunkSize : text.Length - startIndex;
return text.Substring(startIndex, length);
}).ToList();
Parallel.ForEach(chuncks, (row, _, index) =>
{
var i = Convert.ToInt32(index);
chuncks[i] = ProcessText(row);
});
SaveProcessedFile(path, chuncks);
}
catch (Exception ex)
{
Logger.Error(ex, ex.Message);
}
}
private static string ProcessText(string oldText)
{
var processedText = Pattern.Replace(oldText, Replacement);
return processedText;
}
private static void SaveProcessedFile(string path, List<string> text)
{
using (var fs = File.Open(path, FileMode.Create))
using(var wr = new StreamWriter(fs, Encoding.Default))
{
foreach (var chunk in text)
{
wr.Write(chunk);
}
}
}
解决方案
您需要调用bc.CompleteAdding()
以指示阻塞集合已完成添加,并且不应阻塞从bc.GetConsumingEnumerable()
这是调用的样子:
using (var bc = new BlockingCollection<string>(new ConcurrentQueue<string>(chuncks)))
{
//Signal complete adding
bc.CompleteAdding();
var collection = bc.GetConsumingEnumerable();
Parallel.ForEach(collection, (row, _, index) =>
{
ProcessText(row);
});
return String.Join(String.Empty, collection);
}
推荐阅读
- python - eqNullSafe 函数在 spark 2.4.1 中引发错误
- python - ValueError:标签数=25 与样本数=56 不匹配
- git - GIT_SSH_COMMAND 环境变量作为 git 配置
- python - 用个人度量python构建热图矩阵
- r - 比较同一 R 数据框中的行
- mysql - 更新期间带有 Prepared Statement 的 PDO
- nginx - nginx位置块中的正则表达式
- java - Kubernetes Java 客户端 API 异常
- javascript - JavaScript 下拉菜单功能的问题
- c# - C# 中用于加入 Windows 域的 Powershell 脚本