首页 > 解决方案 > 如何通过自定义运算符将发布者输出转发到下游订阅者?

问题描述

我对 Combine 有潜在的用途,但我在实现细节上遇到了很多麻烦。目标是提供一个将执行以下操作的发布者:

  1. 搜索缓存值,并发出该值,或者:
  2. 将订阅者推荐给将发出值的上游发布者,并将其存储在适当的缓存位置

我知道这可以使用现有的运算符来完成,但如果可能的话,我想学习如何制作自定义Operator//模式。PublisherSubscription

我希望用法类似于以下伪代码:

URLSession.shared.dataTaskPublisher(for: url)
    .cache(with: { someSortOfCachingPolicy })
    .sink()

为了实现这一点,我猜测 Apple 对mapflatMap.

我创建了一个CachePublisher尝试捕获 Upstream Publisher

struct CachePublisher<Upstream: Publisher>: Publisher {
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure
    
    var upstream: Upstream
    
    var getCache: ()->Output?
    var setCache: (Output)->Void
    
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
        let subscription = CachePublisherSubscription(subscriber: subscriber, upstream: upstream, getCache: getCache, setCache: setCache)
        subscriber.receive(subscription: subscription)
    }
    
    init(_ upstream: Upstream, getCache: @escaping ()->Output?, setCache: @escaping (Output)->Void) {
        self.upstream = upstream
        self.getCache = getCache
        self.setCache = setCache
    }
}

紧随其后的是Subscription

extension CachePublisher {
    class CachePublisherSubscription<S: Subscriber>: Subscription where S.Input == Upstream.Output, S.Failure == Upstream.Failure {
        var subscriber: S
        var upstream: Upstream
        
        var setCache: (Output)->Void
        var getCache: ()->Output?
        
        init(subscriber: S, upstream: Upstream, getCache: @escaping ()->Output?, setCache: @escaping (Output)->Void) {
            self.subscriber = subscriber
            self.upstream = upstream
            self.getCache = getCache
            self.setCache = setCache
        }
        
        func request(_ demand: Subscribers.Demand) {
            ///check the cache for a value that satisfies the type
            ///return a value from the upstream publisher if not
            if let output = self.getCache() {
                subscriber.receive(output)
            } else {
                //forward an upstream value?
                //how? an entire publisher/subscriber chain?
            }
            
        }
        
        func cancel() {
        }
    }
}

最后,一个函数可以将上游发布者传递给CachePublisher

extension Publisher {
    func cache() -> CachePublisher<Self> {
        return CachePublisher(self, getCache: { nil }, setCache: { _ in })
    }
}

我不知道在所需的方法中放入什么,或者如何将订阅者沿链向上传递给上游发布者。或者如何从上游发布者那里获取价值。

我脑海中浮现的想法是下游订阅者有点创建嵌套娃娃类型的结构,但我只是不知道如何连接它们。

标签: swiftoperatorsreactive-programmingcombine

解决方案


Publisher//不需要整个舞蹈PublishersSubscription可以自定义subscribe方法而不需要自定义类。现有的联合运营商在这里救援:)。

extension Publisher {
    func cache(read: @escaping Publishers.Cache<Self>.Read,
               write: @escaping Publishers.Cache<Self>.Write) -> Publishers.Cache<Self> {
        Publishers.Cache(upstream: self, read: read, write: write)
    }
}

extension Publishers {
    struct Cache<P: Publisher>: Publisher {
        typealias Output = P.Output
        typealias Failure = P.Failure
        
        typealias Read = () -> Output?
        typealias Write = (Output) -> Void
        
        private let upstream: P
        private let read: Read
        private let write: Write
        
        init(upstream: P, read: @escaping Read, write: @escaping Write) {
            self.upstream = upstream
            self.read = read
            self.write = write
        }
        
        func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
            if let cachedValue = read() {
                Just(cachedValue).setFailureType(to: Failure.self).receive(subscriber: subscriber)
            } else {
                upstream.handleEvents(receiveOutput: write).receive(subscriber: subscriber)
            }
        }
    }
}

handleEvents有点打破了在编写自定义运算符管道时建议遵循的“纯函数”范式,但是由于无论如何您都需要写入缓存,这已经是一个副作用,调用的附加影响handleEvents并不大。


推荐阅读