kotlin - Transactional kafkalistener 中的 kafkaTemplate 在事务回滚时发布消息
问题描述
我正在使用 spring-kafka 测试一次交付。我的理解是,通过 kafkaTemplate 发布的偏移量和任何消息都将成为同一事务的一部分。
但是,然后我在发布后抛出异常,即使未提交消费者偏移量,消息仍会发布。
我的设置有问题吗?我还需要在每个 kafkalistener 类上设置 @transaction 吗?当我删除它时,我看到在 spring-kafka 调试日志中创建了事务。
@Transactional("kafkaTransactionManager")
@KafkaListener(
id = "\${messaging.command.consumer-group-id}",
clientIdPrefix = "\${messaging.command.consumer-group-id}",
topics = ["\${messaging.command.topic}"],
concurrency = "\${messaging.command.listener-count}"
)
fun processCommand1(@Payload command: EntityCommand<JsonNode>, record: ConsumerRecord<String, Array<Byte>>) {
testEventPublisher.publish(record.key(), "test")
testEventPublisher.publish(randomUUID().toString(), "test)
throw RuntimeException("test")
}
发布者(我在尝试使其正常工作时添加了 executeInTransaction()):
class TransactionalTopicPublisher<TYPE>(val kafkaTemplate: KafkaTemplate<String, Any>, val topic: String) {
fun publish(key: String, message: TYPE) {
kafkaTemplate.executeInTransaction {
kafkaTemplate.send(
topic,
key,
message
)
}
}
}
生产者配置:
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<String, Any>): KafkaTemplate<String, Any> {
return KafkaTemplate(producerFactory)
}
@Bean(KAFKA_TRANSACTION_MANAGER)
fun kafkaTransactionManager(kafkaProducerFactory: ProducerFactory<String, Any>): KafkaTransactionManager<String, Any> {
val kafkaTransactionManager = KafkaTransactionManager<String, Any>(kafkaProducerFactory)
return kafkaTransactionManager
}
@Bean
fun kafkaProducerFactory(kafkaJsonSerializer: JsonSerializer<Any>): ProducerFactory<String, Any> {
val factory = DefaultKafkaProducerFactory<String, Any>(producerConfig())
factory.setTransactionIdPrefix(transactionIdPrefix)
factory.setValueSerializer(kafkaJsonSerializer)
return factory
}
@Bean
fun producerConfig(): Map<String, Any> {
return mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java.name,
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 1,
ProducerConfig.ACKS_CONFIG to "all",
ProducerConfig.BATCH_SIZE_CONFIG to 16384,
ProducerConfig.LINGER_MS_CONFIG to 1,
ProducerConfig.BUFFER_MEMORY_CONFIG to 33554432,
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to ProducerInterceptor::class.java.name
)
}
@Bean
fun kafkaJsonSerializer(kafkaObjectMapper: ObjectMapper): JsonSerializer<Any> {
val jsonSerializer = JsonSerializer<Any>(kafkaObjectMapper)
jsonSerializer.isAddTypeInfo = false
return jsonSerializer
}
消费者配置:
@Bean
fun kafkaListenerContainerFactory(
kafkaTransactionManager: KafkaTransactionManager<String, Any>,
stringJsonMessageConverter: StringJsonMessageConverter
): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
factory.setMessageConverter(stringJsonMessageConverter)
factory.setErrorHandler(messagingErrorHandler())
factory.containerProperties.transactionManager = kafkaTransactionManager
return factory
}
@Bean
fun stringJsonMessageConverter(kafkaObjectMapper: ObjectMapper) =
StringJsonMessageConverter(kafkaObjectMapper)
@Bean
fun messagingErrorHandler() =
MessagingErrorHandler()
@Bean
fun consumerFactory(): ConsumerFactory<String, Any> {
val consumerFactory = DefaultKafkaConsumerFactory<String, Any>(consumerConfig())
return consumerFactory
}
@Bean
fun consumerConfig(): Map<String, Any> {
return mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name,
ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT),
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORD,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to ConsumerInterceptor::class.java.name,
JsonDeserializer.USE_TYPE_INFO_HEADERS to false
)
}
OffsetCommittingAndDeadLetterPublishingRecoverer:
@Transactional(KAFKA_TRANSACTION_MANAGER)
class OffsetCommittingAndDeadLetterPublishingRecoverer(val template: KafkaTemplate<Any, Any>) :
DeadLetterPublishingRecoverer(template) {
override fun accept(record: ConsumerRecord<*, *>, exception: Exception) {
super.accept(record, exception)
val topicPartition = TopicPartition(record.topic(), record.partition())
val offsetAndMetadata = OffsetAndMetadata(record.offset() +1)
template.executeInTransaction {
template.sendOffsetsToTransaction(
mapOf(topicPartition to offsetAndMetadata)
)
}
}
}
解决方案
推荐阅读
- python - 语法验证实用程序
- r - 在访问选项卡之前,另一个选项卡中的传单未使用传单代理更新
- python - 如何从熊猫数据框创建多关系边缘列表?
- c++ - 我如何制作一个函数来检查一个单词在向量中是否重复超过两次或更多次,并输出它重复的次数?在 C++ 中
- python - 使用 sklearn 的决策树分类器 100% 准确率
- javascript - 反应原生钩子中的这种拉动刷新功能的等价物是什么?
- azure - 应用程序“azureeventgrid”的连接器与 Gmail 不兼容
- automated-tests - 在 docker 内使用 chromedriver 在 chrome 上运行空手道测试
- zap - 记录的 api 调用的 ZAP 身份验证
- python - 如何从 numpy 数组中切出一个正方形并从中创建一个数组?