swift - 如何通过重复较短的序列压缩 observables
问题描述
我试图弄清楚如何达到以下结果:
A: -a--b--c-d--e-f-|
B: --1-2-3-|
=: --a-b--c-d--e-f-|
: --1-2--3-1--2-3
其中A,B是输入流,'='代表输出流(作为元组A,B)
反之亦然:
A: -a-b-|
B: --1--2-3-4--5--6-7-|
=: --a--b-a-b--a--b-a-|
: --1--2-3-4--5--6-7
因此,在纯文本中-我正在寻找一种行为类似于 zip 运算符但具有“重放”较短序列以匹配较长序列的能力的东西。
知道如何解决这个问题吗?
解决方案 1
@DanielT提供的解决方案(有一些问题)
extension ObservableType {
public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
return Observable.create { observer in
var aa: [A] = []
var aComplete = false
var bb: [B] = []
var bComplete = false
let lock = NSRecursiveLock()
let disposableA = a.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let ae):
aa.append(ae)
if bComplete {
observer.onNext((ae, bb[(aa.count - 1) % bb.count]))
}
else if bb.count == aa.count {
observer.onNext((aa.last!, bb.last!))
}
case .error(let error):
observer.onError(error)
case .completed:
aComplete = true
if bComplete {
observer.onCompleted()
}
}
}
let disposableB = b.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let be):
bb.append(be)
if aComplete {
observer.onNext((aa[(bb.count - 1) % aa.count], be))
}
else if bb.count == aa.count {
observer.onNext((aa.last!, bb.last!))
}
case .error(let error):
observer.onError(error)
case .completed:
bComplete = true
if aComplete {
observer.onCompleted()
}
}
}
return Disposables.create(disposableA, disposableB)
}
}
}
解决方案 2
我自己的解决方案受到以下答案的启发 - 自己的操作员 (HT @DanielT) 但采用更命令式的方法 (HT @iamtimmo):
extension ObservableType {
public static func zipRepeatCollected<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A?, B?)> {
return Observable.create { observer in
var bufferA: [A] = []
let aComplete = PublishSubject<Bool>()
aComplete.onNext(false);
var bufferB: [B] = []
let bComplete = PublishSubject<Bool>()
bComplete.onNext(false);
let disposableA = a.subscribe { event in
switch event {
case .next(let valueA):
bufferA.append(valueA)
case .error(let error):
observer.onError(error)
case .completed:
aComplete.onNext(true)
aComplete.onCompleted()
}
}
let disposableB = b.subscribe { event in
switch event {
case .next(let value):
bufferB.append(value)
case .error(let error):
observer.onError(error)
case .completed:
bComplete.onNext(true)
bComplete.onCompleted()
}
}
let disposableZip = Observable.zip(aComplete, bComplete)
.filter { $0 == $1 && $0 == true }
.subscribe { event in
switch event {
case .next(_, _):
var zippedList = Array<(A?, B?)>()
let lengthA = bufferA.count
let lengthB = bufferB.count
if lengthA > 0 && lengthB > 0 {
for i in 0 ..< max(lengthA, lengthB) {
let aVal = bufferA[i % lengthA]
let bVal = bufferB[i % lengthB]
zippedList.append((aVal, bVal))
}
} else if lengthA == 0 {
zippedList = bufferB.map { (nil, $0) }
} else {
zippedList = bufferA.map { ($0, nil) }
}
zippedList.forEach { observer.onNext($0) }
case .completed:
observer.onCompleted()
case .error(let e):
observer.onError(e)
}
}
return Disposables.create(disposableA, disposableB, disposableZip)
}
}
}
class ZipRepeatTests: XCTestCase {
func testALongerThanB() {
assertAopBEqualsE(
a: "-a--b--c-d--e-f-|",
b: "--1-2-3-|",
e: "a1,b2,c3,d1,e2,f3,|")
}
func testAShorterThanB() {
assertAopBEqualsE(
a: "-a--b|",
b: "--1-2-3-|",
e: "a1,b2,a3,|")
}
func testBStartsLater() {
assertAopBEqualsE(
a: "-a--b|",
b: "----1---2|",
e: "a1,b2,|")
}
func testABWithConstOffset() {
assertAopBEqualsE(
a: "-a--b--c|",
b: "----1--2--3--|",
e: "a1,b2,c3,|")
}
func testAEndsBeforeBStarts() {
assertAopBEqualsE(
a: "ab|",
b: "---1-2-3-4-|",
e: "a1,b2,a3,b4,|")
}
func testACompletesWithoutValue() {
assertAopBEqualsE(
a: "-|",
b: "--1-2-3-|",
e: "1,2,3,|")
}
func testBCompletesWithoutValue() {
assertAopBEqualsE(
a: "-a--b|",
b: "|",
e: "a,b,|")
}
func testNoData() {
assertAopBEqualsE(
a: "-|",
b: "|",
e: "|")
}
func assertAopBEqualsE(_ scheduler: TestScheduler = TestScheduler(initialClock: 0), a: String, b: String, e: String, file: StaticString = #file, line: UInt = #line) {
let aStream = scheduler.createColdObservable(events(a))
let bStream = scheduler.createColdObservable(events(b))
let eStream = expected(e)
let bResults = scheduler.start {
Observable<(String)>.zipRepeatCollected(aStream.asObservable(), bStream.asObservable()).map { "\($0 ?? "")\($1 ?? "")" }
}
XCTAssertEqual(eStream, bResults.events.map { $0.value }, file: file, line: line)
}
func expected(_ stream: String) -> [Event<String>] {
stream.split(separator: ",").map { String($0) == "|" ? .completed : .next(String($0)) }
}
func events(_ stream: String, step: Int = 10) -> [Recorded<Event<String>>] {
var time = 0
var events = [Recorded<Event<String>>]()
stream.forEach { c in
if c == "|" {
events.append(.completed(time))
} else if c != "-" {
events.append(.next(time, String(c)))
}
time += step
}
return events
}
}
解决方案
如有疑问,您始终可以创建自己的运算符:
extension ObservableType {
public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
return Observable.create { observer in
var aa: [A] = []
var aComplete = false
var bb: [B] = []
var bComplete = false
let lock = NSRecursiveLock()
let disposableA = a.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let ae):
aa.append(ae)
if bComplete {
observer.onNext((ae, bb[(aa.count - 1) % bb.count]))
}
else if bb.count == aa.count {
observer.onNext((aa.last!, bb.last!))
}
case .error(let error):
observer.onError(error)
case .completed:
aComplete = true
if bComplete {
observer.onCompleted()
}
}
}
let disposableB = b.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let be):
bb.append(be)
if aComplete {
observer.onNext((aa[(bb.count - 1) % aa.count], be))
}
else if bb.count == aa.count {
observer.onNext((aa.last!, bb.last!))
}
case .error(let error):
observer.onError(error)
case .completed:
bComplete = true
if aComplete {
observer.onCompleted()
}
}
}
return Disposables.create(disposableA, disposableB)
}
}
}
以及显示功能的测试:
class RxSandboxTests: XCTestCase {
func testLongA() {
let scheduler = TestScheduler(initialClock: 0)
let a = scheduler.createColdObservable([.next(10, "a"), .next(20, "b"), .next(30, "c"), .next(40, "d"), .next(50, "e"), .next(60, "f"), .completed(60)])
let b = scheduler.createColdObservable([.next(10, 1), .next(20, 2), .next(30, 3), .completed(30)])
let bResults = scheduler.start {
Observable<(String, Int)>.zipRepeat(a.asObservable(), b.asObservable()).map { $0.1 }
}
XCTAssertEqual(bResults.events, [.next(210, 1), .next(220, 2), .next(230, 3), .next(240, 1), .next(250, 2), .next(260, 3), .completed(260)])
}
func testLongB() {
let scheduler = TestScheduler(initialClock: 0)
let a = scheduler.createColdObservable([.next(10, "a"), .next(20, "b"), .next(30, "c"), .completed(30)])
let b = scheduler.createColdObservable([.next(10, 1), .next(20, 2), .next(30, 3), .next(40, 4), .next(50, 5), .next(60, 6), .completed(60)])
let aResults = scheduler.start {
Observable<(String, Int)>.zipRepeat(a.asObservable(), b.asObservable()).map { $0.0 }
}
XCTAssertEqual(aResults.events, [.next(210, "a"), .next(220, "b"), .next(230, "c"), .next(240, "a"), .next(250, "b"), .next(260, "c"), .completed(260)])
}
}
推荐阅读
- gtsummary - gtsummary 小数平均值和 SD - 更改默认值
- python - 如何在 Django 模型中实现一对值
- css - 从css中的另一个元素内部控制一个元素
- python - 如何为基于用户代理的 SIP 呼叫创建 IP 表防火墙规则
- sql - 使用 SQL 递归地将列中的层次结构分解为多列
- r - 用 tibble 创建一个表
- javascript - 谷歌表格问题用脚本函数复制单元格公式
- api - 错误请求:400,为 OAuth2.0 生成不记名令牌时
- spring-boot - 使用 Kubernetes 服务发现从 Spring Boot Admin 访问安全执行器
- kdb - 在同一端口上重新打开连接会导致文件描述符句柄错误