首页 > 解决方案 > 如何将上下文传播到 Quarkus 中的并行流操作?

问题描述

我有一系列简单的链式操作,它们使用在 Quarkus 服务中运行的 Panache 存储库检索和保存一些数据。在这些操作被并行化的地方ContextNotActiveException会抛出一个。在删除并行化的地方,代码按预期工作。

此代码有效:

    dataRepository.get()
        .map { convert(it) }
        .forEach { perist(it) }

此代码不会:

    dataRepository.get()
        .parallelStream()
        .map { convert(it) }
        .forEach { perist(it) }

Quarkus 文档非常有限,仅解决了 mutiny 或 RX 的使用。

我怎样才能传播这样的上下文parallelStream()

标签: quarkus

解决方案


不幸的是,Context Propagation 不能很好地处理并行 Java 流,因为使流并行化会自动将执行移动到 ForkJoinPool,这意味着您会丢失上下文。您需要以不同的方式处理并行性,而不是让 Java 流为您完成 - 您可能希望使用org.eclipse.microprofile.context.ManagedExecutor.

假设它是convert某种方法,无论出于何种原因,都需要一个活动的请求上下文,您需要将其调用分派到托管执行程序中。这将确保传播上下文。在 Java 代码中,我能想到的与您的代码相当的一个是:

    @Inject
    org.eclipse.microprofile.context.ManagedExecutor executor;

(...)

dataRepository.streamAll()
                .forEach(i -> {
                    executor.supplyAsync(() -> {
                        return convert(i);
                    }).thenAccept(persist(i));
                });

推荐阅读