apache-kafka-streams - 默认retention.ms 未能为selectKey() 创建内部更改日志主题
问题描述
我们使用 selectKey() 来更改密钥。在我们迁移到 IBM Cloud 上的新标准计划事件流之前,它运行良好。然后我们在下面捕获了异常。它说我们的主题retentions.ms 不适合范围[3600000..2592000000]。所以我想知道我们如何解决这个问题。
谢谢,
[WARNING]
org.apache.kafka.streams.errors.StreamsException: Could not create topic employeeFilter-KSTREAM-KEY-SELECT-0000000047-repartition.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:138)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:892)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:472)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:419)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:592)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:94)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:894)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:874)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:586)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:400)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:413)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:861)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:810)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Invalid retention.ms specified. The allowed range is [3600000..2592000000]
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:122)
... 28 more
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Invalid retention.ms specified. The allowed range is [3600000..2592000000]
11:40:52.647 [main] INFO com.ibm.hr.mobility.processor.EmployeeProcessorApplication - Started EmployeeProcessorApplication in 944.09 seconds (JVM running for 954.418)
解决方案
允许的范围是 [3600000..2592000000]
您的代理仅允许创建保留期为 1 小时到 30 天的主题,并且您似乎正在尝试创建保留期超出该范围的主题。
推荐阅读
- python - 在Python3中打印涉及英文和日文文本的右对齐表格列
- sql - 使用纯 SQL 从 foo.year、foo.month、foo.day 转换为日期?
- sql - Teradata SQL如何将“按日期”转移到“日期范围”?
- python - 将 mysql 查询转换为 sqlalchemy 语句
- algorithm - 按差异对列表元素进行分组
- java - 如何在 Smali 汇编语言中传递即时值?
- rxjs - 在数据流之间切换并使用 RxJs 让它们暂停/播放
- android - 如何在 Xamarin (Android) 中添加 fileprovider xml 文件
- amazon-web-services - GCP 堆栈中的 AWS SNS 等效项
- java - 示例项目之间为 Active Directory 返回的 AccessToken 不同