scala - 从 Akka Http 中的源向元素发送元素
问题描述
我正在使用 Akka Http 和 Akka Streams 开发一个客户端-服务器应用程序。主要思想是服务器必须使用来自 Akka 流的 Source 来提供 http 响应。
问题是服务器在向客户端发送第一条消息之前积累了一些元素。但是,我需要服务器在源生成新元素后立即将元素发送到元素。
代码示例:
case class Example(id: Long, txt: String, number: Double)
object MyJsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
implicit val exampleFormat = jsonFormat3(Test)
}
class BatchIterator(batchSize: Int, numberOfBatches: Int, pause: FiniteDuration) extends Iterator[Array[Test]]{
val range = Range(0, batchSize*numberOfBatches).toIterator
val numberOfBatchesIter = Range(0, numberOfBatches).toIterator
override def hasNext: Boolean = range.hasNext
override def next(): Array[Test] = {
println(s"Sleeping for ${pause.toMillis} ms")
Thread.sleep(pause.toMillis)
println(s"Taking $batchSize elements")
Range(0, batchSize).map{ _ =>
val count = range.next()
Test(count, s"Text$count", count*0.5)
}.toArray
}
}
object Server extends App {
import MyJsonProtocol._
implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
.withFramingRenderer(
Flow[ByteString].intersperse(ByteString(System.lineSeparator))
)
implicit val system = ActorSystem("api")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
def fetchExamples(): Source[Array[Test], NotUsed] = Source.fromIterator(() => new BatchIterator(5, 5, 2 seconds))
val route =
path("example") {
complete(fetchExamples)
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 9090)
println("Server started at localhost:9090")
StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ ⇒ system.terminate())
}
然后,如果我执行:
curl --no-buffer localhost:9090/example
我同时获取所有元素,而不是每 2 秒接收一个元素。
关于如何“强制”服务器发送从源中出来的每个元素的任何想法?
解决方案
最后,我找到了解决方案。问题是源是同步的......所以解决方案就是调用函数async
complete(fetchExamples.async)
推荐阅读
- express - express socket.io 端口 3000 已在使用中
- typescript - 联盟类型 - 强大的结果
- ios - UIStackView 中的圆形按钮(iOS Swift)
- ios - 对变量使用 POP 时,类没有初始化器
- github - Github Enterprise 激活休眠用户
- angular - 如何在自动完成组件中设置滚动条的最大高度
- spring-boot - RabbitMQ DirectMessageListenerContainer 中的消费者 - Cloud Foundry 中的内存影响
- list - Unity3D:使用列表创建网格布局时出错
- php - 在 php 中使用 fb->get() 使用图形 api 获取 facebook 照片
- elasticsearch - 嵌套的 Elasticsearch 查询