apache-kafka - Kafka中的事务超时
问题描述
我试图了解 Kafka 中的事务超时机制。我在处理函数中配置transaction.timeout.ms
= 30000 (30s) 并休眠 60 秒(假设在一个长过程中)。
代码片段
@Bean
public Function<JsonNode, JsonNode> process() {
return e -> {
logger.info("Received event={}", e.toString());
if (sleepTime > 0) {
sleepInSecond(sleepTime);
// sleep in 60 seconds
}
logger.info("Sent event={}", e.toString());
return e;
};
}
我的处理器正在使用 Spring Cloud Stream Kafka
应用程序.yaml
spring:
cloud:
stream:
bindings:
process-in-0:
destination: extraction-requests
group: extraction
consumer:
maxAttempts: 1
process-out-0:
destination: conversion-requests
kafka:
bindings:
process-in-0:
consumer:
configuration:
max.poll.records: 1
max.poll.interval.ms: 30000 // 30 seconds
isolation.level: read_committed
binder:
transaction:
transaction-id-prefix: extract-${random.uuid}-
producer:
configuration:
max.in.flight.requests.per.connection: 2
retries: 1
acks: all
enable.idempotence: true
transaction.timeout.ms: 10000
我预计生产者将被抛出异常并在 30 秒后终止事务。但就我而言,它在睡眠时间(60秒)后引发异常
应用程序日志
2021-07-05 17:48:45.146 INFO 18600 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$91d9df10 : Received event={"name":"GeneratedName_1625481832579","type":"GeneratedType_1625481832579"}
...
2021-07-05 17:49:45.146 INFO 18600 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$91d9df10 : Sent event={"name":"GeneratedName_1625481832579","type":"GeneratedType_1625481832579","extractedTime":1625482185146}
2021-07-05 17:49:45.283 ERROR 18600 --- [tion-requests.0] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{123, 34, 110, 97, 109, 101, 34, 58, 34, 71, 101, 110, 101, 114, 97, 116, 101, 100, 78, 97, 109, 101...' to topic conversion-requests:
org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1710) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) [kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) [kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) [kafka-clients-2.6.0.jar:na]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
2021-07-05 17:49:45.288 ERROR 18600 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1710) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) ~[kafka-clients-2.6.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
...
为什么transaction.timeout.ms
不起作用?
解决方案
推荐阅读
- java - 创建具有名称的 bean 时出错,应用程序运行失败。弹簧靴
- python - Python 调试器 (pdb):使用 pdb 浏览多模块代码
- c# - 如何在组合框中绑定多个按钮
- python - 熊猫 Lambda Shift 值
- laravel - Laravel 事件没有响应正文返回到 Stripe
- ruby-on-rails - mongodb ruby delete_all 嵌套文档
- xcode - Xcode 模拟器:持久存储迁移失败,缺少映射模型
- c++ - MySQL C++ 连接器内存泄漏
- reactjs - 错误:(gcloud.app.deploy)错误响应:[9] Cloud build XXXXXXXXXXXXXX 状态:FAILURE
- angular - 如何使用 typescript 在控制台中打印 ngFor 循环值?