首页 > 解决方案 > 由于 max.poll.interval.ms 超出,Kafka 在 LeaveGroup 后提交事务

问题描述

我在kafka中使用事务。我为我的消费者容器提供了一个ChainedKafkaTransactionManagerJpaTransactionManager和组成的容器KafkaTransactionManager

我正在尝试了解当消费者卡住并因此发送LeaveGroup和禁用心跳线程时事务如何受到影响。

我已经设置max.poll.interval.ms为 10 秒。

我没有改变session.timeout.ms。默认为 10 秒。

我有两个应用程序,每个应用程序都有一个消费者。两个消费者都是事务性的。消费者 A 被操纵处理 30 秒,消费者 B 在 1 秒内处理它。两个消费者都从同一个主题中读取,作为 3 个分区。

  1. 向 Kafka 发送记录
  2. 消费者 A 收到记录。
  3. 消费者 A 开始处理记录。
  4. 消费者A处理时间超过max.poll.interval.ms
  5. 消费者 A 发送 LeaveGroup 并且心跳停止。
  6. 卡夫卡重新平衡。所有分区现在都分配给消费者 B。
  7. 消费者 B 收到相同的记录并处理它。
  8. 消费者 B 提交交易。
  9. 消费者 A 现已完成处理(30 秒)。
  10. 消费者 A 提交交易。
  11. 卡夫卡重新平衡。分区被重新分配给两个消费者。

事务被处理并提交两次。两个消费者都应该是幂等的,以确保处理相同的记录不会产生任何后果。

我的假设是,由于 LeaveGroup 和停止心跳,消费者 A 会抛出异常。然而事实并非如此。我已经用两个应用程序中的唯一事务 ID 和相同的事务 ID 对此进行了测试 - 结果相同。

为什么消费者 A 在发送 LeaveGroup 后提交事务?

我不完全确定这是否是一个错误。但是,我已将这种奇怪的行为或错误提交给Apache Kafka

消费者 A 的日志

2018-07-11 11:59:22.365 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=transactionId420] Transition from state READY to IN_TRANSACTION
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [brave.kafka.clients.TracingProducer@631071b0]
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Opened new EntityManager [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=ExecutableList{size=0} updates=ExecutableList{size=0} deletions=ExecutableList{size=0} orphanRemovals=ExecutableList{size=0} collectionCreations=ExecutableList{size=0} collectionRemovals=ExecutableList{size=0} collectionUpdates=ExecutableList{size=0} collectionQueuedOps=ExecutableList{size=0} unresolvedInsertDependencies=null])] for JPA transaction
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.h.e.t.internal.TransactionImpl         : begin
2018-07-11 11:59:22.367 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Exposing JPA transaction as JDBC transaction [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@4eef56f0]
2018-07-11 11:59:22.427 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[data], headers={kafka_offset=[34], kafka_consumer=brave.kafka.clients.TracingConsumer@1484643f, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[1], kafka_receivedTopic=[trans-topic], kafka_receivedTimestamp=[1531299912221], kafka_batchConvertedHeaders=[{X-B3-SpanId=[B@1e664339, X-B3-ParentSpanId=[B@73f2c38a, X-B3-Sampled=[B@5f0ca155, X-B3-TraceId=[B@68ac877c}]}]]

...
2018-07-11 11:59:30.503 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=mygrp42] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2018-07-11 11:59:30.608 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=mygrp42] Received successful Heartbeat response
2018-07-11 11:59:32.256 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=mygrp42] Sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null)
2018-07-11 11:59:32.256 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=mygrp42] Disabling heartbeat thread
2018-07-11 11:59:37.458 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {trans-topic-0=OffsetAndMetadata{offset=408, metadata=''}}
2018-07-11 11:59:37.465 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=transactionId420] Begin adding offsets {trans-topic-0=OffsetAndMetadata{offset=408, metadata=''}} for consumer group mygrp42 to transaction
2018-07-11 11:59:37.465 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=transactionId420] Enqueuing transactional request (type=AddOffsetsToTxnRequest, transactionalId=transactionId420, producerId=0, producerEpoch=64, consumerGroupId=mygrp42)
2018-07-11 11:59:37.465 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1, transactionalId=transactionId420] Sending transactional request (type=AddOffsetsToTxnRequest, transactionalId=transactionId420, producerId=0, producerEpoch=64, consumerGroupId=mygrp42) to node localhost:9092 (id: 0 rack: null)
2018-07-11 11:59:37.467 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=transactionId420] Successfully added partition for consumer group mygrp42 to transaction
2018-07-11 11:59:37.467 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1, transactionalId=transactionId420] Sending transactional request (type=TxnOffsetCommitRequest, transactionalId=transactionId420, producerId=0, producerEpoch=64, consumerGroupId=mygrp42, offsets={trans-topic-0=CommittedOffset(offset=408, metadata='')}) to node localhost:9092 (id: 0 rack: null)
2018-07-11 11:59:37.469 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=transactionId420] Successfully added offsets {trans-topic-0=CommittedOffset(offset=408, metadata='')} from consumer group mygrp42 to transaction.

消费者 B 的日志

2018-07-11 11:59:33.777 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-07-11 11:59:33.777 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-07-11 11:59:33.777 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=transactionId421] Transition from state READY to IN_TRANSACTION
2018-07-11 11:59:33.777 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [brave.kafka.clients.TracingProducer@30b6dff4]
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Opened new EntityManager [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=ExecutableList{size=0} updates=ExecutableList{size=0} deletions=ExecutableList{size=0} orphanRemovals=ExecutableList{size=0} collectionCreations=ExecutableList{size=0} collectionRemovals=ExecutableList{size=0} collectionUpdates=ExecutableList{size=0} collectionQueuedOps=ExecutableList{size=0} unresolvedInsertDependencies=null])] for JPA transaction
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.h.e.t.internal.TransactionImpl         : begin
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Exposing JPA transaction as JDBC transaction [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@2b0eff5d]
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[data], headers={kafka_offset=[407], kafka_consumer=brave.kafka.clients.TracingConsumer@103b53ce, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_receivedTopic=[trans-topic], kafka_receivedTimestamp=[1531299562360], kafka_batchConvertedHeaders=[{X-B3-SpanId=[B@38658f41, X-B3-ParentSpanId=[B@2faeb13a, X-B3-Sampled=[B@29e18244, X-B3-TraceId=[B@75496432}]}]]
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {trans-topic-0=OffsetAndMetadata{offset=408, metadata=''}}
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=transactionId421] Begin adding offsets {trans-topic-0=OffsetAndMetadata{offset=408, metadata=''}} for consumer group mygrp42 to transaction
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=transactionId421] Enqueuing transactional request (type=AddOffsetsToTxnRequest, transactionalId=transactionId421, producerId=1, producerEpoch=11, consumerGroupId=mygrp42)
2018-07-11 11:59:33.779 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-2, transactionalId=transactionId421] Sending transactional request (type=AddOffsetsToTxnRequest, transactionalId=transactionId421, producerId=1, producerEpoch=11, consumerGroupId=mygrp42) to node localhost:9092 (id: 0 rack: null)
2018-07-11 11:59:33.780 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=transactionId421] Successfully added partition for consumer group mygrp42 to transaction
2018-07-11 11:59:33.780 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-2, transactionalId=transactionId421] Sending transactional request (type=TxnOffsetCommitRequest, transactionalId=transactionId421, producerId=1, producerEpoch=11, consumerGroupId=mygrp42, offsets={trans-topic-0=CommittedOffset(offset=408, metadata='')}) to node localhost:9092 (id: 0 rack: null)
2018-07-11 11:59:33.781 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=transactionId421] Successfully added offsets {trans-topic-0=CommittedOffset(offset=408, metadata='')} from consumer group mygrp42 to transaction.

标签: javaapache-kafkaspring-kafka

解决方案


推荐阅读