首页 > 解决方案 > 如何根据项目反应器中的当前可观察值动态选择变换函数?

问题描述

你好亲爱的反应式程序员,我开始学习项目反应器,但我仍然很难弄清楚什么时候使用什么操作符。我发现,如果我想使用可重复使用的部件来定义反应器流,我可以使用转换运算符。我想要实现的是基于当前的 observables 上下文使用这种流函数的某种实现。对于 Mono 流程,我想出了这个,但我非常不确定,如果它是一个好的解决方案:

所以这是流程的一部分

class CloudeventOverDelegatorRoute(
  val fromHttpToDelegatorRoute: FromHttpToDelegatorRoute,
  val delegatorProvider: DelegatorProvider,
  val fromDelegatorToHttpRoute: FromDelegatorToHttpRoute
): MessageRoute<HttpBaseMessage, HttpResponseMessage> {

  override fun isHandlerFor(context: RouteContext): Boolean {
    return fromHttpToDelegatorRoute.isHandlerFor(context)
      && fromDelegatorToHttpRoute.isHandlerFor(context)
  }

  override fun buildPipeline(input: Mono<RoutableMessage<HttpBaseMessage>>): Mono<RoutableMessage<HttpResponseMessage>> {
    var dynamicallyDeterminedDelegator: Delegator? = null
    return input.transform {
      fromHttpToDelegatorRoute.buildPipeline(input)
    }.handle<RoutableMessage<InternalMessage>> { t, u ->
      dynamicallyDeterminedDelegator = delegatorProvider.provideDelegatorFor(t.routeContext)
      u.next(t)
      u.complete()
    }.transform {
      dynamicallyDeterminedDelegator!!.sendDelegated(it)
    }.transform { fromDelegatorToHttpRoute.buildPipeline(it) }
  }

}

这是动态选择逻辑

interface DelegatorProvider {

  fun provideDelegatorFor(context: RouteContext): Delegator

}

class FirstMatchDelegatorProvider(
  private val delegators: List<Delegator>
): DelegatorProvider {

  override fun provideDelegatorFor(context: RouteContext): Delegator {
    return delegators.firstOrNull {
      it.isHandlerFor(context)
    }?: throw IllegalStateException("No Delegator route available for context: $context")
  }

}

这是委托人提供整个流程的重要子部分

interface Delegator {

  fun isHandlerFor(context: RouteContext): Boolean

  fun sendDelegated(input: Mono<RoutableMessage<InternalMessage>>): Mono<RoutableMessage<InternalStatusMessage>>

}

你怎么看?你会怎么解决?

标签: kotlinreactive-programmingspring-webfluxproject-reactor

解决方案


这种方法是有问题的,因为它依赖于共享状态dynamicallyDeterminedDelegator变量)。如果多个订阅者订阅返回的Mono,他们可以覆盖彼此的委托人。也许(多个订阅)不会在您的应用程序中发生,但无论如何这是一个非常糟糕的习惯。

看起来您可以从 a 中得出delegatora RoutableMessage<InternalMessage>,并且您实际上并不需要保留该委托人。

一次性解决委托人并将其应用于 routeableMessage 的最简单方法是简单地使用flatMap. 请参阅下面的(伪)java代码:

.flatMap(routableMessage -> {
    val delegator = delegatorProvider.provideDelegatorFor(routableMessage.routeContext);
    return delegator.sendDelegated(routableMessage);
})

推荐阅读