首页 > 解决方案 > 如何将 akka.stream.javadsl.Source 转换为 java.util.concurrent.Flow.Publisher

问题描述

我正在尝试Test Compatability Kit (tck)响应式流,并且我自己测试了一些发布者,我想测试一个akka Source. 但为了做到这一点,我需要将 Source(或 Source + Processor(s))转换为java.util.concurrent.Flow.Publisher.

@Override
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
    return new FlowPublisher((int) elements);  // <-- how to test an akka.Source?
}

class FlowPublisher implements java.util.concurrent.Flow.Publisher {
    ...

而且我在任何地方都找不到如何做到这一点。

是否有一些关于此的文档,或者有人知道答案?

标签: akka-streamjava.util.concurrentreactive-streams

解决方案


您可以将 akka-streams 转换Sourceorg.reactivestreams.Publisher(与java.util.concurrent.Flow根据reactive-streams相同):

implicit val sys: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()

val source = Source.single(1) // some random source
val publisher: Publisher[Int] = source.runWith(Sink.asPublisher(false))

您可以在官方 akka-streams 文档中找到详细信息 -与 Reactive Streams 集成


需要注意的是,akka-streams 有文档和一些用于测试其组件的工具包 -测试流。我会说最好使用 akka-streams 测试工具包测试 akka-streams 组件,因为您不需要对org.reactivestreams.*实体进行额外的转换,并且测试代码将更加干净、简单和可靠


推荐阅读