首页 > 解决方案 > Spring Cloud Stream 中的生产者回调与反应堆核心发布者

问题描述

我编写了一个 Spring Cloud Stream 应用程序,其中生产者将消息发布到指定的 kafka 主题。我的问题是如何添加生产者回调来接收消息已成功发布在主题上的确认/确认?就像我们在 spring kafka 中所做的一样producer.send(record, new callback { ... })(维护异步生产者)。下面是我的代码:

private final Sinks.Many<Message<?>> responseProcessor = Sinks.many().multicast().onBackpressureBuffer();

@Bean
public Supplier<Flux<Message<?>>> event() {
        return responseProcessor::asFlux;
    }    

public Message<?> publishEvent(String status) {
  try {
      String key = ...;
      response = MessageBuilder.withPayload(payload)
                  .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                  .build();
      responseProcessor.tryEmitNext(response);
  } 

如何确保tryEmitNext已成功写入主题?

实施ProducerListener是一种解决方案吗?在 Spring Cloud Stream 中找不到具体的解决方案/文档

更新

我现在已经在下面实现了,似乎按预期工作

@Component
public class MyProducerListener<K, V> implements ProducerListener<K, V> {

    @Override
    public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
        // Do nothing on onSuccess
    }

    @Override
    public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        log.error("Producer exception occurred while publishing message : {}, exception : {}", producerRecord, exception);
    }

}

 @Bean
    ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> customizer(MyProducerListener pl) {
        return (handler, destinationName) -> handler.getKafkaTemplate().setProducerListener(pl);
    }

标签: spring-bootapache-kafkaspring-kafkaspring-cloud-streamkafka-producer-api

解决方案


请参阅Kafka 生产者属性

记录元数据频道

MessageChannel成功发送结果应该发送到的 bean 名称;bean 必须存在于应用程序上下文中。发送到通道的消息是带有附加标头的已发送消息(转换后,如果有)KafkaHeaders.RECORD_METADATA。头部包含RecordMetadataKafka客户端提供的一个对象;它包括在主题中写入记录的分区和偏移量。

ResultMetadata 元 = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

失败的发送进入生产者错误通道(如果已配置);请参阅错误通道。默认值:空

您可以@ServiceActivator从这个通道异步添加一个来消费。


推荐阅读