python-3.x - 如何使用 rxpy/rxjs 延迟事件发射?
问题描述
我有两个事件流。一个来自电感回路,另一个来自 IP 摄像机。汽车将驶过环路,然后撞上相机。如果事件彼此相隔 N 毫秒,我想将它们组合起来(汽车总是首先进入循环),但我也希望每个流中的不匹配事件(任何一个硬件都可能发生故障)都合并到一个流中。像这样的东西:
---> (only unmatched a's, None)
/ \
stream_a (loop) \
\ \
--> (a, b) ---------------------------> (Maybe a, Maybe b)
/ /
stream_b (camera) /
\ /
--> (None, only unmatched b's)
现在,我当然可以通过良好的 ole Subject 反模式来解决问题:
unmatched_a = Subject()
def noop():
pass
pending_as = [[]]
def handle_unmatched(a):
if a in pending_as[0]:
pending_as[0].remove(a)
print("unmatched a!")
unmatched_a.on_next((a, None))
def handle_a(a):
pending_as[0].append(a)
t = threading.Timer(some_timeout, handle_unmatched)
t.start()
return a
def handle_b(b):
if len(pending_as[0]):
a = pending_as[0].pop(0)
return (a, b)
else:
print("unmatched b!")
return (None, b)
stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)
这不仅相当 hacky,而且虽然我没有观察到它,但我很确定当我使用threading.Timer
. 鉴于过多的 rx 运算符,我很确定它们的某种组合可以让您在不使用的情况下做到这一点Subject
,但我无法弄清楚。如何做到这一点?
编辑
尽管出于组织和操作方面的原因,我更愿意坚持使用 Python,但我将采用 JavaScript rxjs 的答案并将其移植,甚至可能在 node.js 中重写整个脚本。
解决方案
您应该能够使用auditTime
and解决问题buffer
。像这样:
function matchWithinTime(a$, b$, N) {
const merged$ = Rx.Observable.merge(a$, b$);
// Use auditTime to compose a closing notifier for the buffer.
const audited$ = merged$.auditTime(N);
// Buffer emissions within an audit and filter out empty buffers.
return merged$
.buffer(audited$)
.filter(x => x.length > 0);
}
const a$ = new Rx.Subject();
const b$ = new Rx.Subject();
matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));
setTimeout(() => a$.next("a"), 0);
setTimeout(() => b$.next("b"), 0);
setTimeout(() => a$.next("a"), 100);
setTimeout(() => b$.next("b"), 125);
setTimeout(() => a$.next("a"), 200);
setTimeout(() => b$.next("b"), 275);
setTimeout(() => a$.next("a"), 400);
setTimeout(() => b$.next("b"), 425);
setTimeout(() => a$.next("a"), 500);
setTimeout(() => b$.next("b"), 575);
setTimeout(() => b$.next("b"), 700);
setTimeout(() => b$.next("a"), 800);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
如果b
值可能紧随其后a
并且您不希望它们匹配,则可以使用更具体的审核,如下所示:
const audited$ = merged$.audit(x => x === "a" ?
// If an `a` was received, audit upcoming values for `N` milliseconds.
Rx.Observable.timer(N) :
// If a `b` was received, don't audit the upcoming values.
Rx.Observable.of(0, Rx.Scheduler.asap)
);
推荐阅读
- java - 如何在 Spring Boot 中测试组件/bean
- redhat-datavirt - JDV中是否存在函数NTILE
- jquery - jquery使用parseFloat和toFixed
- javascript - lodash 链式方法中的当前链
- javascript - OnEdit() 谷歌脚本触发器在函数中的使用
- android - 你如何让 Android TextView 向右展开?
- python - 如何在 Python 的 yaml 文件中配置多个字典键?
- laravel - laravel vue 无法获取数据
- python - Flask/python 路由在本地运行,但在 GCP 上崩溃
- javascript - Express:已解决的 Promise 将接听电话