首页 > 解决方案 > 在 C# Parallel.ForEach 中的线程上保持变量

问题描述

我希望并行处理依赖于对象 ( State)的任务,该对象不是线程安全的,并且其构造耗时

出于这个原因,我正在研究partition-local variables,但要么我做错了,要么正在寻找其他东西。这或多或少代表了我当前的实现:

Parallel.ForEach<string, State>(folders, config, () => new State(), (source, loopState, index, threadState) => {
    var content = File.ReadAllText(source);        // read file
    var result = threadState.doSomething(content); // do something
    File.WriteAllText(outputFile, result);         // write output
    return threadState;
}, (threadState) => { });

但是,我Console.WriteLineState初始化程序中添加了一个,并且我看到对于循环的每次迭代,State都会调用构造函数,从而对性能造成很大影响。我希望将一个线程中的实例传递State给同一线程上的后续迭代。

我怎样才能做到这一点?

标签: c#multithreadingparallel-processing

解决方案


你有几个选择。最简单的方法是创建一个State对象,并通过使用来同步对它的访问lock

var state = new State();

Parallel.ForEach(folders, config, source =>
{
    var content = File.ReadAllText(source);
    string result;
    lock (state) { result = state.DoSomething(content); }
    File.WriteAllText(outputFile, result);
});

我认为这是不可行的,因为该DoSomething方法很耗时,并且同步它会破坏并行性。

另一种选择是使用ThreadLocal<State>. 此类提供数据的线程本地存储,因此State创建的对象数将等于Parallel.ForEach.

var threadLocalState = new ThreadLocal<State>(() => new State());

Parallel.ForEach(folders, config, source =>
{
    var content = File.ReadAllText(source);
    var result = threadLocalState.Value.DoSomething(content);
    File.WriteAllText(outputFile, result);
});

这可能会创建State比重载更少的对象Parallel.ForEach<TSource, TLocal>,但仍不等于配置的MaxDegreeOfParallelism. 的Parallel.ForEach使用线程ThreadPool,并且很可能在计算过程中使用所有线程,前提是列表folders足够长。而且您几乎无法控制ThreadPool. 所以这也不是一个特别诱人的解决方案。

我能想到的第三个也是最后一个选项是创建一个State对象池,并且Rent/Return在每个循环中创建一个:

var statePool = new ObjectPool<State>(() => new State());

Parallel.ForEach(folders, config, source =>
{
    var state = statePool.Rent();
    var content = File.ReadAllText(source);
    var result = state.DoSomething(content);
    File.WriteAllText(outputFile, result);
    statePool.Return(state);
});

这样实例化State对象的数量将等于最大并行度。

唯一的问题是.NET 平台中没有ObjectPool类(只有一个ArrayPool类),所以您必须找到一个。这是一个基于 a 的简单实现ConcurrentBag

public class ObjectPool<T> : IEnumerable<T> where T : new()
{
    private readonly ConcurrentBag<T> _bag = new ConcurrentBag<T>();
    private readonly Func<T> _factory;

    public ObjectPool(Func<T> factory = null) => _factory = factory;

    public T Rent()
    {
        if (_bag.TryTake(out var obj)) return obj;
        return _factory != null ? _factory() : new T();
    }

    public void Return(T obj) => _bag.Add(obj);

    public IEnumerator<T> GetEnumerator() => _bag.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator();
}

推荐阅读