首页 > 解决方案 > Spring Cloud 流:Kafka Sink 获取备用消息

问题描述

我正在尝试使用 kafka 绑定构建一个简单的云流应用程序。让我描述一下设置。1. 我有一个制作人制作主题topic_1
2. 有一个流绑定器,topic_1经过一些处理后绑定到topic_2.

@StreamListener(MyBinder.INPUT)
    @SendTo(MyBinder.OUTPUT_2)
    public String handleIncomingMsgs(String s) {
        logger.info(s);  // prints all the messages
        return s;
    }
  1. 当生产者产生消息时,它StreamListner handleIncomingMsgs会获取所有消息。
  2. 收到后,它应该将消息转发到其他频道。
@Service
@EnableBinding(MyBinder.class)
public class LogMsg {

    @StreamListener(MyBinder.OUTPUT_2)
    public void handle(String board) {
        logger.info("Received payload: " + board); //prints every alternate messages
    }
  1. 这是我的活页夹
public interface ViewsStreams {

    String INPUT = "input";
    String OUTPUT_1 = "output_1";
    String OP_USERS = "output_2";

    @Autowired
    @Input(INPUT)
    SubscribableChannel job_board_views();

    @Autowired
    @Output(OUTPUT_1)
    MessageChannel outboundJobBoards();

    @Autowired
    @Output(OUTPUT_2)
    MessageChannel outboundUsers();
}

我是这些技术的新手。无法弄清楚这里出了什么问题。有人可以帮忙吗?

标签: javaspring-kafkaspring-cloud-stream

解决方案


你的猜测是正确的;您在 OUTPUT_2 通道上有两个消费者 - 侦听器和发送消息的绑定。

他们每个人都会收到备用消息。


推荐阅读