apache-kafka - 由于组重新平衡,Spring Cloud Stream Kafka 提交失败
问题描述
我有CommitFailedException
一些耗时的 Spring Cloud Stream 应用程序。我知道要解决此问题,我需要设置max.poll.records
并max.poll.interval.ms
满足我对处理批次所需时间的期望。但是,我不太确定如何在 Spring Cloud Stream 中为消费者设置它。
例外:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:808) at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:691) at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416) at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377) at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1554) at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1418) at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:739) at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.lang.Thread.run(Thread.java:748)
而且,我怎样才能确保这种情况根本不会发生?或者,在这种异常的情况下,我如何注入某种回滚?原因是我正在做一些其他的外部工作,一旦完成,我就会相应地发布输出消息。因此,如果在外部系统上完成工作后由于任何问题无法发布消息,我必须将其还原(通过 Kafka 发布和其他外部系统进行某种原子事务)。
解决方案
spring.cloud.stream.kafka.binder.consumerProperties
任意 Kafka 客户端消费者属性的键/值映射。除了支持已知的 Kafka 消费者属性,这里也允许未知的消费者属性。此处的属性取代了在引导和上面的配置属性中设置的任何属性。
默认值:空地图。
例如spring.cloud.stream.kafka.binder.consumerProperties.max.poll.records=10
或者在这里的绑定级别文档。
spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration
使用包含通用 Kafka 消费者属性的键/值对映射。除了拥有 Kafka 消费者属性外,这里还可以传递其他配置属性。例如应用程序需要的一些属性,例如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。
默认值:空地图。
例如spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=10
OffsetCommitCallback
您可以通过向侦听器容器添加ContainerProperties
并设置syncCommits
为来获得提交失败的通知false
。要自定义容器及其属性,请将ListenerContainerCustomizer
bean 添加到应用程序。
编辑
异步提交回调...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So57970152Application {
public static void main(String[] args) {
SpringApplication.run(So57970152Application.class, args);
}
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
return (container, dest, group) -> {
container.getContainerProperties().setAckMode(AckMode.RECORD);
container.getContainerProperties().setSyncCommits(false);
container.getContainerProperties().setCommitCallback((map, ex) -> {
if (ex == null) {
System.out.println("Successful commit for " + map);
}
else {
System.out.println("Commit failed for " + map + ": " + ex.getMessage());
}
});
container.getContainerProperties().setClientId("so57970152");
};
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("input", "foo".getBytes());
};
}
}
手动提交(同步)...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So57970152Application {
public static void main(String[] args) {
SpringApplication.run(So57970152Application.class, args);
}
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
return (container, dest, group) -> {
container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
container.getContainerProperties().setClientId("so57970152");
};
}
@StreamListener(Sink.INPUT)
public void listen(String in, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
System.out.println(in);
try {
ack.acknowledge(); // MUST USE MANUAL_IMMEDIATE for this to work.
System.out.println("Commit successful");
}
catch (Exception e) {
System.out.println("Commit failed " + e.getMessage());
}
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("input", "foo".getBytes());
};
}
}
推荐阅读
- android - 如何在数据类参数没有默认值的情况下使用 dagger 在 android 中注入数据类?
- java - 继承类之间的静态方法 - Java
- pandas - 如何根据条件应用多个过滤条件以将其他列中的值复制到熊猫数据框中的新列中
- ios - 弹出视图控制器在 Xcode 11.2.1 中打开后立即关闭
- flutter - 几秒钟后,闪屏无法导航到重定向页面
- php - ClientException:客户端错误使用 laravel 在 GuzzleHttp 中给出“401 Unauthorized”错误
- c# - 如何使用依赖注入在 ASP.NET Core 中调用参数化构造函数?
- mongodb - 如何处理mongodb中的多对多关系?
- python - sqlalchemy.exc.DataError:(psycopg2.errors.StringDataRightTruncation)值对于类型字符变化而言太长(256)
- javascript - 我们如何从 UTF-16 中用于表示非 BMP 字符的两个 16 位代码点到 Unicode 中字符的单个代码点?