首页 > 解决方案 > 这个线程代码有解释吗?

问题描述

所以遇到了一些与此非常相似的代码。我只是想知道是否有人可以向我解释这一点。

看看它如何使用 RX 调度程序,然后是 Parallel.For 和里面一个新的 TaskFactory.StartNew

    IDisposable subscription = someObservable.ObserveOn(ThreadPoolScheduler.Instance)
    .Subscribe(o =>
       {
           Parallel.ForEach(xxxs,
               x =>
               {
                   var theKey = x.Key;
                   if (!theTasks.ContainsKey(theKey) ||
                       theTasks.ContainsKey(theKey) && theTasks[theKey].IsCompleted)
                   {
                       theTasks[theKey] = Task.Factory.StartNew(
                           () =>
                           {
                               .....
                               }
                               catch (CommunicationObjectAbortedException ex)
                               {
                               ....
                               }
                               catch (ObjectDisposedException ex)
                               {
                               ....
                               }
                               catch (Exception e)
                               {
                               ....
                               }
                           });
                   }
               });
       },
       ex =>
       {
          ....
       },
       () =>
       {
          ....
       });
    } 

我知道所有这些事情单独做了什么,但我不确定这里的组合线程效果是什么。任何人都可以猜测

标签: asynchronoustask-parallel-librarysystem.reactive

解决方案


啊,是的,并发Turducken

ThreadPoolScheduler调度与任务池不同的线程池上的工作。用于在没有任务池的平台上使用 -尽可能选择。ThreadPoolSchedulerTaskPoolScheduler

感觉就像作者试图通过使用线程池来仅为手头的任务(请原谅双关语)保存任务池。

Parallel.ForEach阻塞直到循环完成。因此,当它在线程池上运行时,当发出一个新项目时,在ForEach从线程池借来的线程上执行下一个。

至于内部位,作者希望Task每个唯一键运行一个,如果尚未运行。


推荐阅读