swift - 为什么 for 循环中并发队列上的 .async 与 DispatchQueue.concurrentPerform 的行为不同?
问题描述
import Dispatch
class SynchronizedArray<T> {
private var array: [T] = []
private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
var get: [T] {
accessQueue.sync {
array
}
}
func append(newElement: T) {
accessQueue.async(flags: .barrier) {
self.array.append(newElement)
}
}
}
如果我运行以下代码,即使我正在同时读取,也会按预期将 10,000 个元素附加到数组中:
DispatchQueue.concurrentPerform(iterations: 10000) { i in
_ threadSafeArray.get
threadSafeArray.append(newElement: i)
}
但是当我这样做时,它永远不会接近添加 10,000 个元素(我上次运行它时只在我的计算机上添加了 92 个元素)。
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
}
}
为什么前者有效,而后者为什么无效?
解决方案
很高兴您找到了线程爆炸的解决方案。请参阅线程爆炸 WWDC 2015 Building Responsive and Efficient Apps with GCD 和 WWDC 2016 Concurrent Programming With GCD in Swift 3中的讨论。
话虽如此,现在考虑到(或with its或 Combine with its )DispatchSemaphore
的存在,这有点反模式。所有这些都比调度信号量更优雅地管理并发度。concurrentPerform
OperationQueue
maxConcurrentOperationCount
maxPublishers
说了这么多,对您的信号量模式的一些观察:
使用此
DispatchSemaphore
模式时,通常将 the 放在前面wait
(concurrent.async { ... }
因为,正如所写,您将获得 9 个并发操作,而不是 8 个,这有点误导)。这里更深层次的问题是您已经减少了计数问题的问题,但它仍然存在。考虑:
let threadSafeArray = SynchronizedArray<Int>() let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent) let semaphore = DispatchSemaphore(value: 8) for i in 0..<10000 { semaphore.wait() concurrent.async { threadSafeArray.append(newElement: i) semaphore.signal() } } print(threadSafeArray.get.count)
当您离开
for
循环时,您仍然可以有多达八个concurrent
仍在运行的异步任务,并且count
(相对于concurrent
队列不同步)仍然可以少于 10,000。您必须添加另一个concurrent.async(flags: .barrier) { ... }
,这只是添加第二层同步。例如let semaphore = DispatchSemaphore(value: 8) for i in 0..<10000 { semaphore.wait() concurrent.async { threadSafeArray.append(newElement: i) semaphore.signal() } } concurrent.async(flags: .barrier) { print(threadSafeArray.get.count) }
或者您可以使用
DispatchGroup
经典机制来确定一系列异步调度的块何时完成:let semaphore = DispatchSemaphore(value: 8) let group = DispatchGroup() for i in 0..<10000 { semaphore.wait() concurrent.async(group: group) { threadSafeArray.append(newElement: i) semaphore.signal() } } group.notify(queue: .main) { print(threadSafeArray.get.count) }
使用 of
concurrentPerform
消除了对这两种模式中的任何一种的需要,因为在所有并发任务完成之前它不会继续执行。(它还会根据您设备上的内核数量自动优化并发程度。)FWIW,一个更好的替代方案
SynchronizedArray
是根本不公开底层数组,只实现你想要公开的任何方法,集成必要的同步。它使呼叫站点更清洁,并解决了许多问题。例如,假设您想公开下标运算符和一个
count
变量,您可以这样做:class SynchronizedArray<T> { private var array: [T] private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent) init(_ array: [T] = []) { self.array = array } subscript(index: Int) -> T { get { reader { $0[index] } } set { writer { $0[index] = newValue } } } var count: Int { reader { $0.count } } func append(newElement: T) { writer { $0.append(newElement) } } func reader<U>(_ block: ([T]) throws -> U) rethrows -> U { try accessQueue.sync { try block(array) } } func writer(_ block: @escaping (inout [T]) -> Void) { accessQueue.async(flags: .barrier) { block(&self.array) } } }
这解决了各种问题。例如,您现在可以执行以下操作:
print(threadSafeArray.count) // get the count print(threadSafeArray[500]) // get the 500th item
您现在还可以执行以下操作:
let average = threadSafeArray.reader { array -> Double in let sum = array.reduce(0, +) return Double(sum) / Double(array.count) }
但是,最重要的是,在处理集合(或任何可变对象)时,您总是不想公开可变对象本身,而是为常见操作(下标、 、 等)编写自己的同步方法
count
,removeAll
并且可能还为应用程序开发人员可能需要更广泛的同步机制的情况公开读取器/写入器接口。(FWIW,对此的更改
SynchronizedArray
适用于信号量或concurrentPerform
场景;只是信号量恰好在这种情况下显示了问题。)不用说,您通常也会在每个线程上完成更多工作,因为与上下文切换开销一样小,这里可能足以抵消从并行处理中获得的任何优势。(但我知道这可能只是一个问题的概念演示,而不是提议的实现。)对未来的读者来说只是一个仅供参考。
推荐阅读
- scala - Scala - 如何删除具有 Null 和空白值的 Scala 映射的键
- xaml - 当所有孩子都失去焦点时的WPF容器事件
- python - 如何旋转 Python 列表以使特定值居中
- python - 如何汇总列表中单个项目的总数?
- android - 一个活动中的两个布局可见性问题
- javascript - 显示键盘 android (cordova)
- r - 更改向量中特定值的位置
- elasticsearch - Logstash 索引未反映在 Kibana 中
- uwp - 安装 UWP 应用返回未经授权的错误
- google-apps-script - google apps 脚本创建的 html 发布在 Google Docs 中,从今天开始出现错误