scala - 如何使用 Lagom Kafka Message Broker API 安全地跳过消息?
问题描述
我们定义了一个基本订阅者,它通过抛出异常并依靠 Akka Streams 的流监督来恢复失败的消息(即出于某些业务逻辑原因,我们不会处理)Flow
:
someLagomService
.someTopic()
.subscribe
.withGroupId("lagom-service")
.atLeastOnce(
Flow[Int]
.mapAsync(1)(el => {
// Exception may occur here or can map to Done
})
.withAttributes(ActorAttributes.supervisionStrategy({
case t =>
Supervision.Resume
})
)
这似乎适用于负载非常小的基本用例,但我们注意到对于大量消息非常奇怪的事情(例如:非常频繁地重新处理消息等)。
深入研究代码,我们看到 Lagom 的broker.Subscriber.atLeastOnce
文档指出:
可能会从上游提取更多元素,但它必须为接收到的每条消息
flow
发出一条消息。Done
它还必须以与接收消息相同的顺序发出它们。这意味着flow
不能过滤或收集消息的子集,而是必须将消息拆分为单独的流并将那些将被删除的流映射到Done
.
此外,在 Lagom 的 impl 中KafkaSubscriberActor
,我们看到 private 的 implatLeastOnce
本质上解压缩了消息有效负载和偏移量,然后在我们的用户流将消息映射到Done
.
上面这两个花絮似乎暗示,通过使用流管理器和跳过元素,我们最终可能会出现可提交偏移量不再与Done
每个 Kafka 消息生成的 s 均匀压缩的情况。
示例:如果我们流式传输 1、2、3、4 并将 1、2 和 4 映射到Done
但在 3 上抛出异常,我们有 3 Done
s 和 4 个可提交偏移量?
- 这是正确的/预期的吗?这是否意味着我们应该避免在这里使用流监督器?
- 不均匀的拉链会导致什么样的行为?
- 在通过 Lagom 消息代理 API 使用来自 Kafka 的消息时,推荐的错误处理方法是什么?映射/恢复故障是否正确
Done
?
使用 Lagom 1.4.10
解决方案
这是正确的/预期的吗?这是否意味着我们应该避免在这里使用流监督器?
官方API 文档说
如果正在使用 Kafka Lagom 消息代理模块,则默认情况下,当发生故障时流会自动重新启动。
因此,无需添加您自己的supervisionStrategy
来管理错误处理。并且默认情况下将重新启动流,您不应考虑“跳过”完成消息。
不均匀的拉链会导致什么样的行为?
正是因为如此,文档说:
这意味着流不得过滤或收集消息的子集
它可能会低估错误的偏移量。并且在重新启动时,您可能会从提交的较低偏移量中以重播的形式获得已处理的消息。
在通过 Lagom 消息代理 API 使用来自 Kafka 的消息时,推荐的错误处理方法是什么?将故障映射/恢复到 Done 是否正确?
Lagom 通过删除导致错误的消息并重新启动流来处理异常。并且映射/恢复失败到完成不会对此有任何改变。
您可以考虑,如果您以后需要访问这些消息,也可以使用Try {}
例如,即不抛出异常,并通过将错误消息发送到不同的主题来收集它们,这将使您有机会监控当条件正确时导致错误的错误和重播消息的数量,即错误已修复。
推荐阅读
- c# - 我想在 Redis 中存储 SignalR 连接 ID
- javascript - 材料设计列表元素到卡片的过渡
- php - 使用 axios 从 reactJS 将数据发送到 php 文件
- javascript - 基于另一个数组键和方向(左或右)的数组排序逻辑
- javascript - 运行 gulp 默认命令时发生 Gulp 错误
- json - 赛普拉斯:错误插入 json => 特殊字符序列:
- google-apps-script - setRichTextValue 到数组中的元素而不是一次一个单元格?
- json - Scala Play:列表到 Json-Array
- graph - 我在 neo4j 上得到不同的图表。如何获得 1-1 图表
- android - android.content.pm.PackageManager$NameNotFoundException: com.android.chrome