首页 > 解决方案 > 在特定线程上运行异步工作,然后继续

问题描述

我需要启动一些任务来运行,但我希望它们在每次运行时都在特定(相同)线程上。我不知道如何实现这一点,除非实例化一个SingleThreadTaskScheduler(我自己创造的)。我从捕获源获取帧,我想将处理工作拆分到并行线程上,但我需要并行线程按顺序对帧进行操作。要做到这一点,它们必须与我上次输入的线程相同,每个处理管道。例如,我有并行处理管道 A、B 和 C。每次获得帧时,我都需要输入 AB&C。它们并行运行。

我在 StackOverflow 上看到了另一个关于如何创建单线程任务调度程序的示例,但它没有解释如何允许我等待结果并继续在当前线程中运行。

这是我需要执行的功能。Task.Run()需要通过在特定线程上触发来替换x.LongRunningAsync(),而不仅仅是线程池中的一些随机线程!也就是说,每个项目都有一个特定的线程this.Steps。每次调用都需要调用同一个线程DoParallelStuffDoParallelStuff被多次调用。这个函数的调用者想要在这些事情并行执行时离开并做其他事情。

public async Task<bool> DoParallelStuff()
{
    var tasks = this.Steps.Select(x => Task.Run(() => x.LongRunningAsync()));
    var results = await Task.WhenAll(tasks);
    var completed = !results.Any() || results.All(x => x == true);
    this.OnCompleted();
    return completed;
}

标签: c#parallel-processing

解决方案


这个问题可以用定制TaskScheduler来解决,也可以用定制来解决SynchronizationContext。例如,您可以安装 Stephen Cleary 的Nito.AsyncEx.Context包,然后执行以下操作:

var tasks = this.Steps.Select(step => Task.Factory.StartNew(() =>
{
    AsyncContext.Run(async () =>
    {
        await step.LongRunningAsync();
    });
}, default, TaskCreationOptions.LongRunning, TaskScheduler.Default));

将为每个stepin启动一个专用线程this.Steps。很可能这个线程大部分时间都会被阻塞,而在LongRunningAsync内部做异步的东西,但是await点之间的延续将在这个线程上调用。

重要的是方法await内的所有点LongRunningAsync都在捕获SynchronizationContext. 单ConfigureAwait(false)将导致单线程策略失败。

下面是如何使用 aSingleThreadTaskScheduler来代替:

var tasks = this.Steps.Select(step => Task.Factory.StartNew(async () =>
{
    await step.LongRunningAsync();
}, default, TaskCreationOptions.None, new SingleThreadTaskScheduler()).Unwrap());

注意Unwrap通话,这很重要。

前面的注释,关于点的必要缺席,ConfigureAwait(false)await这里也适用。

SingleThreadTaskScheduler可以在此处找到该类的(未经测试的)实现。


推荐阅读