spring-boot - Spring Cloud Stream 中的生产者回调与反应堆核心发布者
问题描述
我编写了一个 Spring Cloud Stream 应用程序,其中生产者将消息发布到指定的 kafka 主题。我的问题是如何添加生产者回调来接收消息已成功发布在主题上的确认/确认?就像我们在 spring kafka 中所做的一样producer.send(record, new callback { ... })
(维护异步生产者)。下面是我的代码:
private final Sinks.Many<Message<?>> responseProcessor = Sinks.many().multicast().onBackpressureBuffer();
@Bean
public Supplier<Flux<Message<?>>> event() {
return responseProcessor::asFlux;
}
public Message<?> publishEvent(String status) {
try {
String key = ...;
response = MessageBuilder.withPayload(payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.build();
responseProcessor.tryEmitNext(response);
}
如何确保tryEmitNext已成功写入主题?
实施ProducerListener是一种解决方案吗?在 Spring Cloud Stream 中找不到具体的解决方案/文档
更新
我现在已经在下面实现了,似乎按预期工作
@Component
public class MyProducerListener<K, V> implements ProducerListener<K, V> {
@Override
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
// Do nothing on onSuccess
}
@Override
public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
log.error("Producer exception occurred while publishing message : {}, exception : {}", producerRecord, exception);
}
}
@Bean
ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> customizer(MyProducerListener pl) {
return (handler, destinationName) -> handler.getKafkaTemplate().setProducerListener(pl);
}
解决方案
请参阅Kafka 生产者属性。
记录元数据频道
MessageChannel
成功发送结果应该发送到的 bean 名称;bean 必须存在于应用程序上下文中。发送到通道的消息是带有附加标头的已发送消息(转换后,如果有)KafkaHeaders.RECORD_METADATA
。头部包含RecordMetadata
Kafka客户端提供的一个对象;它包括在主题中写入记录的分区和偏移量。
ResultMetadata 元 = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失败的发送进入生产者错误通道(如果已配置);请参阅错误通道。默认值:空
您可以@ServiceActivator
从这个通道异步添加一个来消费。
推荐阅读
- javascript - 添加到购物车
- javascript - 当单击从父组件传递时,为什么子组件未定义“this”?
- regex - Environment Modules: is-loaded 可以查找一系列模块文件名吗?
- python - 获取特定 NCT ID 历史记录的临床试验
- c# - (Unity) AdMob 问题 RealAds 未显示
- java - 如何使用 Spring Cloud Stream 指定向 RabbitMQ 发送消息的超时时间?
- roku - ToLocalTime() 在 Roku 中使用
- typescript - Typescript - 如何在命名空间中扩展一个类?
- javascript - 根据内部数组条件划分数组
- c# - 使用 CLI 将 C# byte[] 转换为 C++ char*