kotlin - 如何根据项目反应器中的当前可观察值动态选择变换函数?
问题描述
你好亲爱的反应式程序员,我开始学习项目反应器,但我仍然很难弄清楚什么时候使用什么操作符。我发现,如果我想使用可重复使用的部件来定义反应器流,我可以使用转换运算符。我想要实现的是基于当前的 observables 上下文使用这种流函数的某种实现。对于 Mono 流程,我想出了这个,但我非常不确定,如果它是一个好的解决方案:
所以这是流程的一部分
class CloudeventOverDelegatorRoute(
val fromHttpToDelegatorRoute: FromHttpToDelegatorRoute,
val delegatorProvider: DelegatorProvider,
val fromDelegatorToHttpRoute: FromDelegatorToHttpRoute
): MessageRoute<HttpBaseMessage, HttpResponseMessage> {
override fun isHandlerFor(context: RouteContext): Boolean {
return fromHttpToDelegatorRoute.isHandlerFor(context)
&& fromDelegatorToHttpRoute.isHandlerFor(context)
}
override fun buildPipeline(input: Mono<RoutableMessage<HttpBaseMessage>>): Mono<RoutableMessage<HttpResponseMessage>> {
var dynamicallyDeterminedDelegator: Delegator? = null
return input.transform {
fromHttpToDelegatorRoute.buildPipeline(input)
}.handle<RoutableMessage<InternalMessage>> { t, u ->
dynamicallyDeterminedDelegator = delegatorProvider.provideDelegatorFor(t.routeContext)
u.next(t)
u.complete()
}.transform {
dynamicallyDeterminedDelegator!!.sendDelegated(it)
}.transform { fromDelegatorToHttpRoute.buildPipeline(it) }
}
}
这是动态选择逻辑
interface DelegatorProvider {
fun provideDelegatorFor(context: RouteContext): Delegator
}
class FirstMatchDelegatorProvider(
private val delegators: List<Delegator>
): DelegatorProvider {
override fun provideDelegatorFor(context: RouteContext): Delegator {
return delegators.firstOrNull {
it.isHandlerFor(context)
}?: throw IllegalStateException("No Delegator route available for context: $context")
}
}
这是委托人提供整个流程的重要子部分
interface Delegator {
fun isHandlerFor(context: RouteContext): Boolean
fun sendDelegated(input: Mono<RoutableMessage<InternalMessage>>): Mono<RoutableMessage<InternalStatusMessage>>
}
你怎么看?你会怎么解决?
解决方案
这种方法是有问题的,因为它依赖于共享状态(dynamicallyDeterminedDelegator
变量)。如果多个订阅者订阅返回的Mono
,他们可以覆盖彼此的委托人。也许(多个订阅)不会在您的应用程序中发生,但无论如何这是一个非常糟糕的习惯。
看起来您可以从 a 中得出delegator
a RoutableMessage<InternalMessage>
,并且您实际上并不需要保留该委托人。
一次性解决委托人并将其应用于 routeableMessage 的最简单方法是简单地使用flatMap
. 请参阅下面的(伪)java代码:
.flatMap(routableMessage -> {
val delegator = delegatorProvider.provideDelegatorFor(routableMessage.routeContext);
return delegator.sendDelegated(routableMessage);
})
推荐阅读
- apache-kafka-streams - 从多个用户线程访问 ProcessContext::forward
- android - 更改 ImageView 的背景很慢
- wxpython - 如何使用 wxpython 中的 GUI 在 pybullet 中启动物理模拟
- angular - Angular 的安全性 Kendo UI
- reactjs - 如何让我的 css 模块在 react.js 中正常工作?
- if-statement - 当条件为真时将特定元素添加到列表
- javascript - 发布请求 api 反应后重新渲染组件
- scala - Spark中Dataframe(GraphFrame)中的深度优先搜索算法
- javascript - Javascript:从 body-tag 读取样式表字体大小
- django - 如何将多个模型组合成一个序列化程序?