c# - 在 Windows 服务中使用 TPL 进行并行处理
问题描述
我有一个 Windows 服务,它使用消息系统来获取消息。我还在 Timer 类的帮助下创建了一个回调机制,它可以帮助我在一段固定时间后检查消息以获取和处理。以前,服务是一一处理消息的。但我希望消息到达后处理机制并行执行。因此,如果第一条消息到达,它应该在一个任务上进行处理,即使在使用回调方法配置的间隔时间之后仍然没有完成第一条消息的处理(回调现在正在工作),应该选择并处理下一条消息一个不同的任务。
下面是我的代码:
Task.Factory.StartNew(() =>
{
Subsriber<Message> subsriber = new Subsriber<Message>()
{
Interval = 1000
};
subsriber.Callback(Process, m => m != null);
});
public static void Process(Message message)
{
if (message != null)
{
// Processing logic
}
else
{
}
}
但是使用任务工厂我无法控制并行任务的数量,所以在我的情况下,我想配置在任务可用性上运行消息的任务数量?
更新:更新了我上面的代码以添加多个任务
下面是代码:
private static void Main()
{
try
{
int taskCount = 5;
Task.Factory.StartNewAsync(() =>
{
Subscriber<Message> consumer = new
Subcriber<Message>()
{
Interval = 1000
};
consumer.CallBack(Process, msg => msg!=
null);
}, taskCount);
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
public static void StartNewAsync(this TaskFactory
target, Action action, int taskCount)
{
var tasks = new Task[taskCount];
for (int i = 0; i < taskCount; i++)
{
tasks[i] = target.StartNew(action);
}
}
public static void Process(Message message)
{
if (message != null)
{
}
else
{ }
}
}
解决方案
我认为您要寻找的内容会产生相当大的样本。我只是想演示您将如何使用ActionBlock<T>
. 仍然有很多未知数,所以我将样本作为您可以构建的骨架。在示例ActionBlock
中,当从您的消息传递系统接收到所有消息时,将并行处理和处理您的所有消息
public class Processor
{
private readonly IMessagingSystem _messagingSystem;
private readonly ActionBlock<Message> _handler;
private bool _pollForMessages;
public Processor(IMessagingSystem messagingSystem)
{
_messagingSystem = messagingSystem;
_handler = new ActionBlock<Message>(msg => Process(msg), new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 5 //or any configured value
});
}
public async Task Start()
{
_pollForMessages = true;
while (_pollForMessages)
{
var msg = await _messagingSystem.ReceiveMessageAsync();
await _handler.SendAsync(msg);
}
}
public void Stop()
{
_pollForMessages = false;
}
private void Process(Message message)
{
//handle message
}
}
推荐阅读
- jenkins - Jenkins 没有在成功的 github webhook 上触发管道构建
- python - 想要拆分字符串
- node.js - Sequelize 使用同步时定义复数表
- python - 代码没有像我预期的那样运行,我想如果输入错误它会说错,如果输入正确它会说是
- c# - MassTransit:具有多个队列的单一直接交换
- javascript - Google Web App 和 Cookie 返回未定义
- excel - 嵌套 COUNTIF 和 ISNUMBER
- python - 将布尔值从侦听器传递到封闭函数
- swift - 如何使用两个枚举变量实现 Identifiable
- javascript - 以文件上传的形式接收进度