Using new Thread instead of Task.Run on a BlockingCollection

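Question

We have a problem in our code: an await sometimes takes a long time to return.

I have a producer/consumer application that starts multiple consumers waiting on a BlockingCollection (named queue).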

    for (var i = 0; i < 10; i++)
    {
        // Start one long-running consumer per iteration.
        var t = Task.Factory.StartNew(async () =>
        {
            try
            {
                await InboundQueue(stoppingToken.Value, queueProcessorId).ConfigureAwait(false);
            }
            catch (OperationCanceledException ocex)
            {
                // cancellation handling omitted in the question
            }
            catch (Exception ex)
            {
                // error handling omitted in the question
            }
        }, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach);
    }

Every consumer is blocked in the TryTake method before processing processMessage.

if (!_queue.TryTake(out var message, timeToWait, cancellationToken))
{
..
await processMessage((T)message).ConfigureAwait(false);
..
}

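Inside processMessage we make many database calls with async/await, call HttpClient with async/await, and in some cases even use Task.Run to start an operation without waiting for it to complete.

It happens quite often that an await takes some time to finish. We added some additional ConfigureAwait(false) calls inside processMessage, and it helped in some cases. Slow awaits still happen.

https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#avoid-using-taskrun-for-long-running-work-that-blocks-the-thread After reading this post, we plan to change the code to use "new Thread" instead of "Task.Factory.StartNew".

1. Because we will have a standard thread created for running the consumers, do we still need the additional ConfigureAwait(false) inside the processMessage method when calling async HttpClient or the database?

2. processMessage will have its own thread and can easily be synchronous. Can we safely call .Result on libraries that expect to be called asynchronously?

3. How can we debug this situation? Note: by tracing the network we found that the HTTPS calls and the database calls are always fast.

Thanks in advance.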

Tags: .net, multithreading, async-await

Answer


There's a problem in the original code: StartNew doesn't understand async code. Specifically, the LongRunning flag is pretty much meaningless there, because it only applies to the part of the code until it hits the first await.
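To make that concrete, here is a minimal sketch (the class name StartNewDemo and the gate task are made up for illustration; they are not from the question). It assumes the default task scheduler and no SynchronizationContext. The async lambda makes StartNew return a Task<Task>: the outer task finishes as soon as the lambda reaches its first incomplete await, and the rest of the lambda resumes elsewhere.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StartNewDemo
{
    // Returns: whether the inner task was already done when the outer task completed,
    // and whether the lambda ran on a pool thread before and after its first await.
    public static async Task<(bool InnerDoneWhenOuterDone, bool PoolBefore, bool PoolAfter)> RunAsync()
    {
        // Hypothetical "slow operation" that we complete by hand further down.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        bool poolBefore = false, poolAfter = false;

        // An async lambda binds to Func<Task>, so StartNew returns Task<Task>.
        Task<Task> outer = Task.Factory.StartNew(async () =>
        {
            poolBefore = Thread.CurrentThread.IsThreadPoolThread; // dedicated LongRunning thread
            await gate.Task.ConfigureAwait(false);                // the dedicated thread ends here
            poolAfter = Thread.CurrentThread.IsThreadPoolThread;  // resumed by the thread pool
        }, TaskCreationOptions.LongRunning);

        // The outer task completes once the lambda has returned its (unfinished) inner task.
        Task inner = await outer.ConfigureAwait(false);
        bool innerDone = inner.IsCompleted; // the gate is still closed, so the work is not done

        gate.SetResult(true);
        await inner.ConfigureAwait(false);
        return (innerDone, poolBefore, poolAfter);
    }
}
```

Under those assumptions the inner task is still pending when the outer one completes, and the code after the await should run on a thread pool thread rather than on the LongRunning thread. Task.Run unwraps the inner task automatically, which is one reason it is usually preferred over StartNew for async delegates.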

> I have a producer/consumer application which starts multiple consumers waiting on BlockingCollection (named queue).

> Every consumer is blocked in TryTake method before processing processMessage.

So you end up blocking a bunch of thread pool threads, and I suspect that is what is actually causing your problem (especially if this pattern is repeated elsewhere).

> It happens quite often that await is taking some time to finish.

This sounds like thread pool exhaustion, which can happen if thread pool threads are used to block on a regular basis.

> We added some additional ConfigureAwait(false) calls inside processMessage and it helped in some cases.

Doubtful. On thread pool threads, ConfigureAwait(false) has no effect.

> https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#avoid-using-taskrun-for-long-running-work-that-blocks-the-thread Reading this post we will change code to use "new Thread" instead of "Task.Factory.StartNew".

Well, it's important to understand the context, and why the original code is bad. (Side note: David points out "Don't use TaskCreationOptions.LongRunning with async code as this will create a new thread which will be destroyed after first await", which is the reason for my first paragraph above).

The problem is that thread pool threads are being blocked on a regular basis. If it's just a couple threads at startup, I personally would think that's OK and just allow the thread pool to inject replacements. But if the code is regularly blocking thread pool threads, that's a problem and can cause thread pool exhaustion.

Now, regarding solutions, one solution is indeed to use manual threads, but that's not the solution I would recommend. IMO a far better solution is to remove the blocking.

Specifically, replace the BlockingCollection<T> queue with something that's async-friendly like System.Threading.Channels. Or, if you are interested in a somewhat larger refactor, look into TPL Dataflow.
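As a rough sketch of what the Channels version could look like (ChannelConsumers, RunAsync and ProcessMessageAsync are hypothetical names, and the int payload is only a placeholder for your message type): consumers await the channel instead of blocking in TryTake, so no thread is held while the queue is empty. This assumes .NET Core 3.0 or later, where System.Threading.Channels and ReadAllAsync are available.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelConsumers
{
    // Hypothetical stand-in for the question's processMessage.
    private static Task ProcessMessageAsync(int message) => Task.Delay(1);

    // Starts consumerCount asynchronous consumers, writes messageCount messages,
    // and returns how many messages were processed.
    public static async Task<int> RunAsync(int messageCount, int consumerCount)
    {
        var channel = Channel.CreateUnbounded<int>();
        var processed = 0;

        var consumers = new List<Task>();
        for (var i = 0; i < consumerCount; i++)
        {
            // Task.Run understands async delegates; no LongRunning flag is needed
            // because the consumer never blocks a thread while it waits.
            consumers.Add(Task.Run(async () =>
            {
                await foreach (var message in channel.Reader.ReadAllAsync())
                {
                    await ProcessMessageAsync(message).ConfigureAwait(false);
                    Interlocked.Increment(ref processed);
                }
            }));
        }

        // Producer side: write the messages, then signal that no more will arrive.
        for (var i = 0; i < messageCount; i++)
        {
            await channel.Writer.WriteAsync(i).ConfigureAwait(false);
        }
        channel.Writer.Complete();

        // Each consumer loop ends once the channel is completed and drained.
        await Task.WhenAll(consumers).ConfigureAwait(false);
        return processed;
    }
}
```

A bounded channel (Channel.CreateBounded) may be a better fit if producers can outpace consumers, since WriteAsync then applies backpressure asynchronously.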

Removing the blocking is a far better solution than adding manual threads IMO. That said, to answer your specific questions:

> Because we will have a standard thread created for running consumers, do we need additional ConfigureAwait(false) inside the processMessage method when calling async httpclient or database?

For a manual thread - just like a thread pool thread - ConfigureAwait(false) is meaningless.

> processMessage will have its own thread and can easily be sync.

It would have to be synchronous, actually. Custom threads can't be asynchronous (the thread exits at the first await). Unless you install a single-threaded context, but then ConfigureAwait(false) does have meaning. And that gets pretty complex.
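If you do go with manual threads, a sketch under those constraints might look like the following (DedicatedThreadConsumer and ProcessMessage are hypothetical names, and the int payload is a placeholder). The thread body is fully synchronous, so the dedicated thread stays alive for the whole life of the consumer.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public static class DedicatedThreadConsumer
{
    // Hypothetical synchronous stand-in for the question's processMessage.
    private static void ProcessMessage(int message) => Thread.Sleep(1);

    // Consumes messageCount messages on one manually created thread
    // and returns how many were processed.
    public static int Run(int messageCount)
    {
        using var queue = new BlockingCollection<int>();
        var processed = 0;

        var worker = new Thread(() =>
        {
            // Blocks this dedicated thread (not a pool thread) until items arrive;
            // the loop ends after CompleteAdding once the queue is drained.
            foreach (var message in queue.GetConsumingEnumerable())
            {
                ProcessMessage(message); // no await anywhere in the thread body
                processed++;
            }
        })
        {
            IsBackground = true,
            Name = "queue-consumer"
        };
        worker.Start();

        for (var i = 0; i < messageCount; i++)
        {
            queue.Add(i);
        }
        queue.CompleteAdding();

        worker.Join(); // wait for the consumer thread to finish
        return processed;
    }
}
```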

> Can we safely do .Result on libs that request async calling?

That depends on the library. But they would probably be fine. Note: GetAwaiter().GetResult() is like Result except it doesn't wrap exceptions in an AggregateException.
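The difference in exception behavior can be seen in a small sketch (BlockingOnTasks and FailAsync are hypothetical names; FailAsync just returns an already-faulted task):

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingOnTasks
{
    // Hypothetical async call that has already failed.
    private static Task<int> FailAsync() =>
        Task.FromException<int>(new InvalidOperationException("boom"));

    // Blocks with .Result and reports the type of the exception that surfaces.
    public static string ExceptionTypeFromResult()
    {
        try
        {
            _ = FailAsync().Result;
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name; // "AggregateException" wrapping the original
        }
    }

    // Blocks with GetAwaiter().GetResult() and reports the exception type.
    public static string ExceptionTypeFromGetResult()
    {
        try
        {
            _ = FailAsync().GetAwaiter().GetResult();
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name; // "InvalidOperationException", rethrown directly
        }
    }
}
```

Both calls block the current thread in the same way; only the shape of the thrown exception differs.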

> How can we debug this situation?

That's a good question. This article may help; essentially, capture a performance analysis and look for regular thread injection events. You can then find a commonly blocking API by taking a snapshot and examining the thread stacks.
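As a lightweight complement to a full performance capture (this is not the technique from the article, just a quick check), you could log thread pool counters periodically and watch for a steadily growing thread count together with a backlog of pending work items. The sketch below assumes .NET Core 3.0 or later, where ThreadPool.ThreadCount and ThreadPool.PendingWorkItemCount exist; ThreadPoolSnapshot and Describe are hypothetical names.

```csharp
using System;
using System.Threading;

public static class ThreadPoolSnapshot
{
    // Builds a one-line summary of the current thread pool state for logging.
    public static string Describe()
    {
        ThreadPool.GetAvailableThreads(out int workerFree, out _);
        ThreadPool.GetMaxThreads(out int workerMax, out _);
        ThreadPool.GetMinThreads(out int workerMin, out _);

        return $"threads={ThreadPool.ThreadCount} " +
               $"pending={ThreadPool.PendingWorkItemCount} " +
               $"busyWorkers={workerMax - workerFree} " +
               $"minWorkers={workerMin}";
    }
}
```

Calling Describe() from a timer every few seconds and comparing the lines over time gives a rough picture; a thread count that keeps climbing while pending work items stay above zero is consistent with blocked pool threads, though a profiler is still needed to find the blocking call.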

