首页 > 解决方案 > 如何缓冲和删除带有分隔符的分块字节串?

问题描述

假设您有一个发布者使用广播和一些快速和一些慢速订阅者,并且希望能够为慢速订阅者删除一组消息,而不必将它们保存在内存中。数据由分块的 ByteString 组成,因此不能选择删除单个 ByteString。

每组 ByteStrings 后跟一个终止符 ByteString("\n"),所以我需要删除一组以它结尾的 ByteStrings。

这是您可以使用自定义图形阶段做的事情吗?可以在不聚合并将整个集合保存在内存中的情况下完成吗?

标签: akka-stream

解决方案


避免自定义阶段

只要有可能尽量避免自定义阶段,它们很难正确且非常冗长。通常标准akka-stream阶段和plain-old-functions的一些组合可以解决问题。

组丢弃

大概您有一些标准可用于决定将丢弃哪组消息:

type ShouldDropTester : () => Boolean

出于演示目的,我将使用一个简单的开关来丢弃所有其他组:

val dropEveryOther : ShouldDropTester = 
  Iterator.from(1)
          .map(_ % 2 == 0)
          .next

我们还需要一个函数来接收 aShouldDropTester并使用它来确定是否ByteString应该删除一个人:

val endOfFile = ByteString("\n")

val dropGroupPredicate : ShouldDropTester => ByteString => Boolean = 
  (shouldDropTester) => {
    var dropGroup = shouldDropTester()

    (byteString) => 
      if(byteString equals endOfFile) {
        val returnValue = dropGroup
        dropGroup = shouldDropTester()
        returnValue
      }
      else {
        dropGroup
      }      
  }

结合上述两个函数将删除每隔一组的 ByteStrings。然后可以将此功能转换为Flow

val filterPredicateFunction : ByteString => Boolean =
  dropGroupPredicate(dropEveryOther)

val dropGroups : Flow[ByteString, ByteString, _] =
  Flow[ByteString] filter filterPredicateFunction

根据需要:消息组不需要缓冲,谓词将在单个 ByteStrings 上工作,因此无论文件大小如何都会消耗恒定数量的内存。


推荐阅读