首页 > 解决方案 > 如何在 redux-saga 中以有限的并发和合理的取消实现批处理任务?

问题描述

我正在尝试通过 redux-saga 实现图像上传。我需要包括的功能是:

我得到的代码如下。我的问题是处理程序是在重新启动 sagacancelAll的块之后执行的,而且我似乎需要重新启动一切。finally它看起来笨重且容易出错。你能就这是否是 Sagas 的使用方式提供任何建议吗?

function* uploadImage(file) {
  const config = yield getConfig();
  const getRequest = new SagaRequest();
  console.log("Making async request here.");
}

function* consumeImages(uploadRequestsChannel) {
  while (true) {
    const fileAdded = yield take(uploadRequestsChannel);
    // process the request
    yield* uploadImage(fileAdded);
  }
}

function* uploadImagesSaga() {
  const CONCURRENT_UPLOADS = 10;
  const uploadRequestsChannel = yield call(channel);
  let workers = [];
  function* scheduleWorkers() {
    workers = [];
    for (let i = 0; i < CONCURRENT_UPLOADS; i++) {
      const worker = yield fork(consumeImages, uploadRequestsChannel);
      workers.push(worker);
    }
  }

  let listener;
  yield* scheduleWorkers();

  function* cancelAll() {
    // cancel producer and consumers, flush channel
    yield cancel(listener);
    for (const worker of workers) {
      yield cancel(worker);
    }
    yield flush(uploadRequestsChannel);
  }

  function* putToChannel(chan, task) {
    return yield put(chan, task);
  }

  function* listenToUploads() {
    try {
      while (true) {
        const { filesAdded } = yield take(START_UPLOADS);
        for (const fileAdded of filesAdded) {
          yield fork(putToChannel, uploadRequestsChannel, fileAdded);
        }
      }
    } finally {
      // if cancelled, restart consumers and producer
      yield* scheduleWorkers();
      listener = yield fork(listenToUploads);
    }
  }

  listener = yield fork(listenToUploads);

  while (true) {
    yield take(CANCEL_ACTION);
    yield call(cancelAll);
  }
}

export default uploadImagesSaga;

编辑:在这里提炼成一个沙箱:https ://codesandbox.io/s/cancellable-counter-example-qomw6

标签: javascriptreactjsasynchronousreact-reduxredux-saga

解决方案


我喜欢使用race取消 - 比赛的解决值是一个具有一个键和值的对象(“获胜”任务)。redux-saga race() 文档

const result = yield race({
  cancel: take(CANCEL_ACTION),
  listener: call(listenToUploads), // use blocking `call`, not fork
});

if (result.cancel) {
  yield call(cancelAll)
}

^ 这可以包装在一个while (true)循环中,因此您应该能够合并原始示例中重复的 fork()。如果工人需要重新安排,您可以考虑在内部处理cancelAll

我更喜欢让外部任务句柄重新启动,而不是从它们自己的finally块中调用任务。

编辑:重构示例沙箱https://codesandbox.io/s/cancellable-counter-example-j5vxr


推荐阅读