c# - 在 C# Parallel.ForEach 中的线程上保持变量
问题描述
我希望并行处理依赖于对象 ( State
)的任务,该对象不是线程安全的,并且其构造耗时。
出于这个原因,我正在研究partition-local variables,但要么我做错了,要么正在寻找其他东西。这或多或少代表了我当前的实现:
Parallel.ForEach<string, State>(folders, config, () => new State(), (source, loopState, index, threadState) => {
var content = File.ReadAllText(source); // read file
var result = threadState.doSomething(content); // do something
File.WriteAllText(outputFile, result); // write output
return threadState;
}, (threadState) => { });
但是,我Console.WriteLine
在State
初始化程序中添加了一个,并且我看到对于循环的每次迭代,State
都会调用构造函数,从而对性能造成很大影响。我希望将一个线程中的实例传递State
给同一线程上的后续迭代。
我怎样才能做到这一点?
解决方案
你有几个选择。最简单的方法是创建一个State
对象,并通过使用来同步对它的访问lock
:
var state = new State();
Parallel.ForEach(folders, config, source =>
{
var content = File.ReadAllText(source);
string result;
lock (state) { result = state.DoSomething(content); }
File.WriteAllText(outputFile, result);
});
我认为这是不可行的,因为该DoSomething
方法很耗时,并且同步它会破坏并行性。
另一种选择是使用ThreadLocal<State>
. 此类提供数据的线程本地存储,因此State
创建的对象数将等于Parallel.ForEach
.
var threadLocalState = new ThreadLocal<State>(() => new State());
Parallel.ForEach(folders, config, source =>
{
var content = File.ReadAllText(source);
var result = threadLocalState.Value.DoSomething(content);
File.WriteAllText(outputFile, result);
});
这可能会创建State
比重载更少的对象Parallel.ForEach<TSource, TLocal>
,但仍不等于配置的MaxDegreeOfParallelism
. 的Parallel.ForEach
使用线程ThreadPool
,并且很可能在计算过程中使用所有线程,前提是列表folders
足够长。而且您几乎无法控制ThreadPool
. 所以这也不是一个特别诱人的解决方案。
我能想到的第三个也是最后一个选项是创建一个State
对象池,并且Rent
/Return
在每个循环中创建一个:
var statePool = new ObjectPool<State>(() => new State());
Parallel.ForEach(folders, config, source =>
{
var state = statePool.Rent();
var content = File.ReadAllText(source);
var result = state.DoSomething(content);
File.WriteAllText(outputFile, result);
statePool.Return(state);
});
这样实例化State
对象的数量将等于最大并行度。
唯一的问题是.NET 平台中没有ObjectPool
类(只有一个ArrayPool
类),所以您必须找到一个。这是一个基于 a 的简单实现ConcurrentBag
:
public class ObjectPool<T> : IEnumerable<T> where T : new()
{
private readonly ConcurrentBag<T> _bag = new ConcurrentBag<T>();
private readonly Func<T> _factory;
public ObjectPool(Func<T> factory = null) => _factory = factory;
public T Rent()
{
if (_bag.TryTake(out var obj)) return obj;
return _factory != null ? _factory() : new T();
}
public void Return(T obj) => _bag.Add(obj);
public IEnumerator<T> GetEnumerator() => _bag.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator();
}
推荐阅读
- c# - 以太网连接 - 我如何知道另一端已重置并且我需要关闭?
- shopify - 如何将 shopify 应用内容呈现到 shopify 商店?
- python - sqlalchemy 不工作????..这里有什么问题?
- crud - NestJS-CRUD:如何为列名设置别名?
- javascript - 在 Javascript 中交换图像
- sql - SQL - 检查字段值(金钱)是否是客户相等的值的 99%
- r - 根据日期和邮政编码合并两个文件,并计算 R 中空气污染数据的日加权平均值
- android - 构建 React Native Android 应用程序,但调试应用程序仍在尝试查找旧文件
- security - 如何使用 sqlmap 获取 SQL 注入攻击数据?
- python - 使用正则表达式从 pandas df 的列中提取单词