ios - Swift Combine - Publishers.CombineLatest on multiple Threads
问题描述
When using Publishers.CombineLatest with publishers that run on threads other than the main thread, the .sink of the Publishers.CombineLatest
is not always called.
The issue does not appear every time, which is why I created unit tests that repeat each scenario at least 100 times in a row. They usually fail after 4-5 iterations.
import XCTest
import Combine
/// Stress tests that drive `Publishers.CombineLatest` from two integer upstreams
/// and wait until the pair `(10, 20)` reaches the sink.
///
/// Every scenario is repeated in a loop. Each run builds fresh subjects, a fresh
/// expectation and a fresh subscription, and cancels the subscription at the end.
class CombineLatestTests: XCTestCase {

    /// Turns off `continueAfterFailure` for every test in this class.
    override func setUp() {
        continueAfterFailure = false
    }

    /// Two `PassthroughSubject`s, each moved onto its own global queue with
    /// `receive(on:)`. The values 5 (first), 20 (second) and 10 (first) are sent
    /// from the test's own thread, then the run waits up to 10 seconds.
    func testCombineLatest_receiveOn() {
        for x in 0...1000 {
            print("---------- RUN \(x)")

            let pairArrived = expectation(description: "expect values")
            // Extra `fulfill()` calls must not be reported as failures.
            pairArrived.assertForOverFulfill = false

            let firstQueue = DispatchQueue.global(qos: .userInitiated)
            let secondQueue = DispatchQueue.global(qos: .background)
            let firstSubject = PassthroughSubject<Int, Never>()
            let secondSubject = PassthroughSubject<Int, Never>()

            // Pass-through transforms: log the value and the current thread, change nothing.
            let logFirst: (Int) -> Int = { value in
                print("-- Observer 1: \(value), Thread: \(Thread.current)")
                return value
            }
            let logSecond: (Int) -> Int = { value in
                print("-- Observer 2: \(value), Thread: \(Thread.current)")
                return value
            }

            let firstStream = firstSubject.receive(on: firstQueue).map(logFirst)
            let secondStream = secondSubject.receive(on: secondQueue).map(logSecond)

            let subscription = Publishers.CombineLatest(firstStream, secondStream)
                .sink { value1, value2 in
                    print("-- recieved \(value1):\(value2) on \(Thread.current)")
                    guard value1 == 10, value2 == 20 else { return }
                    pairArrived.fulfill()
                }

            firstSubject.send(5)
            secondSubject.send(20)
            firstSubject.send(10)

            wait(for: [pairArrived], timeout: 10)
            subscription.cancel()
        }
    }

    /// Same layout as the `receive(on:)` test, but built on `CurrentValueSubject`s
    /// that start at 0. The values 10 (first) and 20 (second) are sent from the
    /// test's own thread, then the run waits up to 3 seconds.
    func testCombineLatest_currentValue_receiveOn() {
        for x in 0...100 {
            print("---------- RUN \(x)")

            let pairArrived = expectation(description: "expect values")
            // Extra `fulfill()` calls must not be reported as failures.
            pairArrived.assertForOverFulfill = false

            let firstQueue = DispatchQueue.global(qos: .userInitiated)
            let secondQueue = DispatchQueue.global(qos: .background)
            let firstSubject = CurrentValueSubject<Int, Never>(0)
            let secondSubject = CurrentValueSubject<Int, Never>(0)

            // Pass-through transforms: log the value and the current thread, change nothing.
            let logFirst: (Int) -> Int = { value in
                print("-- Observer 1: \(value), Thread: \(Thread.current)")
                return value
            }
            let logSecond: (Int) -> Int = { value in
                print("-- Observer 2: \(value), Thread: \(Thread.current)")
                return value
            }

            let firstStream = firstSubject.receive(on: firstQueue).map(logFirst)
            let secondStream = secondSubject.receive(on: secondQueue).map(logSecond)

            let subscription = Publishers.CombineLatest(firstStream, secondStream)
                .sink { value1, value2 in
                    print("-- recieved \(value1):\(value2) on \(Thread.current)")
                    guard value1 == 10, value2 == 20 else { return }
                    pairArrived.fulfill()
                }

            firstSubject.send(10)
            secondSubject.send(20)

            wait(for: [pairArrived], timeout: 3)
            subscription.cancel()
        }
    }

    /// Two `PassthroughSubject`s without any `receive(on:)`. The values are sent
    /// from blocks dispatched asynchronously onto two global queues (5 then 10 for
    /// the first subject, 20 for the second), then the run waits up to 5 seconds.
    func testCombineLatest_subscribeOn() {
        for x in 0...100 {
            print("---------- RUN \(x)")

            let pairArrived = expectation(description: "expect values")
            // Extra `fulfill()` calls must not be reported as failures.
            pairArrived.assertForOverFulfill = false

            let firstQueue = DispatchQueue.global(qos: .userInitiated)
            let secondQueue = DispatchQueue.global(qos: .background)
            let firstSubject = PassthroughSubject<Int, Never>()
            let secondSubject = PassthroughSubject<Int, Never>()

            // Pass-through transforms: log the value and the current thread, change nothing.
            let logFirst: (Int) -> Int = { value in
                print("-- Observer 1: \(value), Thread: \(Thread.current)")
                return value
            }
            let logSecond: (Int) -> Int = { value in
                print("-- Observer 2: \(value), Thread: \(Thread.current)")
                return value
            }

            let firstStream = firstSubject.map(logFirst)
            let secondStream = secondSubject.map(logSecond)

            let subscription = Publishers.CombineLatest(firstStream, secondStream)
                .sink { value1, value2 in
                    print("-- recieved \(value1):\(value2) on \(Thread.current)")
                    guard value1 == 10, value2 == 20 else { return }
                    pairArrived.fulfill()
                }

            firstQueue.async {
                firstSubject.send(5)
                firstSubject.send(10)
            }
            secondQueue.async {
                secondSubject.send(20)
            }

            wait(for: [pairArrived], timeout: 5)
            subscription.cancel()
        }
    }
}
Here are the logs of the testCombineLatest_currentValue_receiveOn test:
Test Case '-[xxxx.CombineLatestTests testCombineLatest_currentValue_receiveOn]' started.
---------- RUN 0
-- Observer 2: 0, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- recieved 10:20 on <NSThread: 0x600000439880>{number = 4, name = (null)}
---------- RUN 1
-- Observer 2: 0, Thread: <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 10:20 on <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
---------- RUN 2
-- Observer 2: 0, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- recieved 10:20 on <NSThread: 0x600000439880>{number = 4, name = (null)}
---------- RUN 3
-- Observer 2: 0, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 10:20 on <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
---------- RUN 4
-- Observer 1: 0, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 2: 0, Thread: <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 10:0 on <NSThread: 0x600000439880>{number = 4, name = (null)}
CombineLatestTests.swift:93: error: : Asynchronous wait failed: Exceeded timeout of 3 seconds, with unfulfilled expectations: "expect values".
Test Suite 'CombineLatestTests' failed at 2020-03-04 20:37:24.957.
Executed 3 tests, with 3 failures (0 unexpected) in 18.159 (18.161) seconds
解决方案
试试这个测试
/// Stress test showing `Publishers.CombineLatest` fed by two upstreams that are
/// both delivered on one shared serial queue.
///
/// Each of the 101 runs builds two `PassthroughSubject`s, logs every value with
/// a pass-through `map`, combines the two streams after `receive(on: q1)`, and
/// waits up to 10 seconds for the pair `(10, 20)` to reach the sink.
func testCombineLatest_receiveOn() {
    for x in 0...100 {
        print("---------- RUN \(x)")
        let q1 = DispatchQueue(label: "q1", qos: .background)
        let q2 = DispatchQueue(label: "q2", qos: .utility, attributes: .concurrent)
        let subj1 = PassthroughSubject<Int, Never>()
        let subj2 = PassthroughSubject<Int, Never>()
        let publ1 = subj1
            .map { value -> Int in
                print("-- Observer 1: \(value), Thread: \(Thread.current)")
                return value
            }
        let publ2 = subj2
            .map { value -> Int in
                print("-- Observer 2: \(value), Thread: \(Thread.current)")
                return value
            }
        let exp = expectation(description: "expect values")
        // The target pair may be observed more than once; extra fulfills are fine.
        exp.assertForOverFulfill = false
        // You have to use the same serial queue for the publishers you want to combine.
        let canc = Publishers.CombineLatest(publ1.receive(on: q1), publ2.receive(on: q1))
            // This only redirects the combined output to a different queue, which may
            // even be concurrent (author's note: it has no effect on the combining).
            .receive(on: q2)
            .sink { value1, value2 in
                print("-- recieved \(value1): \(value2) on \(Thread.current)")
                if value1 == 10, value2 == 20 {
                    exp.fulfill()
                }
            }
        let q = DispatchQueue.global()
        // The two subjects are still updated concurrently with each other.
        // Both values of `subj1` are sent from a single block so that 10 is always
        // sent after 5. As separate blocks on the concurrent global queue their
        // order is not guaranteed, and a 10, 5, 20 ordering would leave (5, 20) as
        // the only emitted pair, so the expectation would time out.
        q.async {
            subj1.send(5)
            subj1.send(10)
        }
        q.async {
            subj2.send(20)
        }
        wait(for: [exp], timeout: 10)
        canc.cancel()
    }
}
运行它,检查打印输出并查看上面代码片段中的注释
打印输出的一部分
---------- RUN 6
-- Observer 1: 5, Thread: <NSThread: 0x6000020aec40>{number = 6, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000020aec40>{number = 6, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000020d1380>{number = 4, name = (null)}
-- recieved 10: 20 on <NSThread: 0x6000020aec40>{number = 6, name = (null)}
---------- RUN 7
-- Observer 1: 5, Thread: <NSThread: 0x6000020aec40>{number = 6, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000020d1380>{number = 4, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000020aec00>{number = 7, name = (null)}
-- recieved 5: 20 on <NSThread: 0x6000020aec00>{number = 7, name = (null)}
-- recieved 10: 20 on <NSThread: 0x6000020aec40>{number = 6, name = (null)}
您可以在其中看到同时发送值的效果
推荐阅读
- java - 分页和天真的查询 + eclipse 链接
- android - 阅读 Mifare Classic 会返回奇怪的字符
- blazor-server-side - Blazor 组件库中的 CSS 未加载
- ios - 我们如何将闭包传递给应用流程中的任何 ViewController?
- android - 更改 .so 库中的函数名称
- java - 雅加达ee到底是什么
- typescript - 可视代码显示找不到绝对导入的模块警告
- javascript - 循环播放时 HTML 画布内的视频闪烁
- mysql - 通过隧道 ssh 使用 sequelize 访问远程数据库时出错
- c++ - 多线程传递参数