c# - 如何为同时运行的多个 ActionBlock 设置 CPU 优先级?
问题描述
我有一堆ActionBlocks
,每个都在做不同的事情。
- 大的处理数据,并由一个不断地馈送数据
TransformBlock
。 - 其他 3 个
ActionBlocks
简单地在 3 个文本文件(日志)中写入行。
它有点工作,除了 3 日志记录ActionBlocks
仅在处理完成时开始消耗数据ActionBlock
(因此他们在程序结束时一次性写入所有日志记录信息)。
我想知道我是否可以影响这种行为,从而为日志记录提供更高的优先级ActionBlocks
?
谢谢你的帮助。
代码示例:
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace dataflowtest
{
class Program
{
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
static readonly IReadOnlyCollection<string> charsSets = Enumerable.Repeat(chars, 8).ToList().AsReadOnly();
static readonly Random random = new Random();
static event EventHandler<string> MessageGot;
static async Task Main(string[] args)
{
var source = new TransformBlock<string, string>(GetMessage, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1, EnsureOrdered = false });
var target = new ActionBlock<string>(Console.WriteLine);
var programDir = Path.GetDirectoryName(System.Reflection.Assembly.GetEntryAssembly().GetName().CodeBase.Replace("file:///", ""));
using var file1 = new StreamWriter(Path.Combine(programDir, "file1.txt"));
using var file2 = new StreamWriter(Path.Combine(programDir, "file2.txt"));
using var file3 = new StreamWriter(Path.Combine(programDir, "file3.txt"));
var fileAction1 = new ActionBlock<string>(file1.WriteLineAsync);
var fileAction2 = new ActionBlock<string>(file2.WriteLineAsync);
var fileAction3 = new ActionBlock<string>(file3.WriteLineAsync);
MessageGot += async (_, e) => await fileAction1.SendAsync(e);
MessageGot += async (_, e) => await fileAction2.SendAsync(e);
MessageGot += async (_, e) => await fileAction3.SendAsync(e);
using (source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true }))
{
for (int i = 0; i < 100; i++)
{
await source.SendAsync(i.ToString() + '\t' + new string(charsSets.Select(s => s[random.Next(s.Length)]).ToArray()));
}
source.Complete();
await target.Completion;
}
}
private static async Task<string> GetMessage(string input)
{
int delay = random.Next(25, 6000);
await Task.Delay(delay);
string message = input.ToLowerInvariant() + '\t' + delay.ToString();
MessageGot?.Invoke(null, message);
return message;
}
}
}
解决方案
默认情况下,aStreamWriter
将每 4,096 字节刷新一次其缓冲区。您可能希望它在写入的每一行都刷新。所以代替这个:
var fileAction1 = new ActionBlock<string>(file1.WriteLineAsync);
...做这个:
var fileAction1 = new ActionBlock<string>(item =>
{
file1.WriteLine(item);
file1.Flush();
});
使用WriteLineAsync
而不是没有好处,因为未使用该选项打开WriteLine
底层证券。FileStream
FileOptions.Asynchronous
推荐阅读
- javascript - 让 React hook 步入循环
- r - 如何删除R中所有NULL值的行
- ansible - Ansible 未正确设置环境变量
- javascript - Pixi.js 创建围绕点旋转的可拖动和可点击手柄的最佳方式
- php - 链式下拉菜单中的默认值
- c++ - 从 2 个 std::maps 获取公共密钥的更好方法
- amazon-web-services - 无法使用 AWS CLI 为通过 Amazon SNS 的 SMS 设置发件人 ID
- mysql - 如何选择派生表中具有最大值的所有行?
- rust - Serde tag = x,但将标签保留在结构中
- git - 如何防止 git 使用 git add 暂存已删除的文件?