首页 > 解决方案 > 默认retention.ms 未能为selectKey() 创建内部更改日志主题

问题描述

我们使用 selectKey() 来更改密钥。在我们迁移到 IBM Cloud 上的新标准计划事件流之前,它运行良好。然后我们在下面捕获了异常。它说我们的主题retentions.ms 不适合范围[3600000..2592000000]。所以我想知道我们如何解决这个问题。

谢谢,

[WARNING]
org.apache.kafka.streams.errors.StreamsException: Could not create topic employeeFilter-KSTREAM-KEY-SELECT-0000000047-repartition.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:138)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:892)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:472)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:419)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:592)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:94)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:894)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:874)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:586)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:400)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:413)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:861)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:810)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Invalid retention.ms specified. The allowed range is [3600000..2592000000]
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:122)
    ... 28 more
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Invalid retention.ms specified. The allowed range is [3600000..2592000000]
11:40:52.647 [main] INFO  com.ibm.hr.mobility.processor.EmployeeProcessorApplication - Started EmployeeProcessorApplication in 944.09 seconds (JVM running for 954.418)

标签: apache-kafka-streamsspring-cloud-stream

解决方案


允许的范围是 [3600000..2592000000]

您的代理仅允许创建保留期为 1 小时到 30 天的主题,并且您似乎正在尝试创建保留期超出该范围的主题。


推荐阅读