首页 > 解决方案 > 在 Windows 服务中使用 TPL 进行并行处理

问题描述

我有一个 Windows 服务,它使用消息系统来获取消息。我还在 Timer 类的帮助下创建了一个回调机制,它可以帮助我在一段固定时间后检查消息以获取和处理。以前,服务是一一处理消息的。但我希望消息到达后处理机制并行执行。因此,如果第一条消息到达,它应该在一个任务上进行处理,即使在使用回调方法配置的间隔时间之后仍然没有完成第一条消息的处理(回调现在正在工作),应该选择并处理下一条消息一个不同的任务。

下面是我的代码:

Task.Factory.StartNew(() =>
{
    Subsriber<Message> subsriber = new Subsriber<Message>()
    {
       Interval = 1000
    };

    subsriber.Callback(Process, m => m != null);
});

public static void Process(Message message)
{
  if (message != null)
  {
     // Processing logic
  }
 else
 {

 }
}

但是使用任务工厂我无法控制并行任务的数量,所以在我的情况下,我想配置在任务可用性上运行消息的任务数量?


更新:更新了我上面的代码以添加多个任务

下面是代码:

         private static void Main()
        {
            try
            {
                int taskCount = 5;
                Task.Factory.StartNewAsync(() =>
                {
                   Subscriber<Message> consumer = new 
                   Subcriber<Message>()
                   {
                       Interval = 1000
                    };

                   consumer.CallBack(Process, msg => msg!= 
                   null);
                 }, taskCount);
                Console.ReadLine();
              }
             catch (Exception e)
            {
                 Console.WriteLine(e.Message);
            }

            public static void StartNewAsync(this TaskFactory 
            target, Action action, int taskCount)
           {
                 var tasks = new Task[taskCount];
                 for (int i = 0; i < taskCount; i++)
                 {
                      tasks[i] = target.StartNew(action);
                 }
             }

             public static void Process(Message message)
            {
                 if (message != null)
                {

                 }
                else
                { }
             }
        }

标签: c#parallel-processingtasktask-parallel-library

解决方案


我认为您要寻找的内容会产生相当大的样本。我只是想演示您将如何使用ActionBlock<T>. 仍然有很多未知数,所以我将样本作为您可以构建的骨架。在示例ActionBlock中,当从您的消息传递系统接收到所有消息时,将并行处理和处理您的所有消息

public class Processor
{
    private readonly IMessagingSystem _messagingSystem;
    private readonly ActionBlock<Message> _handler;
    private bool _pollForMessages;

    public Processor(IMessagingSystem messagingSystem)
    {
        _messagingSystem = messagingSystem;
        _handler = new ActionBlock<Message>(msg => Process(msg), new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 5 //or any configured value
        });
    }

    public async Task Start()
    {
        _pollForMessages = true;
        while (_pollForMessages)
        {
            var msg = await _messagingSystem.ReceiveMessageAsync();
            await _handler.SendAsync(msg);
        }

    }

    public void Stop()
    {
        _pollForMessages = false;
    }

    private void Process(Message message)
    {
        //handle message
    }
}

更多示例

和想法


推荐阅读