akka-stream - 如何缓冲和删除带有分隔符的分块字节串?
问题描述
假设您有一个发布者使用广播和一些快速和一些慢速订阅者,并且希望能够为慢速订阅者删除一组消息,而不必将它们保存在内存中。数据由分块的 ByteString 组成,因此不能选择删除单个 ByteString。
每组 ByteStrings 后跟一个终止符 ByteString("\n"),所以我需要删除一组以它结尾的 ByteStrings。
这是您可以使用自定义图形阶段做的事情吗?可以在不聚合并将整个集合保存在内存中的情况下完成吗?
解决方案
避免自定义阶段
只要有可能尽量避免自定义阶段,它们很难正确且非常冗长。通常标准akka-stream阶段和plain-old-functions的一些组合可以解决问题。
组丢弃
大概您有一些标准可用于决定将丢弃哪组消息:
type ShouldDropTester : () => Boolean
出于演示目的,我将使用一个简单的开关来丢弃所有其他组:
val dropEveryOther : ShouldDropTester =
Iterator.from(1)
.map(_ % 2 == 0)
.next
我们还需要一个函数来接收 aShouldDropTester
并使用它来确定是否ByteString
应该删除一个人:
val endOfFile = ByteString("\n")
val dropGroupPredicate : ShouldDropTester => ByteString => Boolean =
(shouldDropTester) => {
var dropGroup = shouldDropTester()
(byteString) =>
if(byteString equals endOfFile) {
val returnValue = dropGroup
dropGroup = shouldDropTester()
returnValue
}
else {
dropGroup
}
}
结合上述两个函数将删除每隔一组的 ByteStrings。然后可以将此功能转换为Flow
:
val filterPredicateFunction : ByteString => Boolean =
dropGroupPredicate(dropEveryOther)
val dropGroups : Flow[ByteString, ByteString, _] =
Flow[ByteString] filter filterPredicateFunction
根据需要:消息组不需要缓冲,谓词将在单个 ByteStrings 上工作,因此无论文件大小如何都会消耗恒定数量的内存。
推荐阅读
- angular - Angular 7 从 Angular 库中导入原始扩展
- flutter - 如何在小部件树中保留块实例
- azure - 未找到应用程序依赖项清单中指定的程序集
- multithreading - AsyncHttpClient 创建了多少线程?
- javascript - jQuery/Ajax 帖子中的表单提交不会第二次触发
- java - 在实例化阶段之前/期间检索 Bean 类
- python - Python 是否有类似于 PowerShell 的通配符?
- qt - Yocto:QtMultimedia 不构建 qtmultimedia-plugin
- python - 列表索引超出范围错误仅在在线抓取时发生
- wordpress - 显示产品的 AJAX 问题按类别过滤产品