rxjs - 延迟后重新执行异步 RxJS 流
问题描述
我正在使用 RxJS 6 使用类似于下面运行的示例的代码来懒惰地逐步遍历可迭代对象。这运作良好,但我无法解决我的最终用例。
完整代码在这里
import { EMPTY, defer, from, of } from "rxjs";
import { delay, expand, mergeMap, repeat } from "rxjs/operators";
function stepIterator (iterator) {
return defer(() => of(iterator.next())).pipe(
mergeMap(result => result.done ? EMPTY : of(result.value))
);
}
function iterateValues ({ params }) {
const { values, delay: delayMilliseconds } = params;
const isIterable = typeof values[Symbol.iterator] === "function";
// Iterable values which are emitted over time are handled manually. Otherwise
// the values are provided to Rx for resolution.
if (isIterable && delayMilliseconds > 0) {
const iterator = values[Symbol.iterator]();
// The first value is emitted immediately, the rest are emitted after time.
return stepIterator(iterator).pipe(
expand(v => stepIterator(iterator).pipe(delay(delayMilliseconds)))
);
} else {
return from(values);
}
}
const options = {
params: {
// Any iterable object is walked manually. Otherwise delegate to `from()`.
values: ["Mary", "had", "a", "little", "lamb"],
// Delay _between_ values.
delay: 350,
// Delay before the stream restarts _after the last value_.
runAgainAfter: 1000,
}
};
iterateValues(options)
// Is not repeating?!
.pipe(repeat(3))
.subscribe(
v => {
console.log(v, Date.now());
},
console.error,
() => {
console.log('Complete');
}
);
我想添加另一个选项,它将在延迟(runAgainAfter
)后无限次重新执行流。result.done
在没有深入考虑案例的情况下,我无法清晰地撰写此内容。到目前为止,我一直无法围绕iterateValues
.
完成用例的最佳方法是什么?
谢谢!
编辑1:repeat
只是打我的脸。也许这意味着友好。编辑2:不,重复不是重复,但可观察的正在完成。谢谢你的帮助。我很困惑。
对于后代,这里是修订版的完整代码示例 -repeat
能够并在项目之间使用一致的延迟。
import { concat, EMPTY, defer, from, interval, of, throwError } from "rxjs";
import { delay, expand, mergeMap, repeat } from "rxjs/operators";
function stepIterator(iterator) {
return defer(() => of(iterator.next())).pipe(
mergeMap(result => (result.done ? EMPTY : of(result.value)))
);
}
function iterateValues({ params }) {
const { values, delay: delayMilliseconds, times = 1 } = params;
const isIterable =
values != null && typeof values[Symbol.iterator] === "function";
if (!isIterable) {
return throwError(new Error(`\`${values}\` is not iterable`));
}
// Iterable values which are emitted over time are handled manually. Otherwise
// the values are provided to Rx for resolution.
const observable =
delayMilliseconds > 0
? defer(() => of(values[Symbol.iterator]())).pipe(
mergeMap(iterator =>
stepIterator(iterator).pipe(
expand(v => stepIterator(iterator).pipe(delay(delayMilliseconds)))
)
)
)
: from(values);
return observable.pipe(repeat(times));
}
解决方案
我会说实话,但肯定会有更好的解决方案。在我的解决方案中,我最终将延迟逻辑封装在自定义runAgainAfter
运算符中。使其成为独立的部分,不会直接影响您的代码逻辑。
完整的工作代码在这里
runAgainAfter
如果有人需要它的代码:
import { Observable } from "rxjs";
export const runAgainAfter = delay => observable => {
return new Observable(observer => {
let timeout;
let subscription;
const subscribe = () => {
return observable.subscribe({
next(value) {
observer.next(value);
},
error(err) {
observer.error(err);
},
complete() {
timeout = setTimeout(() => {
subscription = subscribe();
}, delay);
}
});
};
subscription = subscribe();
return () => {
subscription.unsubscribe();
clearTimeout(timeout);
};
});
};
希望对你有帮助 <3
推荐阅读
- ios - 在 unnotificationrequest 中设置触发日期后的重复间隔
- powershell - 使用 powershell 发送带有内联附件的电子邮件
- nginx - 强制 SSL 时 Nginx 重定向循环
- javascript - JS async/await 不会等待我的函数来解决他们的承诺
- node.js - path.js:1086 错误:ENOENT:没有这样的文件或目录,uv_cwd
- bash - 获取 echo 的输出并在同一命令的控制台上显示它
- excel - 浮点精度问题
- javascript - 在按钮单击时以编程方式将焦点移至文本字段
- c# - 参数类型不匹配,DateTime 和 DateTime
- php - 在 10 月 cms 中显示验证消息中的图像索引