swift - 创建一个Publisher,一一通知订阅者,相互等待
问题描述
我有这个发布者和订阅者(示例代码):
import Combine
let publisher = PassthroughSubject<ComplexStructOrClass, Never>()
let sub1 = publisher.sink { (someString) in
// Async work...
}
let sub2 = publisher.sink { (someString) in
// Async work, but it has to wait until sub1 has finished his work
}
所以publisher
常量有 2 个订阅者。当我send
在publisher
常量上使用该方法时,它应该首先发送值到sub1
完成处理之后 sub1
(使用回调或类似的东西),publisher
应该和通知sub2
。
因此,在评论中,它指出 Combine 是为此而设计的。我需要使用哪个发布者?PassthroughSubject 可能是错误的决定。
用例
我需要在我的应用程序的整个生命周期内将值发布给动态数量的订阅者,为几个不同的发布者(我希望我可以制定一个协议)。因此,可以在任何给定时间从发布者中添加和删除订阅者。订阅者如下所示:
- 它有一个
NSPersistentContainer
当新值到达时,发布者应该进行回调。该过程如下所示:
- 发布者将创建订阅者容器的背景上下文,因为它知道订阅者有一个容器
- 发布者将上下文与新发布的值一起发送给订阅者
- 发布者一直等待,直到收到订阅者的回调。订阅者不应该保存上下文,但发布者必须持有对上下文的引用。订阅者给出一个枚举的回调,它有一个好的情况和一些错误的情况。
- 当订阅者发出带有错误枚举情况的回调时,发布者必须回滚它为每个订阅者创建的上下文。
- 当订阅者使用 ok 情况进行回调时,发布者为每个订阅者重复第 1 步到第 5 步
- 只有当没有订阅者给出错误枚举案例或没有订阅者时,才会到达此步骤。发布者将保存订阅者创建的所有上下文。
当前代码,没有组合
这是一些没有使用的代码Combine
:
// My publisher
protocol NotiPublisher {
// Type of message to send
associatedtype Notification
// List of subscribers for this publisher
static var listeners: Set<AnyNotiPublisher<Notification>> { get set }
}
// My subscriber
protocol NotificationListener: Hashable {
associatedtype NotificationType
var container: NSPersistentContainer { get }
// Identifier used to find this subscriber in the list of 'listeners' in the publisher
var identifier: Int32 { get }
var notify: ((_ notification: NotificationType, _ context: NSManagedObjectContext, @escaping CompletionHandlerAck) -> ()) { get }
}
// Type erased version of the NotificationListener and some convience methods here, can add them if desired
// In a extension of NotiPublisher, this method is here
static func notify(queue: DispatchQueue, notification: Notification, completionHander: @escaping CompletionHandlerAck) throws {
let dispatchGroup = DispatchGroup()
var completionBlocks = [SomeCompletionHandler]()
var contexts = [NSManagedObjectContext]()
var didLoop = false
for listener in listeners {
if didLoop {
dispatchGroup.wait()
} else {
didLoop = true
}
dispatchGroup.enter()
listener.container.performBackgroundTask { (context) in
contexts.append(context)
listener.notify(notification, context, { (completion) in
completionBlocks.append(completion)
dispatchGroup.leave()
})
}
}
dispatchGroup.notify(queue: queue) {
let err = completion.first(where: { element in
// Check if an error has occured
})
if err == nil {
for context in contexts {
context.performAndWait {
try! context.save()
}
}
}
completionHander(err ?? .ok(true))
}
}
这是非常复杂的代码,我想知道是否可以利用 Combine 的强大功能使这段代码更具可读性。
解决方案
我编写了以下内容,以使用 flatMap 链接来自发布者的异步操作,它允许您返回另一个发布者。我不是粉丝,它可能无法满足您动态更改潜艇的需要,但它可能会对某人有所帮助:
let somePublisher = Just(12)
let anyCancellable = somePublisher.flatMap{ num in
//We can return a publisher from flatMap, so lets return a Future one because we want to do some async work
return Future<Int,Never>({ promise in
//do an async thing using dispatch
DispatchQueue.main.asyncAfter(deadline: .now() + 3, execute: {
print("flat map finished processing the number \(num)")
//now just pass on the value
promise(.success(num))
})
})
}.flatMap{ num in
//We can return a publisher from flatMap, so lets return a Future one because we want to do some async work
return Future<Int,Never>({ promise in
//do an async thing using dispatch
DispatchQueue.main.asyncAfter(deadline: .now() + 3, execute: {
print("second flat map finished processing the number \(num)")
//now just pass on the value
promise(.success(num))
})
})
}.sink { num in
print("This sink runs after the async work in the flatMap/Future publishers")
}
推荐阅读
- javascript - 应该为点赞按钮使用哪种 HTTP 请求方法
- java - java的compareTo()函数中的字符层次结构
- python - 无法使用 seaborn 绘制双轴
- terraform - 在 terragrunt 配置之间共享 aws_route53_record?
- laravel - 从 Vue 发布到 Laravel 时,Laravel 重定向不起作用
- python - 使用 scipy.optimize 中的 curve_fit 拟合数据后,如何确定某个 x 处的 y_predicted?
- angular - 根据另一个值验证一个角度反应形式的输入字段
- javascript - 使用 Angular 6 和 Billboard.js 图表库出现构建错误
- python - 通过反转图像的宽度和长度来顺时针旋转图像不起作用
- excel - 从数据透视表中的 PivotCell 获取 ParentNodeName