首页 > 解决方案 > 如何调用 Sinks.Many.tryEmitNext 来自多个线程?

问题描述

我在 Flux Sinks 周围徘徊,无法理解更高层次的画面。使用 时Sinks.Many<T> tryEmitNext,该函数会告诉我是否存在争用以及发生故障时应该如何处理(FailFast/Handler)。

但是有没有一个简单的结构可以让我安全地从多个线程发出元素。例如,与其让用户知道存在争用并且我应该再试一次,不如将元素添加到队列(mpmc、mpsc 等),并且仅在队列已满时通知。

现在我可以自己添加一个队列来缓解这个问题,但这似乎是一个常见的用例。我想我在这里遗漏了一点。

标签: project-reactor

解决方案


我遇到了同样的问题,从支持多线程安全发射的处理器迁移。按照 EmitFailureHandler 文档的建议,我使用此自定义 EmitFailureHandler 执行繁忙循环。

public static EmitFailureHandler etryOnNonSerializedElse(EmitFailureHandler fallback){
    return (signalType, emitResult) -> {
        if (emitResult == EmitResult.FAIL_NON_SERIALIZED) {
            LockSupport.parkNanos(10);
            return true;
        } else
            return fallback.onEmitFailure(signalType, emitResult);
    };
}

关于 3.4.0 实现有很多令人困惑的方面

  • 这意味着除非使用 Unsafe 变体,否则接收器支持序列化发射,但实际上所有序列化版本所做的只是在并发发射的情况下快速失败。
  • Flux.Create 提供的 Sink 确实支持线程安全发射。

我希望图书馆在某个时候会提供一个经过精心设计的替代方案。


推荐阅读