amazon-web-services - AWS Kinesis KCL 跳过启动前添加的记录
问题描述
我开始使用两者KPL
并KCL
在服务之间交换数据。但是每当consumer service
离线时,发送的所有数据KPL
都将永远丢失。所以我只得到那些在consumer service
启动并且shardConsumer
准备就绪时发送的数据块。我需要从最后一个消费点开始,或者以其他方式处理留下的数据。
这是我的ShardProcessor
代码:
@Override
public void initialize(InitializationInput initializationInput) {
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.records()
.forEach(record -> {
//my logic
});
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
try {
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shard Ended", e);
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
try {
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shutdown Requested", e);
}
}
和配置代码:
public void configure(String streamName, ShardRecordProcessorFactory factory) {
Region region = Region.of(awsRegion);
KinesisAsyncClient kinesisAsyncClient =
KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder =
new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
UUID.randomUUID().toString(), factory);
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
}
解决方案
有两种方法可以解决这个问题。首先,问题。
默认情况下,KCL 配置为在 开始读取流LATEST
。此设置告诉流阅读器在“当前”时间戳处获取流。
在您的情况下,您在“现在”之前放置在该流中的数据。为了读取该数据,您可能需要考虑读取流中最早的数据。如果您设置默认流,则该流将存储数据 24 小时。
要从该流的“开始”或启动 KCL 应用程序前 24 小时读取数据,您需要将流读取器设置为TRIM_HORIZON
. 此设置称为initialPositionInStream
. 你可以在这里阅读。API中记录了三种不同的设置。
为了解决您的问题,如第一个链接中所述,首选方法是向属性文件添加一个条目。如果你不使用属性文件,你可以简单地将它添加到你的Scheduler
ctor中:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
使用此设置要记住的一件事是,当您在流中有数据并且从TRIM_HORIZON
. 在这种情况下,RecordProcessor
将尽可能快地遍历记录。这可能会在 Kinesis API 甚至下游系统(无论您在 RecordProcessor 拥有数据后将数据发送到何处)产生性能问题,
推荐阅读
- kubernetes - 没有 Nodeport 的 Kubernetes 入口控制器
- javascript - 如何通过在颤动中知道其值来删除密钥?
- sql - 仅存在 1 个特定字符串的 SQL
- json - Json转换TimeSpan错误.Net Core 2.2
- python - 如何将 Numba "@vectorize" ufunc 与结构化 Numpy 数组一起使用?
- c++ - 如何从多图中删除特定的重复项?
- tfs - 有序测试的测试结果在 Azure Devops 管道中分组
- list - 如何仅绘制 [x,y] 值列表中每个 x 值的最高 y 值
- python-3.x - boto3在python中有条件地删除
- tsql - 如何将浮点变量定义的天数添加到 SQL 中的日期