首页 > 解决方案 > 为什么 for 循环中并发队列上的 .async 与 DispatchQueue.concurrentPerform 的行为不同?

问题描述

import Dispatch

class SynchronizedArray<T> {
    private var array: [T] = []
    private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
    
    var get: [T] {
        accessQueue.sync {
            array
        }
    }
    
    func append(newElement: T) {
        accessQueue.async(flags: .barrier) {
            self.array.append(newElement)
        }
    }
}

如果我运行以下代码,即使我正在同时读取,也会按预期将 10,000 个元素附加到数组中:

DispatchQueue.concurrentPerform(iterations: 10000) { i in
    _ threadSafeArray.get
    threadSafeArray.append(newElement: i)
}

但是当我这样做时,它永远不会接近添加 10,000 个元素(我上次运行它时只在我的计算机上添加了 92 个元素)。

let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
    concurrent.async {
        _ = threadSafeArray.get
        threadSafeArray.append(newElement: i)
    }
}

为什么前者有效,而后者为什么无效?

标签: swiftmultithreadinggrand-central-dispatch

解决方案


很高兴您找到了线程爆炸的解决方案。请参阅线程爆炸 WWDC 2015 Building Responsive and Efficient Apps with GCD 和 WWDC 2016 Concurrent Programming With GCD in Swift 3中的讨论。

话虽如此,现在考虑到(或with its或 Combine with its )DispatchSemaphore的存在,这有点反模式。所有这些都比调度信号量更优雅地管理并发度。concurrentPerformOperationQueuemaxConcurrentOperationCountmaxPublishers

说了这么多,对您的信号量模式的一些观察:

  1. 使用此DispatchSemaphore模式时,通常将 the 放在前面wait concurrent.async { ... }因为,正如所写,您将获得 9 个并发操作,而不是 8 个,这有点误导)。

  2. 这里更深层次的问题是您已经减少了计数问题的问题,但它仍然存在。考虑:

    let threadSafeArray = SynchronizedArray<Int>()
    
    let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
    let semaphore = DispatchSemaphore(value: 8)
    for i in 0..<10000 {
        semaphore.wait()
        concurrent.async {
            threadSafeArray.append(newElement: i)
            semaphore.signal()
        }
    }
    
    print(threadSafeArray.get.count)
    

    当您离开for循环时,您仍然可以有多达八个concurrent仍在运行的异步任务,并且count(相对于concurrent队列不同步)仍然可以少于 10,000。您必须添加另一个concurrent.async(flags: .barrier) { ... },这只是添加第二层同步。例如

    let semaphore = DispatchSemaphore(value: 8)
    for i in 0..<10000 {
        semaphore.wait()
        concurrent.async {
            threadSafeArray.append(newElement: i)
            semaphore.signal()
        }
    }
    
    concurrent.async(flags: .barrier) {
        print(threadSafeArray.get.count)
    }
    

    或者您可以使用DispatchGroup经典机制来确定一系列异步调度的块何时完成:

    let semaphore = DispatchSemaphore(value: 8)
    let group = DispatchGroup()
    
    for i in 0..<10000 {
        semaphore.wait()
        concurrent.async(group: group) {
            threadSafeArray.append(newElement: i)
            semaphore.signal()
        }
    }
    
    group.notify(queue: .main) {
        print(threadSafeArray.get.count)
    }
    

    使用 ofconcurrentPerform消除了对这两种模式中的任何一种的需要,因为在所有并发任务完成之前它不会继续执行。(它还会根据您设备上的内核数量自动优化并发程度。)

  3. FWIW,一个更好的替代方案SynchronizedArray是根本不公开底层数组,只实现你想要公开的任何方法,集成必要的同步。它使呼叫站点更清洁,并解决了许多问题。

    例如,假设您想公开下标运算符和一个count变量,您可以这样做:

    class SynchronizedArray<T> {
        private var array: [T]
        private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)
    
        init(_ array: [T] = []) {
            self.array = array
        }
    
        subscript(index: Int) -> T {
            get { reader { $0[index] } }
            set { writer { $0[index] = newValue } }
        }
    
        var count: Int {
            reader { $0.count }
        }
    
        func append(newElement: T) {
            writer { $0.append(newElement) }
        }
    
        func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
            try accessQueue.sync { try block(array) }
        }
    
        func writer(_ block: @escaping (inout [T]) -> Void) {
            accessQueue.async(flags: .barrier) { block(&self.array) }
        }
    }
    

    这解决了各种问题。例如,您现在可以执行以下操作:

    print(threadSafeArray.count) // get the count
    print(threadSafeArray[500])  // get the 500th item
    

    您现在还可以执行以下操作:

    let average = threadSafeArray.reader { array -> Double in
        let sum = array.reduce(0, +)
        return Double(sum) / Double(array.count)
    }
    

    但是,最重要的是,在处理集合(或任何可变对象)时,您总是不想公开可变对象本身,而是为常见操作(下标、 、 等)编写自己的同步方法countremoveAll并且可能还为应用程序开发人员可能需要更广泛的同步机制的情况公开读取器/写入器接口。

    (FWIW,对此的更改SynchronizedArray适用于信号量或concurrentPerform场景;只是信号量恰好在这种情况下显示了问题。)

  4. 不用说,您通常也会在每个线程上完成更多工作,因为与上下文切换开销一样小,这里可能足以抵消从并行处理中获得的任何优势。(但我知道这可能只是一个问题的概念演示,而不是提议的实现。)对未来的读者来说只是一个仅供参考。


推荐阅读