Is TPL-Dataflow suitable for highly concurrent applications?

Problem description

I am investigating whether TPL-Dataflow can save us from writing boilerplate code full of locks and monitors for our highly concurrent application.

So I am prototyping a simple scenario with a single producer and multiple consumers, where every consumer is expected to receive all of the produced messages. If some consumers are slower than others, that should not stall the system.

Here is the code:

using NLog;
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp10
{
    internal sealed class Program
    {
        private static readonly Logger m_logger = LogManager.GetCurrentClassLogger();

        static void Main(string[] args)
        {
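            // BroadcastBlock offers every message it receives to all linked targets,
            // keeping only the most recently received value.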
            BroadcastBlock<int> root = new BroadcastBlock<int>(d => d);

            ExecutionDataflowBlockOptions consumerOptions = new ExecutionDataflowBlockOptions() {
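                // Each consumer can hold at most 3 messages at a time.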
                BoundedCapacity = 3
            };

            for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
            {
                int c = consumerIndex;
                ActionBlock<int> consumer = new ActionBlock<int>(
                    (int d) => {
                        m_logger.Trace($"[#{c}] Starting consuming {d}");
                        Thread.Sleep(c * 100);
                        m_logger.Trace($"[#{c}] Ended consuming {d}");
                    },
                    consumerOptions
                );
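                // Linked without DataflowLinkOptions, so completion is not propagated
                // from root to the consumers.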
                root.LinkTo(consumer);
            }

            Producer(10, root);

            Console.ReadLine();
        }


        private static void Producer(int n, ITargetBlock<int> target)
        {
            for (int i = 0; i < n; ++i)
            {
                m_logger.Trace($"Starting producing {i}");
                if (!target.Post(i))
                {
                    throw new Exception($"Failed to post message #{i}");
                }
                m_logger.Trace($"Ending producing {i}");
                Thread.Sleep(50);
            }
        }
    }
}

As you can see, I limit the size of each consumer's buffer to 3 (to keep a slow consumer's buffer from growing without bound).

Each subsequent consumer is slower than the previous one. Consumer #0 is the fastest, with no delay at all. The producer adds a small delay between each production.

I expected that at least consumer #0 would receive every message, while consumer #4 would miss some because its buffer would overflow.

Here are the results:

2021-04-15 22:44:15.4905 [T1] Starting producing 0 
2021-04-15 22:44:15.5049 [T1] Ending producing 0 
2021-04-15 22:44:15.5166 [T4] [#4] Starting consuming 0 
2021-04-15 22:44:15.5285 [T7] [#0] Starting consuming 0 
2021-04-15 22:44:15.5285 [T7] [#0] Ended consuming 0 
2021-04-15 22:44:15.5285 [T7] [#1] Starting consuming 0 
2021-04-15 22:44:15.5573 [T1] Starting producing 1 
2021-04-15 22:44:15.5573 [T1] Ending producing 1 
2021-04-15 22:44:15.5573 [T5] [#0] Starting consuming 1 
2021-04-15 22:44:15.5573 [T5] [#0] Ended consuming 1 
2021-04-15 22:44:15.5573 [T5] [#2] Starting consuming 0 
2021-04-15 22:44:15.5573 [T6] [#3] Starting consuming 0 
2021-04-15 22:44:15.6081 [T1] Starting producing 2 
2021-04-15 22:44:15.6081 [T1] Ending producing 2 
2021-04-15 22:44:15.6352 [T7] [#1] Ended consuming 0 
2021-04-15 22:44:15.6352 [T7] [#1] Starting consuming 1 
2021-04-15 22:44:15.6592 [T1] Starting producing 3 
2021-04-15 22:44:15.6592 [T1] Ending producing 3 
2021-04-15 22:44:15.7102 [T1] Starting producing 4 
2021-04-15 22:44:15.7102 [T1] Ending producing 4 
2021-04-15 22:44:15.7353 [T7] [#1] Ended consuming 1 
2021-04-15 22:44:15.7353 [T7] [#1] Starting consuming 2 
2021-04-15 22:44:15.7612 [T5] [#2] Ended consuming 0 
2021-04-15 22:44:15.7612 [T5] [#2] Starting consuming 1 
2021-04-15 22:44:15.7612 [T1] Starting producing 5 
2021-04-15 22:44:15.7612 [T1] Ending producing 5 
2021-04-15 22:44:15.8132 [T1] Starting producing 6 
2021-04-15 22:44:15.8132 [T1] Ending producing 6 
2021-04-15 22:44:15.8420 [T7] [#1] Ended consuming 2 
2021-04-15 22:44:15.8420 [T7] [#1] Starting consuming 3 
2021-04-15 22:44:15.8603 [T6] [#3] Ended consuming 0 
2021-04-15 22:44:15.8603 [T6] [#3] Starting consuming 1 
2021-04-15 22:44:15.8764 [T1] Starting producing 7 
2021-04-15 22:44:15.8764 [T1] Ending producing 7 
2021-04-15 22:44:15.9174 [T4] [#4] Ended consuming 0 
2021-04-15 22:44:15.9174 [T4] [#4] Starting consuming 1 
2021-04-15 22:44:15.9369 [T1] Starting producing 8 
2021-04-15 22:44:15.9369 [T1] Ending producing 8 
2021-04-15 22:44:15.9509 [T7] [#1] Ended consuming 3 
2021-04-15 22:44:15.9509 [T7] [#1] Starting consuming 4 
2021-04-15 22:44:15.9639 [T5] [#2] Ended consuming 1 
2021-04-15 22:44:15.9639 [T5] [#2] Starting consuming 2 
2021-04-15 22:44:15.9874 [T1] Starting producing 9 
2021-04-15 22:44:15.9874 [T1] Ending producing 9 
2021-04-15 22:44:16.0515 [T7] [#1] Ended consuming 4 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 2 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 2 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 3 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 3 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 4 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 4 
2021-04-15 22:44:16.0515 [T7] [#1] Starting consuming 5 
2021-04-15 22:44:16.1525 [T7] [#1] Ended consuming 5 
2021-04-15 22:44:16.1525 [T7] [#1] Starting consuming 6 
2021-04-15 22:44:16.1525 [T6] [#3] Ended consuming 1 
2021-04-15 22:44:16.1525 [T6] [#3] Starting consuming 2 
2021-04-15 22:44:16.1645 [T5] [#2] Ended consuming 2 
2021-04-15 22:44:16.1645 [T5] [#2] Starting consuming 4 
2021-04-15 22:44:16.2526 [T7] [#1] Ended consuming 6 
2021-04-15 22:44:16.2526 [T7] [#1] Starting consuming 7 
2021-04-15 22:44:16.3177 [T4] [#4] Ended consuming 1 
2021-04-15 22:44:16.3177 [T4] [#4] Starting consuming 2 
2021-04-15 22:44:16.3537 [T7] [#1] Ended consuming 7 
2021-04-15 22:44:16.3537 [T7] [#1] Starting consuming 9 
2021-04-15 22:44:16.3537 [T5] [#2] Ended consuming 4 
2021-04-15 22:44:16.3537 [T5] [#2] Starting consuming 5 
2021-04-15 22:44:16.4547 [T7] [#1] Ended consuming 9 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 5 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 5 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 6 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 6 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 7 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 7 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 9 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 9 
2021-04-15 22:44:16.4607 [T6] [#3] Ended consuming 2 
2021-04-15 22:44:16.4607 [T6] [#3] Starting consuming 4 
2021-04-15 22:44:16.5648 [T5] [#2] Ended consuming 5 
2021-04-15 22:44:16.5648 [T5] [#2] Starting consuming 9 
2021-04-15 22:44:16.7179 [T4] [#4] Ended consuming 2 
2021-04-15 22:44:16.7179 [T4] [#4] Starting consuming 4 
2021-04-15 22:44:16.7610 [T6] [#3] Ended consuming 4 
2021-04-15 22:44:16.7610 [T6] [#3] Starting consuming 9 
2021-04-15 22:44:16.7610 [T5] [#2] Ended consuming 9 
2021-04-15 22:44:17.0611 [T6] [#3] Ended consuming 9 
2021-04-15 22:44:17.1182 [T4] [#4] Ended consuming 4 
2021-04-15 22:44:17.1182 [T4] [#4] Starting consuming 9 
2021-04-15 22:44:17.5185 [T4] [#4] Ended consuming 9 

What puzzles me is that consumer #0 never receives message 8. In fact, no consumer gets that message at all. Why is that? Is this the expected behavior of Dataflow?

In case you want to check it, my NLog.config is shown below (I use the AsyncWrapper target to keep file access from affecting the results of my experiment):

<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.nlog-project.org/schemas/NLog.xsd NLog.xsd"
        internalLogFile="nlog.log"
        internalLogLevel="Warn"
        throwExceptions="false" 
        parseMessageTemplates="false"
    >

  <variable name="varExceptionMsg" value="${exception:format=Message}"/>
  <variable name="varMessageWithException" value="${message}${onexception:inner= ${varExceptionMsg}}"/>
  <variable name="msg4File" value="${longdate} [T${threadid}${threadname}] ${varMessageWithException} ${onexception:inner=${newline}${exception:format=tostring:maxInnerExceptionLevel=2:innerFormat=tostring}}" />

  <targets>

    <target name="file" xsi:type="AsyncWrapper" queueLimit="5000" overflowAction="Discard">
      <target xsi:type="File"
              layout="${msg4File}"
              fileName="${basedir}/logs/${processname}.${shortdate}.log"
              keepFileOpen="true"
              encoding="utf-8"
            />
    </target>
  </targets>

  <rules>
    <logger name="*" minlevel="Trace" writeTo="file" />
  </rules>
</nlog>

Tags: c#, tpl-dataflow

Solution


You need to call .Complete() on root, and then Main() needs to wait for all of the consumers to finish chewing through their messages before Main exits.

At the top of Main:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

In your for loop:

  • Add each consumer's Completion task to a list (I call it "comps", short for Completions); see the sketch after this list.
  • Pass linkOptions as part of your LinkTo call.
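
Putting those two points into the question's loop might look like this (a sketch; it assumes using System.Collections.Generic; and using System.Threading.Tasks; directives for List<Task>):

var comps = new List<Task>();

for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
{
    int c = consumerIndex;
    ActionBlock<int> consumer = new ActionBlock<int>(
        (int d) => {
            m_logger.Trace($"[#{c}] Starting consuming {d}");
            Thread.Sleep(c * 100);
            m_logger.Trace($"[#{c}] Ended consuming {d}");
        },
        consumerOptions
    );

    // Link with PropagateCompletion so root.Complete() reaches this consumer.
    root.LinkTo(consumer, linkOptions);

    // Keep the consumer's Completion task so Main can wait for it later.
    comps.Add(consumer.Completion);
}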

Before your ReadLine(), add:

root.Complete();
Task.WaitAll(comps.ToArray());
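
With PropagateCompletion set on each link, root.Complete() propagates completion to every consumer once the BroadcastBlock has finished offering the messages it received; each ActionBlock then drains whatever is left in its queue and completes, so every Completion task in comps finishes and Task.WaitAll returns. After that, Console.ReadLine() is only needed if you still want to pause before exiting.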
