首页 > 解决方案 > 播放框架 Scala:使用 scala akka 流创建无限源并保持服务器发送事件连接在服务器上打开

问题描述

我们需要为以下用例实现服务器发送的事件:

  1. 在服务器上进行一些处理后向 UI 发送通知。这个处理是基于一些逻辑的
  2. 从 RabbitMQ 读取消息后向 UI 发送通知,然后对其执行一些操作。

我们使用 Scala(2.11/2.12)和 Play 框架(2.6.x)的技术集。库:akka.stream.scaladsl.Source

我们从以下示例开始我们的概念证明https://github.com/playframework/play-scala-streaming-example,然后我们通过创建不同的源进行扩展。我们尝试使用 Source.apply,soure.single 创建源代码。

但是,一旦源中的所有元素都被推送到 UI,我的事件流就关闭了。但我不希望事件流关闭。另外我不想使用一些计时器(Source.tick)或Source.repeat。

创建我的源时,集合假设有一些 x 元素,然后服务添加了另外 4 个元素。但是在 x 个元素之后,事件流被关闭然后再次重新打开。

有什么办法我的事件流可以是无限的并且只有我的会话被注销或者我们可以明确地关闭它才会关闭。

// KeepAlive 的代码(在评论中询问)

   object NotficationUtil {

      var userNotificationMap = Map[Integer, Queue[String]]()

      def addUserNotification(userId: Integer, message: String) = {
        var queue = userNotificationMap.getOrElse(userId, Queue[String]())
        queue += message
        userNotificationMap.put(userId, queue)

      }

      def pushNotification(userId: Integer): Source[JsValue, _] = {
        var queue = userNotificationMap.getOrElse(userId, Queue[String]())
         Source.single(Json.toJson(queue.dequeueAll { x => true }))
      }
    }
    @Singleton
    class EventSourceController @Inject() (cc: ControllerComponents) extends AbstractController(cc) with FlowFactory{

      def pushNotifications(user_id:Integer) = Action {
      val stream = NotficationUtil.pushNotification(user_id)
       Ok.chunked(stream.keepAlive(50.second, ()=>Json.obj("data"->"heartbeat")) via EventSource.flow).as(ContentTypes.EVENT_STREAM)
     }

}

标签: scalaakka-streamserver-sent-eventsplayframework-2.6

解决方案


使用以下代码创建 actorref 和发布者

val (ref, sourcePublisher)= Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail).toMat(Sink.asPublisher(true))(Keep.both).run()

并从此发布者创建您的来源

val testsource = Source
      .fromPublisher[T](sourcePublisher)

并将您的听众注册为

Ok.chunked(
        testsource.keepAlive(
          50.seconds,
          () => Json.obj("data"->"heartbeat")) via EventSource.flow)
      .as(ContentTypes.EVENT_STREAM)
      .withHeaders("X-Accel-Buffering" -> "no", "Cache-Control" -> "no-cache")

将您的 json 数据发送到 ref actor,数据将作为事件流通过此源流向前端。希望能帮助到你。


推荐阅读