apache-kafka - 何时使用 Kafka 事务 API?
问题描述
我试图了解 Kafka 的事务 API。此链接定义原子读-处理-写周期如下:
首先,让我们考虑一下原子读-进程-写周期的含义。简而言之,这意味着如果应用程序在某个主题分区 tp0 的偏移量 X 处消费了消息 A,并在对消息 A 进行了一些处理使得 B = F(A) 之后将消息 B 写入主题分区 tp1,那么仅当消息 A 和 B 被视为成功使用并一起发布或根本不一起发布时,读取-过程-写入周期才是原子的。
它进一步说如下:
使用为至少一次交付语义配置的 vanilla Kafka 生产者和消费者,流处理应用程序可能会以下列方式丢失恰好一次处理语义:
由于内部重试,producer.send() 可能导致消息 B 的重复写入。这是由幂等生产者解决的,不是本文其余部分的重点。
我们可能会重新处理输入消息 A,导致重复的 B 消息被写入输出,从而违反了仅处理一次的语义。如果流处理应用程序在写入 B 之后但在将 A 标记为已使用之前崩溃,则可能会发生重新处理。因此,当它恢复时,它将再次消耗 A 并再次写入 B,从而导致重复。
最后,在分布式环境中,应用程序将崩溃,或者——更糟!——暂时失去与系统其余部分的连接。通常,新实例会自动启动以替换那些被认为丢失的实例。通过这个过程,我们可能有多个实例处理相同的输入主题并写入相同的输出主题,从而导致重复输出并违反恰好一次处理的语义。我们称之为“僵尸实例”问题。</p>
我们在 Kafka 中设计了事务 API 来解决第二个和第三个问题。事务通过使这些周期原子化并促进僵尸防护,从而在读-进程-写周期中实现精确一次处理。
疑点:
上面的第 2 点和第 3 点描述了何时可能发生使用事务 API 处理的消息重复。事务 API 是否也有助于在任何情况下避免消息丢失?
Kafka 事务 API 的大多数在线(例如,此处和此处)示例涉及:
while (true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE); producer.beginTransaction(); for (ConsumerRecord record : records) producer.send(producerRecord(“outputTopic”, record)); producer.sendOffsetsToTransaction(currentOffsets(consumer), group); producer.commitTransaction(); }
这基本上是读-处理-写循环。那么事务性 API 是否仅在读-处理-写循环中有用?
本文给出了非读写场景下的事务 API 示例:
producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch(ProducerFencedException e) { producer.close(); } catch(KafkaException e) { producer.abortTransaction(); }
它说:
这允许生产者将一批消息发送到多个分区,以便批处理中的所有消息最终对任何消费者可见,或者对消费者不可见。
这个例子是否正确并展示了另一种使用不同于读-处理-写循环的事务 API 的方法?(请注意,它也不会向事务提交偏移量。)
在我的应用程序中,我只是使用来自 kafka 的消息,进行处理并将它们记录到数据库中。那是我的整个管道。
一个。所以,我猜这不是read-process-write cycle。Kafka 事务 API 对我的场景有用吗?
湾。我还需要确保每条消息只处理一次。我想
idempotent=true
在生产者中设置就足够了,我不需要事务 API,对吧?C。我可能会运行多个管道实例,但我不会将处理输出写入 Kafka。所以我想这永远不会涉及僵尸(重复的生产者写信给卡夫卡)。所以,我想事务 API 不会帮助我避免重复处理场景,对吧?(我可能必须在同一个数据库事务中将偏移量和处理输出保存到数据库中,并在生产者重启期间读取偏移量以避免重复处理。)
解决方案
一个。所以,我猜这不是读-处理-写循环。Kafka 事务 API 对我的场景有用吗?
它是一个读-过程-写,除了您正在写入数据库而不是 Kafka。Kafka 有自己的事务管理器,因此在具有幂等性的事务中写入将启用一次处理,假设您可以正确恢复消费者写入处理器的状态。您不能对数据库执行此操作,因为数据库的事务管理器不与 Kafka 同步。相反,您可以做的是确保即使 kafka 事务相对于您的数据库不是原子的,它们最终仍然是一致的。
让我们假设您的消费者读取、写入数据库然后确认。如果数据库失败,您不确认,您可以根据偏移量正常恢复。如果 ack 失败,您将处理两次并保存到数据库两次。如果您可以使此操作具有幂等性,那么您就是安全的。这意味着您的处理器必须是纯处理器并且数据库必须进行重复数据删除:两次处理相同的消息应该始终在数据库上产生相同的结果。
湾。我还需要确保每条消息只处理一次。我想在 producer 中设置 idempotent=true 就足够了,我不需要事务 API,对吧?
假设您遵守第 a 点的要求,那么在不同存储上进行持久化处理后,还要求在您的初始写入和复制之间,您正在保存的对象没有发生任何其他更改。想象有一个值写为 X,然后其他一些参与者将其更改为 Y,然后消息被重新处理并将其更改回 X。例如,可以通过将数据库表设置为日志,类似于 kafka 主题来避免这种情况.
C。我可能会运行多个管道实例,但我不会将处理输出写入 Kafka。所以我想这永远不会涉及僵尸(重复的生产者写信给卡夫卡)。所以,我想事务 API 不会帮助我避免重复处理场景,对吧?(我可能必须在同一个数据库事务中将偏移量和处理输出保存到数据库中,并在生产者重启期间读取偏移量以避免重复处理。)
写入您从中消费的主题的生产者可能会创建僵尸消息。那个制作人需要和kafka好好相处,这样僵尸才会被忽略。事务性 API 和您的消费者将确保该生产者以原子方式写入,而您的消费者读取已提交的消息,尽管不是原子方式。如果你只想要一次幂等性就足够了。如果消息应该是原子写入的,那么您也需要事务。无论哪种方式,您的读写/消费-生产处理器都必须是纯的,并且您必须进行重复数据删除。您的数据库也是该处理器的一部分,因为数据库是实际存在的数据库。
我在网上找了一下,也许这个链接对你有帮助:处理保证
推荐阅读
- javascript - 类型错误:top20 不可迭代
- javascript - 尝试使用 getDownloadUrl 时,列出所有使用 firebase 存储的文件给我 404
- python - 这个 lambda 如何反转字符串?
- lisp - 为什么“让”添加换行符?(我可以摆脱它吗?)
- python - 为什么 pandas excel writer 不会写入我的文件路径?
- python - 通过在 python 中使用它的字符串名称创建一个类的实例
- swift - 访问最近存储的文件(文件“插入文件”无法打开,因为没有这样的文件)
- php - 如何为每个在 Apache 上注册 PHP 的用户添加子域?
- python - Python - 多处理从子目录读取文件并创建 csv 文件列表输出
- google-cloud-platform - 在 GCP 中的组织内共享自定义图像