首页 > 解决方案 > 在 reactor Java 中顺序执行 Reactive 任​​务

问题描述

我正在努力将阻塞顺序编排框架转换为反应式。目前,这些任务是动态的,并通过 JSON 输入输入引擎。引擎提取类并执行该run()方法,并将状态与每个任务的响应一起保存。
如何在反应器中实现相同的链接?如果这是一个静态 DAG,我会用flatMaporthen运算符链接它,但由于它是动态的,我如何继续执行响应式任务并收集每个任务的输出?

示例:
非反应接口:

public interface OrchestrationTask {
 OrchestrationContext run(IngestionContext ctx);
}

核心引擎

public Status executeDAG(String id) {
  IngestionContext ctx = ContextBuilder.getCtx(id);
  List<OrchestrationTask> tasks = app.getEligibleTasks(id);

  for(OrchestrationTask task : tasks) {
    // Eligible tasks are executed sequentially and results are collected.
    OrchestrationContext stepContext = task.run(ctx);
    if(!evaluateResult(stepContext)) break;
  }
  return Status.SUCCESS;
}

按照上面的例子,如果我将任务转换为返回 Mono<?> 那么,我如何等待或链接其他任务以对先前任务的结果进行操作?任何帮助表示赞赏。谢谢。

更新::

反应式任务示例。

public class SampleTask implements OrchestrationTask {  
  @Override
  public Mono<OrchestrationContext> run(OrchestrationContext context) {  
  // Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
  return Mono.just(context).delayElements(Duration.ofSeconds(2));
 }

因此,我将有一系列任务来完成各种事情,但每个任务的响应都取决于前一个任务并存储在编排上下文中。每当发生错误时,编排上下文标志将设置为 false 并且通量应该停止。

标签: spring-bootspring-webfluxreactor

解决方案


我们当然可以:

  • 从任务列表创建通量(如果适合以反应方式生成任务列表,那么您可以直接用通量替换该数组列表,如果不是,则保持原样);
  • flatMap()task.run()您方法的每个任务(根据问题现在返回一个Mono;
  • 确保我们只在evaluateResult()为真时消费元素;
  • ...然后最后SUCCESS像以前一样返回状态。

因此,将所有这些放在一起,只需将循环和返回语句替换为:

Flux.fromIterable(tasks)
        .flatMap(task -> task.run(ctx))
        .takeWhile(stepContext -> evaluateResult(stepContext))
        .then(Mono.just(Status.SUCCESS));

(由于我们已经使它成为反应式,您的方法显然需要返回 aMono<Status>而不仅仅是返回Status。)

根据评论更新 - 如果您只是希望它“一次执行一个”而不是同时执行多个,您可以使用concatMap()而不是flatMap().


推荐阅读