java - AWS Kinesis 增强型扇出 Java 示例
问题描述
我有一个应用程序使用来自 Kinesis 流的记录并进一步处理它们,但性能非常低,所以现在我计划使用 KCL 2.x 迁移到 Kinesis 增强型扇出消费者以提高其性能。由于增强型扇出的 Aws Kinesis 文档非常令人困惑,有人可以帮我举一个例子来说明如何在我的 Java 应用程序中实现此使用者功能吗?
解决方案
这是一个非常详细的 KCL 2.x 消费者示例:https ://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html
最重要的部分是:
SampleRecordProcessor
- 消费者处理逻辑所在的 ShardRecordProcessor 接口的实现。SampleRecordProcessorFactory
Scheduler
配置:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
);
上面的重要部分是指定了默认的检索配置(),它在后台配置了增强型扇出消费者。显式方式如下:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(
new FanOutConfig(kinesisClient)
.streamName(streamName)
.applicationName(appName)
)
.maxListShardsRetryAttempts(maxListShardsRetryAttempts)
.initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream)
)
);
推荐阅读
- ruby - ruby 将数组的元素填充到一组嵌套数组中并删除重复项
- java - java.lang.IllegalStateException:页面只能偏移正数
- github - 在 vs 代码中更改分支主控
- docker-compose - Hyperledger Fabric 通道创建错误 - 未知的联盟名称
- git - 为什么 origin/master 在提交中落后?
- android - 如何在可扩展的 Listview 中使用改造来解析嵌套的 json?
- c# - QuickIONET:不存在文件的 FileInfo 对象
- javascript - 如何计算页面上加载的所有 JS 文件的哈希(验证 html5 游戏运行时)
- api-platform.com - “父”属性没有出现在带有 denormalizationContext 组的“示例值”中
- devise - 使用 Omniauth 登录 Devise 以及访问 API