首页 > 解决方案 > Swift Combine - Publishers.CombineLatest on multiple Threads

问题描述

When using Publishers.CombineLatest with Publishers which run an threads other that Main the .sink of the Publishers.CombineLatest is not always called.

The issue does not appear every time this is why I created unit tests which try the test 100 times in a row. Usually they fail after 4-5 iterations.

import XCTest
import Combine

class CombineLatestTests: XCTestCase {

    override func setUp() {
        continueAfterFailure = false
    }

    func testCombineLatest_receiveOn() {
        for x in 0...1000 {
            print("---------- RUN \(x)")
            let queue1 = DispatchQueue.global(qos: .userInitiated)
            let queue2 = DispatchQueue.global(qos: .background)

            let subj1 = PassthroughSubject<Int, Never>()
            let subj2 = PassthroughSubject<Int, Never>()

            let publ1 = subj1.receive(on: queue1).map { value -> Int in
                print("-- Observer 1: \(value), Thread: \(Thread.current)")
                return value
            }
            let publ2 = subj2.receive(on: queue2).map { value -> Int in
                print("-- Observer 2: \(value), Thread: \(Thread.current)")
                return value
            }

            let exp = expectation(description: "expect values")
            exp.assertForOverFulfill = false
            let canc = Publishers.CombineLatest(publ1, publ2)
                .sink { value1, value2 in
                    print("-- recieved \(value1):\(value2) on \(Thread.current)")
                    if value1 == 10, value2 == 20 {
                        exp.fulfill()
                    }
                }

            subj1.send(5)
            subj2.send(20)
            subj1.send(10)

            wait(for: [exp], timeout: 10)
            canc.cancel()
        }
    }

    func testCombineLatest_currentValue_receiveOn() {
        for x in 0...100 {
            print("---------- RUN \(x)")
            let queue1 = DispatchQueue.global(qos: .userInitiated)
            let queue2 = DispatchQueue.global(qos: .background)

            let subj1 = CurrentValueSubject<Int, Never>(0)
            let subj2 = CurrentValueSubject<Int, Never>(0)

            let publ1 = subj1.receive(on: queue1).map { value -> Int in
                print("-- Observer 1: \(value), Thread: \(Thread.current)")
                return value
            }
            let publ2 = subj2.receive(on: queue2).map { value -> Int in
                print("-- Observer 2: \(value), Thread: \(Thread.current)")
                return value
            }

            let exp = expectation(description: "expect values")
            exp.assertForOverFulfill = false
            let canc = Publishers.CombineLatest(publ1,
                                                publ2)
                .sink { value1, value2 in
                    print("-- recieved \(value1):\(value2) on \(Thread.current)")
                    if value1 == 10, value2 == 20 {
                        exp.fulfill()
                    }
                }

            subj1.send(10)
            subj2.send(20)

            wait(for: [exp], timeout: 3)
            canc.cancel()
        }
    }

    func testCombineLatest_subscribeOn() {
        for x in 0...100 {
            print("---------- RUN \(x)")
            let queue1 = DispatchQueue.global(qos: .userInitiated)
            let queue2 = DispatchQueue.global(qos: .background)

            let subj1 = PassthroughSubject<Int, Never>()
            let subj2 = PassthroughSubject<Int, Never>()

            let publ1 = subj1.map { value -> Int in
                print("-- Observer 1: \(value), Thread: \(Thread.current)")
                return value
            }
            let publ2 = subj2.map { value -> Int in
                print("-- Observer 2: \(value), Thread: \(Thread.current)")
                return value
            }

            let exp = expectation(description: "expect values")
            exp.assertForOverFulfill = false
            let canc = Publishers.CombineLatest(publ1, publ2)
                .sink { value1, value2 in
                    print("-- recieved \(value1):\(value2) on \(Thread.current)")
                    if value1 == 10, value2 == 20 {
                        exp.fulfill()
                    }
                }

            queue1.async {
                subj1.send(5)
                subj1.send(10)
            }

            queue2.async {
                subj2.send(20)
            }

            wait(for: [exp], timeout: 5)
            canc.cancel()
        }
    }

}

Here are the logs of the 3rd test

Test Case '-[xxxx.CombineLatestTests testCombineLatest_currentValue_receiveOn]' started.
---------- RUN 0
-- Observer 2: 0, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- recieved 10:20 on <NSThread: 0x600000439880>{number = 4, name = (null)}
---------- RUN 1
-- Observer 2: 0, Thread: <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 10:20 on <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
---------- RUN 2
-- Observer 2: 0, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- recieved 10:20 on <NSThread: 0x600000439880>{number = 4, name = (null)}
---------- RUN 3
-- Observer 2: 0, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 10:20 on <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
---------- RUN 4
-- Observer 1: 0, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 2: 0, Thread: <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 10:0 on <NSThread: 0x600000439880>{number = 4, name = (null)}
CombineLatestTests.swift:93: error: : Asynchronous wait failed: Exceeded timeout of 3 seconds, with unfulfilled expectations: "expect values".
Test Suite 'CombineLatestTests' failed at 2020-03-04 20:37:24.957.
     Executed 3 tests, with 3 failures (0 unexpected) in 18.159 (18.161) seconds

标签: iosswiftcombine

解决方案


试试这个测试

 func testCombineLatest_receiveOn() {
        for x in 0...100 {
            print("---------- RUN \(x)")
            let q1 = DispatchQueue(label: "q1", qos: .background)
            let q2 = DispatchQueue(label: "q2", qos: .utility, attributes: .concurrent)

            let subj1 = PassthroughSubject<Int, Never>()
            let subj2 = PassthroughSubject<Int, Never>()

            let publ1 = subj1
                .map { value -> Int in
                print("-- Observer 1: \(value), Thread: \(Thread.current)")
                return value
            }
            let publ2 = subj2
                .map { value -> Int in
                print("-- Observer 2: \(value), Thread: \(Thread.current)")
                return value
            }

            let exp = expectation(description: "expect values")
            exp.assertForOverFulfill = false
            // you have to use the same serial queue for publishers which you like to combine
            let canc = Publishers.CombineLatest(publ1.receive(on: q1), publ2.receive(on: q1))
                // this just redirect it to different queue which could be even concurrent
                // (it has no effect at all)
                .receive(on: q2)
                .sink { value1, value2 in
                    print("-- recieved \(value1): \(value2) on \(Thread.current)")
                    if value1 == 10, value2 == 20 {
                        exp.fulfill()
                    }
                }

            let q = DispatchQueue.global()
            // the values could be updated concurently
            q.async {
                subj1.send(5)
            }
            q.async {
                subj2.send(20)
            }
            q.async {
                subj1.send(10)
            }


            wait(for: [exp], timeout: 10)
            canc.cancel()
        }
    }

运行它,检查打印输出并查看上面代码片段中的注释

打印输出的一部分

---------- RUN 6
-- Observer 1: 5, Thread: <NSThread: 0x6000020aec40>{number = 6, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000020aec40>{number = 6, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000020d1380>{number = 4, name = (null)}
-- recieved 10: 20 on <NSThread: 0x6000020aec40>{number = 6, name = (null)}
---------- RUN 7
-- Observer 1: 5, Thread: <NSThread: 0x6000020aec40>{number = 6, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000020d1380>{number = 4, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000020aec00>{number = 7, name = (null)}
-- recieved 5: 20 on <NSThread: 0x6000020aec00>{number = 7, name = (null)}
-- recieved 10: 20 on <NSThread: 0x6000020aec40>{number = 6, name = (null)}

您可以在其中看到同时发送值的效果


推荐阅读