project-reactor - 如何调用 Sinks.Many.tryEmitNext 来自多个线程?
问题描述
我在 Flux Sinks 周围徘徊,无法理解更高层次的画面。使用 时Sinks.Many<T> tryEmitNext
,该函数会告诉我是否存在争用以及发生故障时应该如何处理(FailFast/Handler)。
但是有没有一个简单的结构可以让我安全地从多个线程发出元素。例如,与其让用户知道存在争用并且我应该再试一次,不如将元素添加到队列(mpmc、mpsc 等),并且仅在队列已满时通知。
现在我可以自己添加一个队列来缓解这个问题,但这似乎是一个常见的用例。我想我在这里遗漏了一点。
解决方案
我遇到了同样的问题,从支持多线程安全发射的处理器迁移。按照 EmitFailureHandler 文档的建议,我使用此自定义 EmitFailureHandler 执行繁忙循环。
public static EmitFailureHandler etryOnNonSerializedElse(EmitFailureHandler fallback){
return (signalType, emitResult) -> {
if (emitResult == EmitResult.FAIL_NON_SERIALIZED) {
LockSupport.parkNanos(10);
return true;
} else
return fallback.onEmitFailure(signalType, emitResult);
};
}
关于 3.4.0 实现有很多令人困惑的方面
- 这意味着除非使用 Unsafe 变体,否则接收器支持序列化发射,但实际上所有序列化版本所做的只是在并发发射的情况下快速失败。
- Flux.Create 提供的 Sink 确实支持线程安全发射。
我希望图书馆在某个时候会提供一个经过精心设计的替代方案。
推荐阅读
- javascript - 在 javascript 中创建 csv 文件时,我无法让我的 åäö 显示在文件中
- flutter - 为什么 Gridview.count 没有用我的动态内容填充空间
- delphi - Tcxgriddbtableview - 对某一行中的几列求和
- tableau-api - 表格中的计算字段取决于列中的值
- emacs - 从组织模式表创建蜘蛛图
- javascript - 基于数量的 Woocommerce 变化价格产品总数
- json - 如何处理通过 JSON 传递的映射有意的 NULL 值?
- methods - 如何判断一个类方法是否存在
- node.js - sendSignedTransaction:Quorum 上的无效发件人错误
- mapstruct - 将 com.vividsolutions.jts.geom.Point 转换为 Dto