javascript - 使用 rxjs 6 如何创建带有缓存的可取消 Web 请求管道,并限制同时发生的请求数量
问题描述
我希望有人能够指出我正确的方向,因为我正在努力结合并发和取消rxjs 中排队请求的能力。我将尝试在连续事件中解释这一点。假设我们有 observable A,它接收一个字符串数组。
事件:A 观察到:['dog', 'cat', 'elephant', 'tiger'] 下游检查字符串网络响应是否被缓存,如果它存在于缓存中则从缓存中获取它,如果没有则从缓存中请求它web 并使用 publishReplay / shareReplay 将 observable 保存到缓存中。一次发生 2 个网络请求的限制,因此它尝试从 api 获取“狗”和“猫”(此操作需要 2000 毫秒以上)。1000 毫秒后,A 观察到另一组值:['dog', 'rat', 'horse', 'rabbit']。
接下来应该发生以下情况。我不希望取消“狗”和“猫”请求,我希望他们完成他们的请求,但我想忽略第一个请求中的“大象”和“老虎”。一旦 'dog' 和 'cat' 从第二帧得到他们的响应'rat' 和 'horse' 应该从网络请求,最后一旦其中任何一个解析 'rabbit' 被请求。
这是我当前的代码。我尝试在 networkObservable 的 defer 和 from 之间切换,但行为不同,这也不是我想要的。
const cache = new Map();
// Fake Promise to fake a api request
function later(delay, value) {
console.log('requesting', value);
return new Promise(resolve => setTimeout(resolve, delay, value));
}
const testObservable = of(['dog', 'rat', 'horse', 'rabbit']).pipe(
delay(1000),
startWith(['dog', 'cat', 'elephant', 'tiger'])
);
testObservable.pipe(
map(array => from(array).pipe(
publish(arrayObservable => {
const cachedObservable = arrayObservable.pipe(
filter(id => cache.has(id)),
flatMap(id => cache.get(id), 1)
);
const uncachedObservable = arrayObservable.pipe(
filter(id => !cache.has(id)),
flatMap(id => {
const networkObservable = from(later(2000, id)).pipe(
tap(e => console.log('response', e)),
map(e => 'parsed: ' + e),
tap(e => console.log('parsed', e)),
publishReplay(1),
refCount(),
take(1)
);
cache.set(id, networkObservable);
return networkObservable;
}, 2)
);
return merge(cachedObservable, uncachedObservable);
})
)),
switchAll()
)
这导致输出:
requesting dog
requesting cat
requesting rat
requesting horse
response dog
parsed parsed: dog
response rat
parsed parsed: rat
requesting rabbit
response horse
parsed parsed: horse
response rabbit
parsed parsed: rabbit
这接近想要的行为,但有一个明显的缺陷。Rat and horse 被请求并且不等待 dog 和 cat 在被处决之前解决。但是,“老虎”和“大象”已被妥善处理,因此功能正常。
我是否必须创建一个单独的主题来处理请求?
解决方案
我试图为这个有趣的问题找到一个解决方案,至少就我所理解的而言。
起点testObservable
是Arrays<string>
. 这些数组中的每一个都string
代表对后端服务的潜在请求。在每个给定时间,动态请求不能超过 2 个,因此必须有某种队列机制。为此,我使用 的concurrency
参数mergeMap
。
这里的关键点是,每当一个新的 Array 由 发出时testObservable
,任何与string
包含在先前发出的数组中的 s 相关但尚未发送到远程服务的请求都应该停止。
所以我开始创建一个对象流,其中包含string
远程服务调用的输入以及像这样的停止指示器
testObservable
.pipe(
mergeMap((a) => {
i++;
if (arrays[i - 1]) {
arrays[i - 1].stop = true;
}
const thisArray = { stop: false, i };
arrays[i] = thisArray;
return from(a.map((_v) => ({ v: _v, ssTop: arrays[i] }))).pipe(
mergeMap((d) => {
// d is an object containing the parameter for the remote call and an object of type {stop: boolean, i: number}
// for every string of every array a d is emitted
}, 2)
);
})
)
然后,对于每个d
发出的,我可以实现一个逻辑,确保只有在stop
标志未设置为true
这样的情况下才执行对远程服务的调用
d.ssTop.stop
? NEVER
: from(later(2000, d.v))
请注意,通过确保在发出第 th 数组后不进行与第 th 数组相关的调用,任何时候发出第 th 数组时,第 th 数组的stop
标志i
设置为 true 。i+1
testObservable
i
i+1
这可能是完整代码的样子
const cache = new Map();
// Fake Promise to fake a api request
function later(delay, value) {
console.log("requesting", value);
return new Promise((resolve) => setTimeout(resolve, delay, value));
}
const testObservable = of(["dog", "rat", "horse", "rabbit"]).pipe(
delay(1000),
startWith(["dog", "cat", "elephant", "tiger"])
);
let i = 0;
let arrays: { stop: boolean; i: number }[] = [];
testObservable
.pipe(
mergeMap((a) => {
i++;
if (arrays[i - 1]) {
arrays[i - 1].stop = true;
}
const thisArray = { stop: false, i };
arrays[i] = thisArray;
return from(a.map((_v) => ({ v: _v, ssTop: arrays[i] }))).pipe(
mergeMap((d) => {
return d.ssTop.stop
? NEVER
: cache[d.v]
? of(`${d.v} is the returned from cache}`)
: from(later(2000, d.v)).pipe(
map((v: any) => {
cache[v] = v;
return `${v} is the returned value ${d.ssTop.i}`;
})
);
}, 2)
);
})
)
.subscribe({
next: (d) => {
console.log(d);
},
});
推荐阅读
- spring-boot - 在 spring 中重新建立 RPC 连接
- python - RuntimeError:python 注册表中缺少的图形操作({'SentencepieceEncodeSparse'})也没有出现在 c++ 注册表中
- asp.net-core-mvc - 如何为 Asp.Net Core 2.2 MVC Web 应用程序进行基于 JWT 令牌的身份验证?
- php - ListView 中 Yii 2 分页的问题
- flask - Dockerfile 无法在 Windows 10 上运行 manage.py
- mongodb - 如何修改猫鼬中的现有模式?
- laravel - 从 env 访问配置变量
- excel - Excel 宏 VBA 无法读取 windows 10 环境变量
- android-studio - 带有波浪号的源文件夹名称从 Android Studio 上的构建中排除
- java - CucumberException:在运行测试时无法实例化类 stepDefinitions