首页 > 解决方案 > 使用 rxjs 6 如何创建带有缓存的可取消 Web 请求管道,并限制同时发生的请求数量

问题描述

我希望有人能够指出我正确的方向,因为我正在努力结合并发和取消rxjs 中排队请求的能力。我将尝试在连续事件中解释这一点。假设我们有 observable A,它接收一个字符串数组。

事件:A 观察到:['dog', 'cat', 'elephant', 'tiger'] 下游检查字符串网络响应是否被缓存,如果它存在于缓存中则从缓存中获取它,如果没有则从缓存中请求它web 并使用 publishReplay / shareReplay 将 observable 保存到缓存中。一次发生 2 个网络请求的限制,因此它尝试从 api 获取“狗”和“猫”(此操作需要 2000 毫秒以上)。1000 毫秒后,A 观察到另一组值:['dog', 'rat', 'horse', 'rabbit']。

接下来应该发生以下情况。我不希望取消“狗”和“猫”请求,我希望他们完成他们的请求,但我想忽略第一个请求中的“大象”和“老虎”。一旦 'dog' 和 'cat' 从第二帧得到他们的响应'rat' 和 'horse' 应该从网络请求,最后一旦其中任何一个解析 'rabbit' 被请求。

这是我当前的代码。我尝试在 networkObservable 的 defer 和 from 之间切换,但行为不同,这也不是我想要的。


const cache = new Map();

// Fake Promise to fake a api request
function later(delay, value) {
  console.log('requesting', value);
  return new Promise(resolve => setTimeout(resolve, delay, value));
}


const testObservable = of(['dog', 'rat', 'horse', 'rabbit']).pipe(
  delay(1000),
  startWith(['dog', 'cat', 'elephant', 'tiger'])
);

testObservable.pipe(
  map(array => from(array).pipe(
    publish(arrayObservable => {
      const cachedObservable = arrayObservable.pipe(
        filter(id => cache.has(id)),
        flatMap(id => cache.get(id), 1)
      );
      const uncachedObservable = arrayObservable.pipe(
        filter(id => !cache.has(id)),
        flatMap(id => {
          const networkObservable = from(later(2000, id)).pipe(
            tap(e => console.log('response', e)),
            map(e => 'parsed: ' + e),
            tap(e => console.log('parsed', e)),
            publishReplay(1),
            refCount(),
            take(1)
          );
          cache.set(id, networkObservable);
          return networkObservable;
        }, 2)
      );
      return merge(cachedObservable, uncachedObservable);
    })
  )),
  switchAll()
)

这导致输出:

requesting dog
requesting cat
requesting rat
requesting horse
response dog
parsed parsed: dog
response rat
parsed parsed: rat
requesting rabbit
response horse
parsed parsed: horse
response rabbit
parsed parsed: rabbit

这接近想要的行为,但有一个明显的缺陷。Rat and horse 被请求并且不等待 dog 和 cat 在被处决之前解决。但是,“老虎”和“大象”已被妥善处理,因此功能正常。

我是否必须创建一个单独的主题来处理请求?

标签: javascriptrxjsswitchmapconcurrency-limits

解决方案


我试图为这个有趣的问题找到一个解决方案,至少就我所理解的而言。

起点testObservableArrays<string>. 这些数组中的每一个都string代表对后端服务的潜在请求。在每个给定时间,动态请求不能超过 2 个,因此必须有某种队列机制。为此,我使用 的concurrency参数mergeMap

这里的关键点是,每当一个新的 Array 由 发出时testObservable,任何与string包含在先前发出的数组中的 s 相关但尚未发送到远程服务的请求都应该停止。

所以我开始创建一个对象流,其中包含string远程服务调用的输入以及像这样的停止指示器

testObservable
  .pipe(
    mergeMap((a) => {
      i++;
      if (arrays[i - 1]) {
        arrays[i - 1].stop = true;
      }
      const thisArray = { stop: false, i };
      arrays[i] = thisArray;
      return from(a.map((_v) => ({ v: _v, ssTop: arrays[i] }))).pipe(
        mergeMap((d) => {
          // d is an object containing the parameter for the remote call and an object of type {stop: boolean, i: number}
          // for every string of every array a d is emitted
        }, 2)
      );
    })
  )

然后,对于每个d发出的,我可以实现一个逻辑,确保只有在stop标志未设置为true这样的情况下才执行对远程服务的调用

d.ssTop.stop
            ? NEVER
            : from(later(2000, d.v))

请注意,通过确保在发出第 th 数组后不进行与第 th 数组相关的调用,任何时候发出第 th 数组时,第 th 数组的stop标志i设置为 true 。i+1testObservableii+1

这可能是完整代码的样子

const cache = new Map();

// Fake Promise to fake a api request
function later(delay, value) {
  console.log("requesting", value);
  return new Promise((resolve) => setTimeout(resolve, delay, value));
}

const testObservable = of(["dog", "rat", "horse", "rabbit"]).pipe(
  delay(1000),
  startWith(["dog", "cat", "elephant", "tiger"])
);

let i = 0;
let arrays: { stop: boolean; i: number }[] = [];

testObservable
  .pipe(
    mergeMap((a) => {
      i++;
      if (arrays[i - 1]) {
        arrays[i - 1].stop = true;
      }
      const thisArray = { stop: false, i };
      arrays[i] = thisArray;
      return from(a.map((_v) => ({ v: _v, ssTop: arrays[i] }))).pipe(
        mergeMap((d) => {
          return d.ssTop.stop
            ? NEVER
            : cache[d.v]
            ? of(`${d.v} is the returned from cache}`)
            : from(later(2000, d.v)).pipe(
                map((v: any) => {
                  cache[v] = v;
                  return `${v} is the returned value ${d.ssTop.i}`;
                })
              );
        }, 2)
      );
    })
  )
  .subscribe({
    next: (d) => {
      console.log(d);
    },
  });

推荐阅读