首页 > 解决方案 > Apache Beam KinesisIO Java 处理管道 - 应用程序状态、错误处理和容错?

问题描述

我正在开发我的第一个 Apache 光束管道来处理来自 AWS Kinesis 的数据流。我熟悉 Kafka 的概念,了解它如何处理消费者的偏移/状态,并且在实现 apachestorm/spark 处理方面有经验。

在阅读完文档后,我成功地使用 KinesisIO Java SDK 创建了一个工作梁管道,以监听 AWS Kinesis 数据流以转换和打印消息。但是,想知道有关如何在 apache beam wrt KinesisIO 中处理以下区域的任何参考实现或指针-

  1. 如何在 Kinesis 流中唯一标识消费者应用程序(类似于 Kafka 中的消费者组 ID)——我说得对吗?它基于 apache Beam 的应用程序名称,并且任何使用 KCL 的消费者都会在 DynamoDB 中跟踪其状态;总是如此吗? Apache Beam KinesisIO 也是如此吗?

  2. 如何强制消费者从之前中断的地方开始处理数据流,即在消费者重新启动或处理中出现任何错误异常的情况下(类似于 Kakfa 中每个消费者 groupId 的偏移量管理)。InitialPositionInStream.TRIM_HORIZON 始终从最早的可用数据流开始,即使我在处理来自 Kinesis 流的少量数据后重新启动管道。

  3. ack 如何在 Kinesis 数据流中工作,即消费者如何确认/更新使用 getRecords() 提取的数据流在进一步增加分片中的序列/位置之前处理的检查点?有什么方法可以控制消费者应用程序中的这些行为,以了解何时成功确认消息以保存应用程序状态并在消费者重新启动时从这些位置开始?

  4. 处理数据流时业务异常(在管道中的任何阶段)对从 Kinesis 流中提取的后续数据的影响,即应用程序是继续提取数据还是停止流程。

标签: javaapache-beamamazon-kinesisapache-beam-io

解决方案


  1. KinesisIO.Read在后台利用 AWS SDK 从 Kinesis 读取数据,并定期检索 Shard Iterator 的更新以从 Kinesis 分片中获取记录。

  2. 你试过ShardIteratorType#LATEST吗?

  3. 在这里查看我的答案:https ://stackoverflow.com/a/62349838/10687325

  4. 如果是未知异常,则管道将停止。


推荐阅读