首页 > 解决方案 > 如何使每个消息处理成功?

问题描述

下面是一个包含 3 个 Go 例程的服务,用于处理来自 Kafka 的消息:

在此处输入图像描述


Channel-1 和 Channel-2 是 Go 中的无缓冲数据通道。通道就像一个排队机制。

Goroutine-1 从 kafka 主题读取消息,在消息验证后将其消息负载扔到 Channel-1。

Goroutine-2 从 Channel-1 读取并处理有效载荷并将处理后的有效载荷扔到 Channel-2 上。

Goroutine-3 从 Channel-2 读取数据并将处理后的 payload 封装成 http 数据包,并执行 http 请求(使用 http 客户端)到另一个服务。

上述流程中的漏洞:在我们的例子中,由于服务之间的网络连接不良或远程服务尚未准备好接受来自 Go-routine3(http 客户端超时)的 http 请求,处理失败,因此,上述服务丢失了该消息(已从 Kafka 主题中读取)。


Goroutine-1 当前从 Kafka 订阅消息,但没有向 Kafka 发送确认消息(通知特定消息已被 Goroutine-3 成功处理)

正确性优于性能。


如何保证每条消息都处理成功?

标签: goapache-kafkamessage-queue

解决方案


为确保正确性,您需要在处理成功完成后提交(=确认)消息。
对于处理未成功完成的情况——一般情况下,您需要自己实现重试机制。
这应该特定于您的用例,但通常您将消息扔回专用的 Kafka 重试主题(您创建),添加睡眠并再次处理消息。如果在 x 次后处理失败 - 您将消息扔到 DLQ(=死信队列)。
你可以在这里阅读更多内容:
https ://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/


推荐阅读