swift - 如何通过自定义运算符将发布者输出转发到下游订阅者?
问题描述
我对 Combine 有潜在的用途,但我在实现细节上遇到了很多麻烦。目标是提供一个将执行以下操作的发布者:
- 搜索缓存值,并发出该值,或者:
- 将订阅者推荐给将发出值的上游发布者,并将其存储在适当的缓存位置
我知道这可以使用现有的运算符来完成,但如果可能的话,我想学习如何制作自定义Operator
//模式。Publisher
Subscription
我希望用法类似于以下伪代码:
URLSession.shared.dataTaskPublisher(for: url)
.cache(with: { someSortOfCachingPolicy })
.sink()
为了实现这一点,我猜测 Apple 对map
和flatMap
.
我创建了一个CachePublisher
尝试捕获 Upstream Publisher
:
struct CachePublisher<Upstream: Publisher>: Publisher {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
var upstream: Upstream
var getCache: ()->Output?
var setCache: (Output)->Void
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
let subscription = CachePublisherSubscription(subscriber: subscriber, upstream: upstream, getCache: getCache, setCache: setCache)
subscriber.receive(subscription: subscription)
}
init(_ upstream: Upstream, getCache: @escaping ()->Output?, setCache: @escaping (Output)->Void) {
self.upstream = upstream
self.getCache = getCache
self.setCache = setCache
}
}
紧随其后的是Subscription
:
extension CachePublisher {
class CachePublisherSubscription<S: Subscriber>: Subscription where S.Input == Upstream.Output, S.Failure == Upstream.Failure {
var subscriber: S
var upstream: Upstream
var setCache: (Output)->Void
var getCache: ()->Output?
init(subscriber: S, upstream: Upstream, getCache: @escaping ()->Output?, setCache: @escaping (Output)->Void) {
self.subscriber = subscriber
self.upstream = upstream
self.getCache = getCache
self.setCache = setCache
}
func request(_ demand: Subscribers.Demand) {
///check the cache for a value that satisfies the type
///return a value from the upstream publisher if not
if let output = self.getCache() {
subscriber.receive(output)
} else {
//forward an upstream value?
//how? an entire publisher/subscriber chain?
}
}
func cancel() {
}
}
}
最后,一个函数可以将上游发布者传递给CachePublisher
extension Publisher {
func cache() -> CachePublisher<Self> {
return CachePublisher(self, getCache: { nil }, setCache: { _ in })
}
}
我不知道在所需的方法中放入什么,或者如何将订阅者沿链向上传递给上游发布者。或者如何从上游发布者那里获取价值。
我脑海中浮现的想法是下游订阅者有点创建嵌套娃娃类型的结构,但我只是不知道如何连接它们。
解决方案
Publisher
//不需要整个舞蹈Publishers
,Subscription
可以自定义subscribe
方法而不需要自定义类。现有的联合运营商在这里救援:)。
extension Publisher {
func cache(read: @escaping Publishers.Cache<Self>.Read,
write: @escaping Publishers.Cache<Self>.Write) -> Publishers.Cache<Self> {
Publishers.Cache(upstream: self, read: read, write: write)
}
}
extension Publishers {
struct Cache<P: Publisher>: Publisher {
typealias Output = P.Output
typealias Failure = P.Failure
typealias Read = () -> Output?
typealias Write = (Output) -> Void
private let upstream: P
private let read: Read
private let write: Write
init(upstream: P, read: @escaping Read, write: @escaping Write) {
self.upstream = upstream
self.read = read
self.write = write
}
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
if let cachedValue = read() {
Just(cachedValue).setFailureType(to: Failure.self).receive(subscriber: subscriber)
} else {
upstream.handleEvents(receiveOutput: write).receive(subscriber: subscriber)
}
}
}
}
handleEvents
有点打破了在编写自定义运算符管道时建议遵循的“纯函数”范式,但是由于无论如何您都需要写入缓存,这已经是一个副作用,调用的附加影响handleEvents
并不大。
推荐阅读
- linux-kernel - 将 yocto morty 更新为 thud 后内核编译错误
- python - 对 GCloud Vision API 的异步 API 请求以进行文本检测
- sql - SSRS 文本参数绑定到 SQL 变量
- python-3.x - 什么是 OpenCV 中实现的立体 BM 和 SGBM 算法中的散斑
- pdf - ConvertAPI DOCX 到 PDF,需要 MS Word 样式的边距注释
- selenium - Serenity BDD / Selenium 测试工作 6 出 10
- javascript - iOS 中的 Expo 计步器有问题,有变化吗?
- python - 使用 pybind11 在 python 中使用 C++ 对象
- docker - 通过jenkins在kubernetes上部署docker镜像的问题
- angular - Edge、IE 和旧版浏览器中的 ViewEncapsulation.ShadowDom 支持