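apache-kafka - Transactional Kafka producer
Question
I am trying to make my Kafka producer transactional. I send 10 messages, and if any error occurs, none of them should reach Kafka: either all messages are written or none.
I am using the Spring Boot KafkaTemplate.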
@Configuration
@EnableKafka
public class KakfaConfiguration {

    // bootstrapServers, acks, retryBackOffMsConfig and retries are referenced
    // below; their declarations are not shown in the question.

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        // SSL settings, currently disabled:
        // config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        //         appProps.getJksLocation());
        // config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        //         appProps.getJksPassword());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        // Idempotence plus a transactional.id make the producer transactional.
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "ktm")
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }
}
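Following the documentation, I send the 10 messages as shown below. Nine of them should be sent successfully; one message is larger than 1 MB and is rejected by the Kafka broker with a RecordTooLargeException.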
https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager
@Component
@EnableTransactionManagement
class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> template;

    @Transactional("ktm")
    public void sendThem(List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info("Message success: " + result.getProducerRecord().value());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Message failed", ex);
                latch.countDown();
            }
        };
        toSend.forEach(str -> {
            ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
            future.addCallback(callback);
            // Keep the future so the timeout branch below can inspect it.
            futures.add(future);
        });
        if (latch.await(12, TimeUnit.MINUTES)) {
            // Every callback has fired, whether it reported success or failure.
            LOG.info("All sent ok");
        } else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }
    }
}
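But when I look at the topic t_hello_world, I see 9 messages. I expected to see 0 messages, because my producer is transactional. How can I achieve that?
I get the following log: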
2020-04-30 18:04:36.036 ERROR 18688 --- [ scheduling-1] o.s.k.core.DefaultKafkaProducerFactory : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:923) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:297) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1013) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:296) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:713) ~[kafka-clients-2.4.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
2020-04-30 18:04:36.037 WARN 18688 --- [ scheduling-1] o.s.k.core.DefaultKafkaProducerFactory : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]
2020-04-30 18:04:36.038 INFO 18688 --- [ scheduling-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-prod-990, transactionalId=prod-990] Closing the Kafka producer with timeoutMillis = 5000 ms.
2020-04-30 18:04:36.038 INFO 18688 --- [oducer-prod-990] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-990, transactionalId=prod-990] Aborting incomplete transaction due to shutdown
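Solution
Uncommitted records are written to the log. When the transaction is committed or rolled back, an additional record carrying the transaction state is written to the log.
By default, consumers see all records, including uncommitted ones (but not the special commit/abort marker records).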
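To make the difference between the two read modes concrete, here is a toy model in plain Java. It is only a sketch and not Kafka's real log format or fetch logic: the `Entry` type, the `tx-1`/`tx-2` ids and the `read` helper are invented for illustration, and it ignores details such as how a real consumer handles transactions that are still open.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a partition log with transaction markers (hypothetical, not Kafka code). */
class IsolationLevelSketch {

    enum Kind { DATA, COMMIT, ABORT }

    /** One log entry: a data record or a control marker, tagged with a made-up transaction id. */
    static final class Entry {
        final Kind kind;
        final String txId;
        final String value; // null for control markers

        Entry(Kind kind, String txId, String value) {
            this.kind = kind;
            this.txId = txId;
            this.value = value;
        }
    }

    /** Returns the data values a consumer would see under the given isolation mode. */
    static List<String> read(List<Entry> log, boolean readCommitted) {
        // First pass: find which transactions ended with a COMMIT marker.
        Set<String> committed = new HashSet<>();
        for (Entry e : log) {
            if (e.kind == Kind.COMMIT) {
                committed.add(e.txId);
            }
        }
        // Second pass: markers are never handed to the application;
        // data is filtered only when readCommitted is requested.
        List<String> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                continue;
            }
            if (!readCommitted || committed.contains(e.txId)) {
                visible.add(e.value);
            }
        }
        return visible;
    }

    /** A log holding one aborted transaction (a, b) and one committed transaction (c). */
    static List<Entry> sampleLog() {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(Kind.DATA, "tx-1", "a"));
        log.add(new Entry(Kind.DATA, "tx-1", "b"));
        log.add(new Entry(Kind.ABORT, "tx-1", null));
        log.add(new Entry(Kind.DATA, "tx-2", "c"));
        log.add(new Entry(Kind.COMMIT, "tx-2", null));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(read(sampleLog(), false)); // prints [a, b, c]
        System.out.println(read(sampleLog(), true));  // prints [c]
    }
}
```

In this model the aborted records `a` and `b` stay in the log, which matches the situation in the question: the 9 records are physically present even though their transaction was aborted.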
For the console consumer, you need to set the isolation level to read_committed. See the help:
--isolation-level <String>    Set to read_committed in order to
                                filter out transactional messages
                                which are not committed. Set to
                                read_uncommitted to read all
                                messages. (default: read_uncommitted)
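For a consumer inside an application, the counterpart of the console flag is the `isolation.level` consumer property. A minimal sketch, assuming your own broker address and group id (the class name `ReadCommittedConsumerProps` and the values used in `main` are placeholders). The resulting Properties would be passed to a KafkaConsumer from kafka-clients, which is not constructed here so that the snippet runs without a broker:

```java
import java.util.Properties;

/** Builds consumer settings that hide records from uncommitted or aborted transactions. */
class ReadCommittedConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The consumer default is read_uncommitted, which also returns aborted records.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace with your own broker and group.
        Properties props = build("localhost:9092", "demo-group");
        System.out.println(props.getProperty("isolation.level")); // prints read_committed
        // With kafka-clients on the classpath: new KafkaConsumer<String, String>(props)
    }
}
```

With this setting, a consumer reading the topic from the question should not receive the 9 records of the aborted transaction.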