首页 > 解决方案 > 如何通过重复较短的序列来 zip(配对合并)observables

问题描述

我试图弄清楚如何达到以下结果:

A: -a--b--c-d--e-f-|
B: --1-2-3-|
=: --a-b--c-d--e-f-|
 : --1-2--3-1--2-3

其中A,B是输入流,'='代表输出流(作为元组A,B)

反之亦然:

A: -a-b-|
B: --1--2-3-4--5--6-7-|
=: --a--b-a-b--a--b-a-|
 : --1--2-3-4--5--6-7

因此,用文字来描述:我正在寻找一种行为类似于 zip 运算符、但能够“重放”较短序列以匹配较长序列的运算符。

知道如何解决这个问题吗?


解决方案 1

@DanielT提供的解决方案(有一些问题)

extension ObservableType {
    /// Zips `a` and `b` pairwise and, once one source has completed, replays
    /// its buffered elements cyclically to pair with the other source.
    ///
    /// The n-th emitted pair is `(a[n % a.count], b[n % b.count])`; the
    /// wrap-around is applied only to a source that has already completed.
    /// The result completes after both sources have completed, and an error
    /// from either source is forwarded to the observer. If a source completes
    /// without emitting anything, no pair can be formed and the result
    /// completes empty.
    ///
    /// - Parameters:
    ///   - a: Source of the first tuple component.
    ///   - b: Source of the second tuple component.
    /// - Returns: An observable of `(A, B)` pairs.
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            // Number of pairs emitted so far, i.e. the index of the next pair.
            var emitted = 0
            // Both subscriptions mutate the shared buffers; recursive so that a
            // downstream observer may synchronously trigger another source event.
            let lock = NSRecursiveLock()

            // Emits every pair that can be formed from the current buffers.
            func emitReadyPairs() {
                // At least one side must contribute a not-yet-paired element,
                // otherwise two completed buffers would replay forever.
                while emitted < aa.count || emitted < bb.count {
                    let index = emitted

                    let ae: A
                    if index < aa.count {
                        ae = aa[index]
                    } else if aComplete && !aa.isEmpty {
                        ae = aa[index % aa.count]
                    } else {
                        // `a` is still running (wait for it) or completed empty.
                        return
                    }

                    let be: B
                    if index < bb.count {
                        be = bb[index]
                    } else if bComplete && !bb.isEmpty {
                        be = bb[index % bb.count]
                    } else {
                        // `b` is still running (wait for it) or completed empty.
                        return
                    }

                    // Advance before notifying so a re-entrant call sees consistent state.
                    emitted += 1
                    observer.onNext((ae, be))
                }
            }

            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let ae):
                    aa.append(ae)
                    emitReadyPairs()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    // Elements of `b` that were waiting for `a` can now be
                    // paired with replayed elements of `a`.
                    emitReadyPairs()
                    if bComplete {
                        observer.onCompleted()
                    }
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let be):
                    bb.append(be)
                    emitReadyPairs()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    // Elements of `a` that were waiting for `b` can now be
                    // paired with replayed elements of `b`.
                    emitReadyPairs()
                    if aComplete {
                        observer.onCompleted()
                    }
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}

解决方案 2

我自己的解决方案受到下面这个答案的启发——自定义运算符 (HT @DanielT),但采用了更命令式的方法 (HT @iamtimmo):

extension ObservableType {
    /// Buffers every element of `a` and `b` and, once both sources have
    /// completed, emits the zipped pairs in one burst followed by completion.
    ///
    /// When both buffers are non-empty, `max(a.count, b.count)` pairs are
    /// emitted and the shorter buffer is replayed cyclically. When one source
    /// produced nothing, the other source's elements are paired with `nil`.
    /// An error from either source is forwarded to the observer.
    ///
    /// - Parameters:
    ///   - a: Source of the first tuple component.
    ///   - b: Source of the second tuple component.
    /// - Returns: An observable of `(A?, B?)` pairs that emits only after both
    ///   sources have completed.
    public static func zipRepeatCollected<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A?, B?)> {
        return Observable.create { observer in
            var bufferA: [A] = []
            var aComplete = false
            var bufferB: [B] = []
            var bComplete = false
            // Both subscriptions touch the shared buffers and flags.
            let lock = NSRecursiveLock()

            // Emits the collected pairs and completes once both sources are done.
            // Plain flags are used instead of subjects so that a source which
            // completes synchronously while being subscribed is not missed.
            func emitIfBothComplete() {
                guard aComplete && bComplete else { return }

                let pairs: [(A?, B?)]
                if bufferA.isEmpty {
                    pairs = bufferB.map { (nil, $0) }
                } else if bufferB.isEmpty {
                    pairs = bufferA.map { ($0, nil) }
                } else {
                    let lengthA = bufferA.count
                    let lengthB = bufferB.count
                    pairs = (0 ..< max(lengthA, lengthB)).map { i -> (A?, B?) in
                        (bufferA[i % lengthA], bufferB[i % lengthB])
                    }
                }

                pairs.forEach { observer.onNext($0) }
                observer.onCompleted()
            }

            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let valueA):
                    bufferA.append(valueA)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    emitIfBothComplete()
                }
            }

            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let valueB):
                    bufferB.append(valueB)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    emitIfBothComplete()
                }
            }

            return Disposables.create(disposableA, disposableB)
        }
    }
}

/// Marble-diagram tests for `zipRepeatCollected`.
///
/// Input strings use one character per time step: `-` is an idle step, `|` is
/// completion and any other character is emitted as a one-character element.
/// Expected strings are comma-separated tokens, where `|` stands for completion.
class ZipRepeatTests: XCTestCase {

    // MARK: - Helpers

    /// Converts a marble string into recorded events, advancing virtual time
    /// by `step` for every character.
    func events(_ stream: String, step: Int = 10) -> [Recorded<Event<String>>] {
        var recorded: [Recorded<Event<String>>] = []
        for (offset, symbol) in stream.enumerated() {
            let time = offset * step
            switch symbol {
            case "|":
                recorded.append(.completed(time))
            case "-":
                break
            default:
                recorded.append(.next(time, String(symbol)))
            }
        }
        return recorded
    }

    /// Converts a comma-separated expectation string into the expected events.
    func expected(_ stream: String) -> [Event<String>] {
        return stream.split(separator: ",").map { token -> Event<String> in
            let text = String(token)
            if text == "|" {
                return .completed
            }
            return .next(text)
        }
    }

    /// Runs `zipRepeatCollected` over the marble inputs `a` and `b` and asserts
    /// that the produced events (ignoring their times) match `e`.
    func assertAopBEqualsE(_ scheduler: TestScheduler = TestScheduler(initialClock: 0), a: String, b: String, e: String, file: StaticString = #file, line: UInt = #line) {
        let sourceA = scheduler.createColdObservable(events(a))
        let sourceB = scheduler.createColdObservable(events(b))
        let wanted = expected(e)

        let recording = scheduler.start {
            Observable<String>.zipRepeatCollected(sourceA.asObservable(), sourceB.asObservable())
                .map { pair -> String in
                    let (left, right) = pair
                    return "\(left ?? "")\(right ?? "")"
                }
        }

        let actual = recording.events.map { $0.value }
        XCTAssertEqual(wanted, actual, file: file, line: line)
    }

    // MARK: - Tests

    func testALongerThanB() {
        assertAopBEqualsE(a: "-a--b--c-d--e-f-|", b: "--1-2-3-|", e: "a1,b2,c3,d1,e2,f3,|")
    }

    func testAShorterThanB() {
        assertAopBEqualsE(a: "-a--b|", b: "--1-2-3-|", e: "a1,b2,a3,|")
    }

    func testBStartsLater() {
        assertAopBEqualsE(a: "-a--b|", b: "----1---2|", e: "a1,b2,|")
    }

    func testABWithConstOffset() {
        assertAopBEqualsE(a: "-a--b--c|", b: "----1--2--3--|", e: "a1,b2,c3,|")
    }

    func testAEndsBeforeBStarts() {
        assertAopBEqualsE(a: "ab|", b: "---1-2-3-4-|", e: "a1,b2,a3,b4,|")
    }

    func testACompletesWithoutValue() {
        assertAopBEqualsE(a: "-|", b: "--1-2-3-|", e: "1,2,3,|")
    }

    func testBCompletesWithoutValue() {
        assertAopBEqualsE(a: "-a--b|", b: "|", e: "a,b,|")
    }

    func testNoData() {
        assertAopBEqualsE(a: "-|", b: "|", e: "|")
    }
}

标签: swift, rx-swift

解决方案


如有疑问,您始终可以创建自己的运算符:

extension ObservableType {
    /// Zips `a` and `b` pairwise and, once one source has completed, replays
    /// its buffered elements cyclically to pair with the other source.
    ///
    /// The n-th emitted pair is `(a[n % a.count], b[n % b.count])`; the
    /// wrap-around is applied only to a source that has already completed.
    /// The result completes after both sources have completed, and an error
    /// from either source is forwarded to the observer. If a source completes
    /// without emitting anything, no pair can be formed and the result
    /// completes empty.
    ///
    /// - Parameters:
    ///   - a: Source of the first tuple component.
    ///   - b: Source of the second tuple component.
    /// - Returns: An observable of `(A, B)` pairs.
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            // Number of pairs emitted so far, i.e. the index of the next pair.
            var emitted = 0
            // Both subscriptions mutate the shared buffers; recursive so that a
            // downstream observer may synchronously trigger another source event.
            let lock = NSRecursiveLock()

            // Emits every pair that can be formed from the current buffers.
            func emitReadyPairs() {
                // At least one side must contribute a not-yet-paired element,
                // otherwise two completed buffers would replay forever.
                while emitted < aa.count || emitted < bb.count {
                    let index = emitted

                    let ae: A
                    if index < aa.count {
                        ae = aa[index]
                    } else if aComplete && !aa.isEmpty {
                        ae = aa[index % aa.count]
                    } else {
                        // `a` is still running (wait for it) or completed empty.
                        return
                    }

                    let be: B
                    if index < bb.count {
                        be = bb[index]
                    } else if bComplete && !bb.isEmpty {
                        be = bb[index % bb.count]
                    } else {
                        // `b` is still running (wait for it) or completed empty.
                        return
                    }

                    // Advance before notifying so a re-entrant call sees consistent state.
                    emitted += 1
                    observer.onNext((ae, be))
                }
            }

            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let ae):
                    aa.append(ae)
                    emitReadyPairs()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    // Elements of `b` that were waiting for `a` can now be
                    // paired with replayed elements of `a`.
                    emitReadyPairs()
                    if bComplete {
                        observer.onCompleted()
                    }
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let be):
                    bb.append(be)
                    emitReadyPairs()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    // Elements of `a` that were waiting for `b` can now be
                    // paired with replayed elements of `b`.
                    emitReadyPairs()
                    if aComplete {
                        observer.onCompleted()
                    }
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}

以及演示该功能的测试:

/// Tests for `zipRepeat` where one source is twice as long as the other, so
/// the shorter one has to be replayed once.
class RxSandboxTests: XCTestCase {

    /// `a` is the longer source: the three numbers of `b` must be replayed.
    func testLongA() {
        let scheduler = TestScheduler(initialClock: 0)
        let letterEvents: [Recorded<Event<String>>] = [
            .next(10, "a"), .next(20, "b"), .next(30, "c"),
            .next(40, "d"), .next(50, "e"), .next(60, "f"),
            .completed(60)
        ]
        let numberEvents: [Recorded<Event<Int>>] = [
            .next(10, 1), .next(20, 2), .next(30, 3),
            .completed(30)
        ]
        let a = scheduler.createColdObservable(letterEvents)
        let b = scheduler.createColdObservable(numberEvents)

        // Observe only the `b` component of each emitted pair.
        let bResults = scheduler.start {
            Observable<(String, Int)>.zipRepeat(a.asObservable(), b.asObservable()).map { pair in pair.1 }
        }

        let expectedEvents: [Recorded<Event<Int>>] = [
            .next(210, 1), .next(220, 2), .next(230, 3),
            .next(240, 1), .next(250, 2), .next(260, 3),
            .completed(260)
        ]
        XCTAssertEqual(bResults.events, expectedEvents)
    }

    /// `b` is the longer source: the three letters of `a` must be replayed.
    func testLongB() {
        let scheduler = TestScheduler(initialClock: 0)
        let letterEvents: [Recorded<Event<String>>] = [
            .next(10, "a"), .next(20, "b"), .next(30, "c"),
            .completed(30)
        ]
        let numberEvents: [Recorded<Event<Int>>] = [
            .next(10, 1), .next(20, 2), .next(30, 3),
            .next(40, 4), .next(50, 5), .next(60, 6),
            .completed(60)
        ]
        let a = scheduler.createColdObservable(letterEvents)
        let b = scheduler.createColdObservable(numberEvents)

        // Observe only the `a` component of each emitted pair.
        let aResults = scheduler.start {
            Observable<(String, Int)>.zipRepeat(a.asObservable(), b.asObservable()).map { pair in pair.0 }
        }

        let expectedEvents: [Recorded<Event<String>>] = [
            .next(210, "a"), .next(220, "b"), .next(230, "c"),
            .next(240, "a"), .next(250, "b"), .next(260, "c"),
            .completed(260)
        ]
        XCTAssertEqual(aResults.events, expectedEvents)
    }
}

推荐阅读