首页 > 解决方案 > Kafka中的事务超时

问题描述

我试图了解 Kafka 中的事务超时机制。我在处理函数中配置transaction.timeout.ms= 30000 (30s) 并休眠 60 秒(假设在一个长过程中)

代码片段

@Bean
public Function<JsonNode, JsonNode> process() {
    return e -> {
        logger.info("Received event={}", e.toString());

        if (sleepTime > 0) {
            sleepInSecond(sleepTime);
            // sleep in 60 seconds
        }

        logger.info("Sent event={}", e.toString());
        return e;
    };
}

我的处理器正在使用 Spring Cloud Stream Kafka

应用程序.yaml

spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: extraction-requests
          group: extraction
          consumer:
            maxAttempts: 1
        process-out-0:
          destination: conversion-requests
      kafka:
        bindings:
           process-in-0:
             consumer:
               configuration:
                 max.poll.records: 1
                 max.poll.interval.ms: 30000 // 30 seconds
                 isolation.level: read_committed
        binder:
          transaction:
            transaction-id-prefix: extract-${random.uuid}-
            producer:
              configuration:
                max.in.flight.requests.per.connection: 2
                retries: 1
                acks: all
                enable.idempotence: true
                transaction.timeout.ms: 10000

我预计生产者将被抛出异常并在 30 秒后终止事务。但就我而言,它在睡眠时间(60秒)后引发异常

应用程序日志

2021-07-05 17:48:45.146  INFO 18600 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$91d9df10 : Received event={"name":"GeneratedName_1625481832579","type":"GeneratedType_1625481832579"}
...
2021-07-05 17:49:45.146  INFO 18600 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$91d9df10 : Sent event={"name":"GeneratedName_1625481832579","type":"GeneratedType_1625481832579","extractedTime":1625482185146}
2021-07-05 17:49:45.283 ERROR 18600 --- [tion-requests.0] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{123, 34, 110, 97, 109, 101, 34, 58, 34, 71, 101, 110, 101, 114, 97, 116, 101, 100, 78, 97, 109, 101...' to topic conversion-requests:

    org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1710) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) [kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) [kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) [kafka-clients-2.6.0.jar:na]
        at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
    
    2021-07-05 17:49:45.288 ERROR 18600 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
    
    org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1710) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) ~[kafka-clients-2.6.0.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
...

为什么transaction.timeout.ms不起作用?

标签: apache-kafkaspring-cloud-stream-binder-kafka

解决方案


推荐阅读