javascript - 需要为数组的每个元素执行异步函数,但仅调度带有进度的操作
问题描述
我是 RxJS 的新手,仍在尝试弄清楚如何使用它来实现不同的功能。我需要有关实现可观察的帮助,尝试了很多方法,但似乎都没有奏效。
我有这个功能:
export function automateParameterEdit(tunId) {
const progress$ = new Subject();
const process$ = defer(async () => {
const tun = await updateStatus(tunId, 'autoTun');
progress$.next({ ...tun , progress: '0' });
return { rules: tun.rules, tun };
}).pipe(
flatMap(({ rules, tun }) =>
from(Object.values(rules)).pipe(
concatMap(rule => autoEditParameters(tunId, rule.ruleId, tun.rulesetId)),
scan((acc, curr) => acc + 1, 0),
map(progress => {
progress$.next({ ...tun, progress: progress / Object.values(rules).length * 100 });
}),
catchError(e => {
// whatever
}),
finalize(async () => {
// whatever
})
)
)
);
return merge(progress$, process$);
}
所以,现在,动作被分派了两次,一次是因为progress$.next({ ...tun, progress: progress / Object.values(rules).length * 100 });
发出新的 tun 进度,第二次我相信是因为执行:concatMap(rule => autoEditParameters(tunId, rule.ruleId, tun.rulesetId))
假设有 4 条规则 ( Object.values(rules).length === 4
)。在控制台中,我看到调度了 4 x 2 = 8 个动作,其中一半的有效负载无效。
我想要做的是执行autoEditParameters(tunId, rule.ruleId, tun.rulesetId)
which btw 是异步的,并且在每次执行后我想发出进度(progress$.next({ ...tun, progress: progress / Object.values(rules).length * 100 });
)。
如何停止调度无效操作并仅执行异步autoEditParameters
和调度进度?
解决方案
你不需要一个Subject
!
当您需要“手动”通过流推送值时,您只需要一个主题。但是,在您的情况下,您只想将 ( map
) 排放修改为不同的形状。
所以,你可以摆脱这个主题。无需process$
与progress$
;合并 你可以简单地返回progress$
;
function automateParameterEdit(tunId) {
const process$ = defer(async () => {
const tun = await updateStatus(tunId, 'autoTun');
return { rules: tun.rules, tun };
}).pipe(
flatMap(({ rules, tun }) =>
from(Object.values(rules)).pipe(
concatMap(rule => autoEditParameters(tunId, rule.ruleId, tun.rulesetId)),
scan((acc, curr) => acc + 1, 0),
map(progress => {
return { ...tun, progress: progress / Object.values(rules).length * 100 };
})
)
)
);
return process$;
}
以下是几个 StackBlitz 示例:
每次执行后我想发出进度
不确定您是否只是想发出数字百分比(而不是对象),但这很容易做到。有时将其分解为更小的函数可以更容易理解:
function automateParameterEdit(tunId): Observable<number> {
return updateTun(tunId).pipe(
flatMap(processRules)
);
}
function updateTun(tunId): Observable<Tun> {
return defer(async () => updateStatus(tunId, 'autoTun'))
}
function processRules(tun: Tun): Observable<number> {
return from(tun.rules).pipe(
concatMap(rule => autoEditParameters(tun.id, rule.ruleId, tun.rulesetId)),
scan(acc => acc + 1, 0),
map(doneCount => doneCount / tun.rules.length * 100),
startWith(0),
)
}
在这里,updateTun()
只是包装了 async 函数并返回一个 observable,所以它会在订阅时执行。
processRules()
接受 aTun
并返回一个Observable<number>
进度百分比。startWith
只是发出一个初始值0
.
推荐阅读
- r - 我如何从 R 中的 rapidapi 连接到这个 API?
- java - java.lang.NoClassDefFoundError: antlr/collections/AST 设置配置时.all.transitive = false
- php - Laravel - 如何在视图(刀片)中传递值等于输入值的变量?
- swiftui - SwiftUI 发送一个变量进行转换
- scala - 使用 Scala Spark 在同一个 csv 文件中追加新表
- sql - pyodbc 和 SQL:根据列表或系列中是否存在值来选择行
- html - 替代区域地图/如何在网站上制作交互式图像
- kotlin - PasswordVisualTransformation 将输入键转换为星号
- python - 如果列的所有元素都为空,如何转换列数据类型?
- blazor-webassembly - Blazor WebAssembly 使用地址栏导航